• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 6011532274

29 Aug 2023 11:17AM UTC coverage: 76.491% (-0.1%) from 76.616%
6011532274

push

github

web-flow
fix: Include connection type in `GenerateDot`. Fix `AggregationProcessorFactory::type_name` (#1934)

170 of 170 new or added lines in 5 files covered. (100.0%)

49016 of 64081 relevant lines covered (76.49%)

48200.58 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/dozer-cli/src/live/state.rs
1
use std::{sync::Arc, thread::JoinHandle};
2

3
use clap::Parser;
4
use dozer_cache::dozer_log::camino::Utf8Path;
5
use dozer_core::{app::AppPipeline, dag_schemas::DagSchemas, Dag};
6
use dozer_sql::pipeline::builder::statement_to_pipeline;
7
use dozer_types::{
8
    grpc_types::{
9
        contract::{DotResponse, SchemasResponse},
10
        live::{BuildResponse, BuildStatus, ConnectResponse, LiveApp, LiveResponse, RunRequest},
11
    },
12
    indicatif::MultiProgress,
13
    log::info,
14
    models::{
15
        api_config::{ApiConfig, AppGrpcOptions},
16
        api_endpoint::ApiEndpoint,
17
        flags::Flags,
18
    },
19
};
20
use tokio::{runtime::Runtime, sync::RwLock};
21

22
use crate::{
23
    cli::{init_dozer, types::Cli},
24
    errors::OrchestrationError,
25
    pipeline::PipelineBuilder,
26
    shutdown::{self, ShutdownReceiver, ShutdownSender},
27
    simple::{helper::validate_config, Contract, SimpleOrchestrator},
28
};
29

30
use super::{progress::progress_stream, LiveError};
31

32
struct DozerAndContract {
33
    dozer: SimpleOrchestrator,
34
    contract: Option<Contract>,
35
}
36

37
/// Kind of state change announced through [`LiveState::broadcast`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum BroadcastType {
    /// A build has started; broadcast as a `BuildStart` build status.
    Start,
    /// The build succeeded; broadcast as the current live state.
    Success,
    /// The build failed; the payload is the message sent with the `BuildFailed` status.
    Failed(String),
}
43

44
pub struct LiveState {
45
    dozer: RwLock<Option<DozerAndContract>>,
46
    run_thread: RwLock<Option<ShutdownSender>>,
47
    error_message: RwLock<Option<String>>,
48
    sender: RwLock<Option<tokio::sync::broadcast::Sender<ConnectResponse>>>,
49
}
50

51
impl LiveState {
52
    pub fn new() -> Self {
×
53
        Self {
×
54
            dozer: RwLock::new(None),
×
55
            run_thread: RwLock::new(None),
×
56
            sender: RwLock::new(None),
×
57
            error_message: RwLock::new(None),
×
58
        }
×
59
    }
×
60

61
    async fn create_contract_if_missing(&self) -> Result<(), LiveError> {
×
62
        let mut dozer_and_contract_lock = self.dozer.write().await;
×
63
        if let Some(dozer_and_contract) = dozer_and_contract_lock.as_mut() {
×
64
            if dozer_and_contract.contract.is_none() {
×
65
                let contract = create_contract(dozer_and_contract.dozer.clone()).await?;
×
66
                dozer_and_contract.contract = Some(contract);
×
67
            }
×
68
        }
×
69
        Ok(())
×
70
    }
×
71

72
    pub async fn set_sender(&self, sender: tokio::sync::broadcast::Sender<ConnectResponse>) {
×
73
        *self.sender.write().await = Some(sender);
×
74
    }
×
75

76
    pub async fn broadcast(&self, broadcast_type: BroadcastType) {
×
77
        let sender = self.sender.read().await;
×
78
        info!("Broadcasting state: {:?}", broadcast_type);
×
79
        if let Some(sender) = sender.as_ref() {
×
80
            let res = match broadcast_type {
×
81
                BroadcastType::Start => ConnectResponse {
×
82
                    live: None,
×
83
                    progress: None,
×
84
                    build: Some(BuildResponse {
×
85
                        status: BuildStatus::BuildStart as i32,
×
86
                        message: None,
×
87
                    }),
×
88
                },
×
89
                BroadcastType::Failed(msg) => ConnectResponse {
×
90
                    live: None,
×
91
                    progress: None,
×
92
                    build: Some(BuildResponse {
×
93
                        status: BuildStatus::BuildFailed as i32,
×
94
                        message: Some(msg),
×
95
                    }),
×
96
                },
×
97
                BroadcastType::Success => {
98
                    let res = self.get_current().await;
×
99
                    ConnectResponse {
×
100
                        live: Some(res),
×
101
                        progress: None,
×
102
                        build: None,
×
103
                    }
×
104
                }
105
            };
106
            let _ = sender.send(res);
×
107
        }
×
108
    }
×
109

110
    pub async fn set_error_message(&self, error_message: Option<String>) {
×
111
        *self.error_message.write().await = error_message;
×
112
    }
×
113

114
    pub async fn build(&self, runtime: Arc<Runtime>) -> Result<(), LiveError> {
×
115
        // Taking lock to ensure that we don't have multiple builds running at the same time
116
        let mut lock = self.dozer.write().await;
×
117

118
        let cli = Cli::parse();
×
119

120
        let dozer = init_dozer(
×
121
            runtime,
×
122
            cli.config_paths.clone(),
×
123
            cli.config_token.clone(),
×
124
            cli.config_overrides.clone(),
×
125
            cli.ignore_pipe,
×
126
            false,
×
127
        )
×
128
        .await?;
×
129

130
        let contract = create_contract(dozer.clone()).await;
×
131
        *lock = Some(DozerAndContract {
×
132
            dozer,
×
133
            contract: match &contract {
×
134
                Ok(contract) => Some(contract.clone()),
×
135
                Err(_) => None,
×
136
            },
137
        });
138
        if let Err(e) = &contract {
×
139
            self.set_error_message(Some(e.to_string())).await;
×
140
        } else {
141
            self.set_error_message(None).await;
×
142
        }
143

144
        contract
×
145
            .map(|_| ())
×
146
            .map_err(|e| LiveError::OrchestrationError(Box::new(e)))
×
147
    }
×
148
    pub async fn get_current(&self) -> LiveResponse {
×
149
        let dozer = self.dozer.read().await;
×
150
        let app = dozer.as_ref().map(|dozer| {
×
151
            let connections = dozer
×
152
                .dozer
×
153
                .config
×
154
                .connections
×
155
                .iter()
×
156
                .map(|c| c.name.clone())
×
157
                .collect();
×
158
            let endpoints = dozer
×
159
                .dozer
×
160
                .config
×
161
                .endpoints
×
162
                .iter()
×
163
                .map(|c| c.name.clone())
×
164
                .collect();
×
165
            LiveApp {
×
166
                app_name: dozer.dozer.config.app_name.clone(),
×
167
                connections,
×
168
                endpoints,
×
169
            }
×
170
        });
×
171

×
172
        LiveResponse {
×
173
            initialized: app.is_some(),
×
174
            running: self.run_thread.read().await.is_some(),
×
175
            error_message: self.error_message.read().await.as_ref().cloned(),
×
176
            app,
×
177
        }
×
178
    }
×
179

180
    pub async fn get_endpoints_schemas(&self) -> Result<SchemasResponse, LiveError> {
×
181
        self.create_contract_if_missing().await?;
×
182
        let dozer = self.dozer.read().await;
×
183
        let contract = get_contract(&dozer)?;
×
184

185
        Ok(SchemasResponse {
×
186
            schemas: contract.get_endpoints_schemas(),
×
187
        })
×
188
    }
×
189
    pub async fn get_source_schemas(
×
190
        &self,
×
191
        connection_name: String,
×
192
    ) -> Result<SchemasResponse, LiveError> {
×
193
        self.create_contract_if_missing().await?;
×
194
        let dozer = self.dozer.read().await;
×
195
        let contract = get_contract(&dozer)?;
×
196

197
        contract
×
198
            .get_source_schemas(&connection_name)
×
199
            .ok_or(LiveError::ConnectionNotFound(connection_name))
×
200
            .map(|schemas| SchemasResponse { schemas })
×
201
    }
×
202

203
    pub async fn get_graph_schemas(&self) -> Result<SchemasResponse, LiveError> {
×
204
        self.create_contract_if_missing().await?;
×
205
        let dozer = self.dozer.read().await;
×
206
        let contract = get_contract(&dozer)?;
×
207

208
        Ok(SchemasResponse {
×
209
            schemas: contract.get_graph_schemas(),
×
210
        })
×
211
    }
×
212

213
    pub async fn generate_dot(&self) -> Result<DotResponse, LiveError> {
×
214
        self.create_contract_if_missing().await?;
×
215
        let dozer = self.dozer.read().await;
×
216
        let contract = get_contract(&dozer)?;
×
217

218
        Ok(DotResponse {
×
219
            dot: contract.generate_dot(),
×
220
        })
×
221
    }
×
222

223
    pub async fn run(&self, request: RunRequest) -> Result<(), LiveError> {
×
224
        let dozer = self.dozer.read().await;
×
225
        let dozer = &dozer.as_ref().ok_or(LiveError::NotInitialized)?.dozer;
×
226

227
        // kill if a handle already exists
228
        self.stop().await?;
×
229

230
        let (shutdown_sender, shutdown_receiver) = shutdown::new(&dozer.runtime);
×
231
        let metrics_shutdown = shutdown_receiver.clone();
×
232
        let _handle = run(dozer.clone(), request, shutdown_receiver)?;
×
233

234
        // Initialize progress
235
        let metrics_sender = self.sender.read().await.as_ref().unwrap().clone();
×
236
        tokio::spawn(async {
×
237
            progress_stream(metrics_sender, metrics_shutdown)
×
238
                .await
×
239
                .unwrap()
×
240
        });
×
241

242
        let mut lock = self.run_thread.write().await;
×
243
        *lock = Some(shutdown_sender);
×
244

×
245
        Ok(())
×
246
    }
×
247

248
    pub async fn stop(&self) -> Result<(), LiveError> {
×
249
        let mut lock = self.run_thread.write().await;
×
250
        if let Some(shutdown) = lock.take() {
×
251
            shutdown.shutdown()
×
252
        }
×
253
        *lock = None;
×
254
        Ok(())
×
255
    }
×
256
}
257

258
fn get_contract(dozer_and_contract: &Option<DozerAndContract>) -> Result<&Contract, LiveError> {
×
259
    dozer_and_contract
×
260
        .as_ref()
×
261
        .ok_or(LiveError::NotInitialized)?
×
262
        .contract
263
        .as_ref()
×
264
        .ok_or(LiveError::NotInitialized)
×
265
}
×
266

267
/// Builds the DAG for `dozer`, derives its schemas and creates a `Contract` from them and
/// from the configured connections and endpoints.
///
/// # Errors
///
/// Propagates failures from building the DAG, deriving the schemas or creating the contract.
async fn create_contract(dozer: SimpleOrchestrator) -> Result<Contract, OrchestrationError> {
    let schemas = DagSchemas::new(create_dag(&dozer).await?)?;
    let config = &dozer.config;

    // We don't care about API generation options here. They are handled in `run_all`.
    let contract = Contract::new(
        &schemas,
        &config.connections,
        &config.endpoints,
        false,
        true,
    )?;
    Ok(contract)
}
×
280

×
281
/// Builds the pipeline DAG from the orchestrator's config.
///
/// Every endpoint is paired with `None` instead of a log, and default `Flags` are used.
///
/// # Errors
///
/// Returns the error of `PipelineBuilder::build`.
async fn create_dag(dozer: &SimpleOrchestrator) -> Result<Dag, OrchestrationError> {
    let config = &dozer.config;
    let runtime = &dozer.runtime;

    // We're not really going to run the pipeline, so we don't create logs.
    let endpoint_and_logs = config
        .endpoints
        .iter()
        .map(|endpoint| (endpoint.clone(), None))
        .collect();

    let builder = PipelineBuilder::new(
        &config.connections,
        &config.sources,
        config.sql.as_deref(),
        endpoint_and_logs,
        MultiProgress::new(),
        Flags::default(),
    );

    // The sender is bound to a named variable so it stays alive until the build finishes.
    let (_shutdown_sender, shutdown_receiver) = shutdown::new(runtime);
    builder.build(runtime, shutdown_receiver).await
}
×
300

×
301
fn run(
×
302
    dozer: SimpleOrchestrator,
×
303
    request: RunRequest,
×
304
    shutdown_receiver: ShutdownReceiver,
×
305
) -> Result<JoinHandle<()>, OrchestrationError> {
×
306
    let mut dozer = get_dozer_run_instance(dozer, request)?;
×
307

×
308
    validate_config(&dozer.config)?;
×
309

×
310
    let runtime = dozer.runtime.clone();
×
311
    let run_thread = std::thread::spawn(move || dozer.run_all(shutdown_receiver));
×
312

×
313
    let handle = std::thread::spawn(move || {
×
314
        runtime.block_on(async {
×
315
            run_thread.join().unwrap().unwrap();
×
316
        });
×
317
    });
×
318

×
319
    Ok(handle)
×
320
}
×
321

×
322
fn get_dozer_run_instance(
323
    mut dozer: SimpleOrchestrator,
324
    req: RunRequest,
325
) -> Result<SimpleOrchestrator, LiveError> {
326
    match req.request {
×
327
        Some(dozer_types::grpc_types::live::run_request::Request::Sql(req)) => {
×
328
            let context = statement_to_pipeline(
×
329
                &req.sql,
×
330
                &mut AppPipeline::new(dozer.config.flags.clone().unwrap_or_default().into()),
×
331
                None,
×
332
            )
×
333
            .map_err(LiveError::PipelineError)?;
×
334

×
335
            //overwrite sql
336
            dozer.config.sql = Some(req.sql);
×
337

×
338
            dozer.config.endpoints = vec![];
×
339
            let endpoints = context.output_tables_map.keys().collect::<Vec<_>>();
×
340
            for endpoint in endpoints {
×
341
                let endpoint = ApiEndpoint {
×
342
                    name: endpoint.to_string(),
×
343
                    table_name: endpoint.to_string(),
×
344
                    path: format!("/{}", endpoint),
×
345
                    ..Default::default()
×
346
                };
×
347
                dozer.config.endpoints.push(endpoint);
×
348
            }
×
349
        }
×
350
        Some(dozer_types::grpc_types::live::run_request::Request::Source(req)) => {
×
351
            dozer.config.sql = None;
×
352
            dozer.config.endpoints = vec![];
×
353
            let endpoint = req.source;
×
354
            dozer.config.endpoints.push(ApiEndpoint {
×
355
                name: endpoint.to_string(),
×
356
                table_name: endpoint.to_string(),
×
357
                path: format!("/{}", endpoint),
×
358
                ..Default::default()
×
359
            });
×
360
        }
×
361
        None => {}
×
362
    };
×
363

364
    dozer.config.api = Some(ApiConfig {
×
365
        app_grpc: Some(AppGrpcOptions {
×
366
            port: 5678,
×
367
            host: "0.0.0.0".to_string(),
×
368
        }),
×
369

×
370
        ..Default::default()
×
371
    });
×
372

×
373
    let temp_dir = tempdir::TempDir::new("live").unwrap();
×
374
    let temp_dir = temp_dir.path().to_str().unwrap();
×
375
    dozer.config.home_dir = temp_dir.to_string();
×
376
    dozer.config.cache_dir = AsRef::<Utf8Path>::as_ref(temp_dir).join("cache").into();
×
377

×
378
    Ok(dozer)
×
379
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc