• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 5923086724

21 Aug 2023 07:05AM UTC coverage: 74.763% (-1.2%) from 75.988%
5923086724

push

github

web-flow
chore: Remove short form of `enable_progress` because it's conflicting with `dozer cloud` (#1876)

46105 of 61668 relevant lines covered (74.76%)

39792.37 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

75.24
/dozer-cli/src/simple/executor.rs
1
use dozer_api::grpc::internal::internal_pipeline_server::LogEndpoint;
2
use dozer_cache::dozer_log::home_dir::{BuildPath, HomeDir};
3
use dozer_cache::dozer_log::replication::{Log, LogOptions};
4
use dozer_types::models::api_endpoint::ApiEndpoint;
5
use tokio::runtime::Runtime;
6
use tokio::sync::Mutex;
7

8
use std::sync::Arc;
9

10
use dozer_types::models::source::Source;
11

12
use crate::pipeline::PipelineBuilder;
13
use crate::shutdown::ShutdownReceiver;
14
use dozer_core::executor::{DagExecutor, ExecutorOptions};
15

16
use dozer_types::indicatif::MultiProgress;
17

18
use dozer_types::models::connection::Connection;
19
use OrchestrationError::ExecutionError;
20

21
use crate::errors::OrchestrationError;
22

23
pub struct Executor<'a> {
24
    connections: &'a [Connection],
25
    sources: &'a [Source],
26
    sql: Option<&'a str>,
27
    /// `ApiEndpoint` and its log.
28
    endpoint_and_logs: Vec<(ApiEndpoint, LogEndpoint)>,
29
    multi_pb: MultiProgress,
30
}
31

32
impl<'a> Executor<'a> {
33
    pub async fn new(
6✔
34
        home_dir: &'a HomeDir,
6✔
35
        connections: &'a [Connection],
6✔
36
        sources: &'a [Source],
6✔
37
        sql: Option<&'a str>,
6✔
38
        api_endpoints: &'a [ApiEndpoint],
6✔
39
        log_options: LogOptions,
6✔
40
        multi_pb: MultiProgress,
6✔
41
    ) -> Result<Executor<'a>, OrchestrationError> {
6✔
42
        let build_path = home_dir
6✔
43
            .find_latest_build_path()
6✔
44
            .map_err(|(path, error)| OrchestrationError::FileSystem(path.into(), error))?
6✔
45
            .ok_or(OrchestrationError::NoBuildFound)?;
6✔
46
        let mut endpoint_and_logs = vec![];
6✔
47
        for endpoint in api_endpoints {
12✔
48
            let log_endpoint =
6✔
49
                create_log_endpoint(&build_path, &endpoint.name, log_options.clone()).await?;
30✔
50
            endpoint_and_logs.push((endpoint.clone(), log_endpoint));
6✔
51
        }
52

×
53
        Ok(Executor {
6✔
54
            connections,
6✔
55
            sources,
6✔
56
            sql,
6✔
57
            endpoint_and_logs,
6✔
58
            multi_pb,
6✔
59
        })
6✔
60
    }
6✔
61

×
62
    pub fn endpoint_and_logs(&self) -> &[(ApiEndpoint, LogEndpoint)] {
6✔
63
        &self.endpoint_and_logs
6✔
64
    }
6✔
65

×
66
    pub async fn create_dag_executor(
6✔
67
        &self,
6✔
68
        runtime: &Arc<Runtime>,
6✔
69
        executor_options: ExecutorOptions,
6✔
70
        shutdown: ShutdownReceiver,
6✔
71
    ) -> Result<DagExecutor, OrchestrationError> {
6✔
72
        let builder = PipelineBuilder::new(
6✔
73
            self.connections,
6✔
74
            self.sources,
6✔
75
            self.sql,
6✔
76
            self.endpoint_and_logs
6✔
77
                .iter()
6✔
78
                .map(|(endpoint, log)| (endpoint.clone(), Some(log.log.clone())))
6✔
79
                .collect(),
6✔
80
            self.multi_pb.clone(),
6✔
81
        );
6✔
82

×
83
        let dag = builder.build(runtime, shutdown).await?;
24✔
84
        let exec = DagExecutor::new(dag, executor_options)?;
6✔
85

×
86
        Ok(exec)
6✔
87
    }
6✔
88
}
×
89

×
90
pub fn run_dag_executor(
6✔
91
    dag_executor: DagExecutor,
6✔
92
    shutdown: ShutdownReceiver,
6✔
93
) -> Result<(), OrchestrationError> {
6✔
94
    let join_handle = dag_executor.start(shutdown.get_running_flag())?;
6✔
95
    join_handle.join().map_err(ExecutionError)
6✔
96
}
6✔
97

98
async fn create_log_endpoint(
6✔
99
    build_path: &BuildPath,
6✔
100
    endpoint_name: &str,
6✔
101
    log_options: LogOptions,
6✔
102
) -> Result<LogEndpoint, OrchestrationError> {
6✔
103
    let endpoint_path = build_path.get_endpoint_path(endpoint_name);
6✔
104

105
    let schema_string = tokio::fs::read_to_string(&endpoint_path.schema_path)
6✔
106
        .await
6✔
107
        .map_err(|e| OrchestrationError::FileSystem(endpoint_path.schema_path.into(), e))?;
6✔
108

×
109
    let descriptor_bytes = tokio::fs::read(&build_path.descriptor_path)
6✔
110
        .await
6✔
111
        .map_err(|e| {
6✔
112
            OrchestrationError::FileSystem(build_path.descriptor_path.clone().into(), e)
×
113
        })?;
6✔
114

115
    let log = Log::new(log_options, endpoint_path.log_dir.into_string(), false).await?;
18✔
116
    let log = Arc::new(Mutex::new(log));
6✔
117

6✔
118
    Ok(LogEndpoint {
6✔
119
        build_id: build_path.id.clone(),
6✔
120
        schema_string,
6✔
121
        descriptor_bytes,
6✔
122
        log,
6✔
123
    })
6✔
124
}
6✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc