• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 5954012408

23 Aug 2023 04:32PM UTC coverage: 75.86% (-0.2%) from 76.088%
5954012408

push

github

web-flow
chore: Move ContractService implementation to Contract (#1899)

* chore: Split `build.rs` to several files

* chore: Remove `serde` from `dozer-cli/Cargo.toml`

* chore: Move `ContractService` implementation to `Contract`

461 of 461 new or added lines in 8 files covered. (100.0%)

46996 of 61951 relevant lines covered (75.86%)

73804.64 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/dozer-cli/src/live/state.rs
1
use std::{sync::Arc, thread::JoinHandle};
2

3
use clap::Parser;
4
use dozer_core::{app::AppPipeline, dag_schemas::DagSchemas, Dag};
5
use dozer_sql::pipeline::builder::{statement_to_pipeline, SchemaSQLContext};
6
use dozer_types::{
7
    grpc_types::{
8
        contract::{DotResponse, SchemasResponse},
9
        live::{ConnectResponse, LiveApp, LiveResponse, RunRequest},
10
    },
11
    indicatif::MultiProgress,
12
    log::info,
13
    models::{
14
        api_config::{ApiConfig, AppGrpcOptions},
15
        api_endpoint::ApiEndpoint,
16
        telemetry::{TelemetryConfig, TelemetryMetricsConfig},
17
    },
18
};
19
use tokio::{runtime::Runtime, sync::RwLock};
20

21
use crate::{
22
    cli::{init_dozer, types::Cli},
23
    errors::OrchestrationError,
24
    pipeline::PipelineBuilder,
25
    shutdown::{self, ShutdownReceiver, ShutdownSender},
26
    simple::{helper::validate_config, Contract, SimpleOrchestrator},
27
};
28

29
use super::{progress::progress_stream, LiveError};
30

31
/// An orchestrator together with the contract lazily derived from its configuration.
///
/// `contract` starts out as `None` whenever a new orchestrator is installed and is filled in
/// on demand by `LiveState::create_contract_if_missing`.
struct DozerAndContract {
32
    /// The orchestrator built from the current configuration.
    dozer: SimpleOrchestrator,
33
    /// Cached contract for `dozer`; `None` until it has been built.
    contract: Option<Contract>,
34
}
35

36
/// Shared state of the live server. Every field sits behind its own async `RwLock`.
pub struct LiveState {
37
    /// Current orchestrator and its cached contract; `None` until one has been set or built.
    dozer: RwLock<Option<DozerAndContract>>,
38
    /// Shutdown handle of the pipeline started by `run`; `Some` is reported as "running".
    run_thread: RwLock<Option<ShutdownSender>>,
39
    /// Last error message to report in `LiveResponse`, if any.
    error_message: RwLock<Option<String>>,
40
    /// Broadcast channel used to push `ConnectResponse` messages to subscribers.
    sender: RwLock<Option<tokio::sync::broadcast::Sender<ConnectResponse>>>,
41
}
42

43
impl LiveState {
44
    pub fn new() -> Self {
×
45
        Self {
×
46
            dozer: RwLock::new(None),
×
47
            run_thread: RwLock::new(None),
×
48
            sender: RwLock::new(None),
×
49
            error_message: RwLock::new(None),
×
50
        }
×
51
    }
×
52

53
    async fn create_contract_if_missing(&self) -> Result<(), LiveError> {
×
54
        let mut dozer_and_contract_lock = self.dozer.write().await;
×
55
        if let Some(dozer_and_contract) = dozer_and_contract_lock.as_mut() {
×
56
            if dozer_and_contract.contract.is_none() {
×
57
                let dag = create_dag(&dozer_and_contract.dozer).await?;
×
58
                let schemas = DagSchemas::new(dag)?;
×
59
                let contract = Contract::new(
×
60
                    &schemas,
×
61
                    &dozer_and_contract.dozer.config.endpoints,
×
62
                    // We don't care about API generation options here. They are handled in `run_all`.
×
63
                    false,
×
64
                    true,
×
65
                )?;
×
66
                dozer_and_contract.contract = Some(contract);
×
67
            }
×
68
        }
×
69
        Ok(())
×
70
    }
×
71

×
72
    pub async fn set_sender(&self, sender: tokio::sync::broadcast::Sender<ConnectResponse>) {
×
73
        *self.sender.write().await = Some(sender);
×
74
    }
×
75

×
76
    pub async fn broadcast(&self) {
×
77
        let sender = self.sender.read().await;
×
78
        info!("broadcasting current state");
×
79
        if let Some(sender) = sender.as_ref() {
×
80
            let res = self.get_current().await;
×
81
            // Ignore broadcast error.
×
82
            let _ = sender.send(ConnectResponse {
×
83
                live: Some(res),
×
84
                progress: None,
×
85
            });
×
86
        }
×
87
    }
×
88

89
    pub async fn set_dozer(&self, dozer: Option<SimpleOrchestrator>) {
×
90
        *self.dozer.write().await = dozer.map(|dozer| DozerAndContract {
×
91
            dozer,
×
92
            contract: None,
×
93
        });
×
94
    }
×
95

96
    pub async fn set_error_message(&self, error_message: Option<String>) {
×
97
        *self.error_message.write().await = error_message;
×
98
    }
×
99

×
100
    pub async fn build(&self, runtime: Arc<Runtime>) -> Result<(), LiveError> {
×
101
        // Taking lock to ensure that we don't have multiple builds running at the same time
×
102
        let mut lock = self.dozer.write().await;
×
103

×
104
        let cli = Cli::parse();
×
105

×
106
        let dozer = init_dozer(
×
107
            runtime,
×
108
            cli.config_paths.clone(),
×
109
            cli.config_token.clone(),
×
110
            cli.config_overrides.clone(),
×
111
            cli.ignore_pipe,
×
112
            false,
×
113
        )
×
114
        .await?;
×
115

×
116
        *lock = Some(DozerAndContract {
×
117
            dozer,
×
118
            contract: None,
×
119
        });
×
120
        Ok(())
×
121
    }
×
122
    pub async fn get_current(&self) -> LiveResponse {
×
123
        let dozer = self.dozer.read().await;
×
124
        let app = dozer.as_ref().map(|dozer| {
×
125
            let connections = dozer
×
126
                .dozer
×
127
                .config
×
128
                .connections
×
129
                .iter()
×
130
                .map(|c| c.name.clone())
×
131
                .collect();
×
132
            let endpoints = dozer
×
133
                .dozer
×
134
                .config
×
135
                .endpoints
×
136
                .iter()
×
137
                .map(|c| c.name.clone())
×
138
                .collect();
×
139
            LiveApp {
×
140
                app_name: dozer.dozer.config.app_name.clone(),
×
141
                connections,
×
142
                endpoints,
×
143
            }
×
144
        });
×
145

×
146
        LiveResponse {
×
147
            initialized: app.is_some(),
×
148
            running: self.run_thread.read().await.is_some(),
×
149
            error_message: self.error_message.read().await.as_ref().cloned(),
×
150
            app,
×
151
        }
×
152
    }
×
153

×
154
    pub async fn get_endpoints_schemas(&self) -> Result<SchemasResponse, LiveError> {
×
155
        self.create_contract_if_missing().await?;
×
156
        let dozer = self.dozer.read().await;
×
157
        let contract = get_contract(&dozer)?;
×
158

×
159
        Ok(SchemasResponse {
×
160
            schemas: contract.get_endpoints_schemas(),
×
161
        })
×
162
    }
×
163
    pub async fn get_source_schemas(
×
164
        &self,
×
165
        connection_name: String,
×
166
    ) -> Result<SchemasResponse, LiveError> {
×
167
        self.create_contract_if_missing().await?;
×
168
        let dozer = self.dozer.read().await;
×
169
        let contract = get_contract(&dozer)?;
×
170

×
171
        contract
×
172
            .get_source_schemas(&connection_name)
×
173
            .ok_or(LiveError::ConnectionNotFound(connection_name))
×
174
            .map(|schemas| SchemasResponse { schemas })
×
175
    }
×
176

×
177
    pub async fn get_graph_schemas(&self) -> Result<SchemasResponse, LiveError> {
×
178
        self.create_contract_if_missing().await?;
×
179
        let dozer = self.dozer.read().await;
×
180
        let contract = get_contract(&dozer)?;
×
181

×
182
        Ok(SchemasResponse {
×
183
            schemas: contract.get_graph_schemas(),
×
184
        })
×
185
    }
×
186

×
187
    pub async fn generate_dot(&self) -> Result<DotResponse, LiveError> {
×
188
        self.create_contract_if_missing().await?;
×
189
        let dozer = self.dozer.read().await;
×
190
        let contract = get_contract(&dozer)?;
×
191

×
192
        Ok(DotResponse {
×
193
            dot: contract.generate_dot(),
×
194
        })
×
195
    }
×
196

×
197
    pub async fn run(&self, request: RunRequest) -> Result<(), LiveError> {
×
198
        let dozer = self.dozer.read().await;
×
199
        let dozer = &dozer.as_ref().ok_or(LiveError::NotInitialized)?.dozer;
×
200

201
        // kill if a handle already exists
×
202
        self.stop().await?;
×
203

204
        let (shutdown_sender, shutdown_receiver) = shutdown::new(&dozer.runtime);
×
205
        let metrics_shutdown = shutdown_receiver.clone();
×
206
        let _handle = run(dozer.clone(), request, shutdown_receiver)?;
×
207

208
        // Initialize progress
209
        let metrics_sender = self.sender.read().await.as_ref().unwrap().clone();
×
210
        tokio::spawn(async {
×
211
            progress_stream(metrics_sender, metrics_shutdown)
×
212
                .await
×
213
                .unwrap()
×
214
        });
×
215

216
        let mut lock = self.run_thread.write().await;
×
217
        *lock = Some(shutdown_sender);
×
218

×
219
        Ok(())
×
220
    }
×
221

×
222
    pub async fn stop(&self) -> Result<(), LiveError> {
×
223
        let mut lock = self.run_thread.write().await;
×
224
        if let Some(shutdown) = lock.take() {
×
225
            shutdown.shutdown()
×
226
        }
×
227
        *lock = None;
×
228
        Ok(())
×
229
    }
×
230
}
×
231

×
232
fn get_contract(dozer_and_contract: &Option<DozerAndContract>) -> Result<&Contract, LiveError> {
×
233
    dozer_and_contract
×
234
        .as_ref()
×
235
        .ok_or(LiveError::NotInitialized)?
×
236
        .contract
×
237
        .as_ref()
×
238
        .ok_or(LiveError::NotInitialized)
×
239
}
×
240

×
241
async fn create_dag(
×
242
    dozer: &SimpleOrchestrator,
×
243
) -> Result<Dag<SchemaSQLContext>, OrchestrationError> {
×
244
    let endpoint_and_logs = dozer
×
245
        .config
×
246
        .endpoints
×
247
        .iter()
×
248
        // We're not really going to run the pipeline, so we don't create logs.
×
249
        .map(|endpoint| (endpoint.clone(), None))
×
250
        .collect();
×
251
    let builder = PipelineBuilder::new(
×
252
        &dozer.config.connections,
×
253
        &dozer.config.sources,
×
254
        dozer.config.sql.as_deref(),
×
255
        endpoint_and_logs,
×
256
        MultiProgress::new(),
×
257
    );
×
258
    let (_shutdown_sender, shutdown_receiver) = shutdown::new(&dozer.runtime);
×
259
    builder.build(&dozer.runtime, shutdown_receiver).await
×
260
}
×
261

×
262
fn run(
×
263
    dozer: SimpleOrchestrator,
×
264
    request: RunRequest,
×
265
    shutdown_receiver: ShutdownReceiver,
×
266
) -> Result<JoinHandle<()>, OrchestrationError> {
×
267
    let mut dozer = get_dozer_run_instance(dozer, request)?;
×
268

×
269
    validate_config(&dozer.config)?;
×
270

×
271
    let runtime = dozer.runtime.clone();
×
272
    let run_thread = std::thread::spawn(move || {
×
273
        dozer.build(true, shutdown_receiver.clone()).unwrap();
×
274
        dozer.run_all(shutdown_receiver)
×
275
    });
×
276

×
277
    let handle = std::thread::spawn(move || {
×
278
        runtime.block_on(async {
×
279
            run_thread.join().unwrap().unwrap();
×
280
        });
×
281
    });
×
282

×
283
    Ok(handle)
×
284
}
×
285

×
286
fn get_dozer_run_instance(
×
287
    mut dozer: SimpleOrchestrator,
×
288
    req: RunRequest,
×
289
) -> Result<SimpleOrchestrator, LiveError> {
×
290
    match req.request {
×
291
        Some(dozer_types::grpc_types::live::run_request::Request::Sql(req)) => {
×
292
            let context = statement_to_pipeline(&req.sql, &mut AppPipeline::new(), None)
×
293
                .map_err(LiveError::PipelineError)?;
×
294

×
295
            //overwrite sql
×
296
            dozer.config.sql = Some(req.sql);
×
297

×
298
            dozer.config.endpoints = vec![];
×
299
            let endpoints = context.output_tables_map.keys().collect::<Vec<_>>();
×
300
            for endpoint in endpoints {
×
301
                let endpoint = ApiEndpoint {
×
302
                    name: endpoint.to_string(),
×
303
                    table_name: endpoint.to_string(),
×
304
                    path: format!("/{}", endpoint),
×
305
                    ..Default::default()
×
306
                };
×
307
                dozer.config.endpoints.push(endpoint);
×
308
            }
×
309
        }
×
310
        Some(dozer_types::grpc_types::live::run_request::Request::Source(req)) => {
×
311
            dozer.config.sql = None;
×
312
            dozer.config.endpoints = vec![];
×
313
            let endpoint = req.source;
×
314
            dozer.config.endpoints.push(ApiEndpoint {
×
315
                name: endpoint.to_string(),
×
316
                table_name: endpoint.to_string(),
×
317
                path: format!("/{}", endpoint),
×
318
                ..Default::default()
×
319
            });
×
320
        }
×
321
        None => {}
×
322
    };
323

×
324
    dozer.config.api = Some(ApiConfig {
×
325
        app_grpc: Some(AppGrpcOptions {
×
326
            port: 5678,
×
327
            host: "0.0.0.0".to_string(),
×
328
        }),
×
329

×
330
        ..Default::default()
×
331
    });
×
332

×
333
    dozer.config.home_dir = tempdir::TempDir::new("live")
×
334
        .unwrap()
×
335
        .into_path()
×
336
        .to_string_lossy()
×
337
        .to_string();
×
338

×
339
    dozer.config.telemetry = Some(TelemetryConfig {
×
340
        trace: None,
×
341
        metrics: Some(TelemetryMetricsConfig::Prometheus(())),
×
342
    });
×
343

×
344
    Ok(dozer)
×
345
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc