getdozer / dozer · build 6299724219 (push, via github)

25 Sep 2023 12:58PM UTC · coverage: 77.81% (+0.5% from 77.275%)
Commit by chubei: fix: Add `BINDGEN_EXTRA_CLANG_ARGS` to cross compile rocksdb

50223 of 64546 relevant lines covered (77.81%) · 148909.49 hits per line

Source file: /dozer-cli/src/simple/executor.rs · 98.18% of lines covered
Uncovered in this build: the error-mapping closure on the descriptor file read, and the error branch of `Log::new(...).await?` in `create_log_endpoint`.

use dozer_api::grpc::internal::internal_pipeline_server::LogEndpoint;
use dozer_cache::dozer_log::camino::Utf8Path;
use dozer_cache::dozer_log::home_dir::{BuildPath, HomeDir};
use dozer_cache::dozer_log::replication::Log;
use dozer_core::checkpoint::{CheckpointOptions, OptionCheckpoint};
use dozer_tracing::LabelsAndProgress;
use dozer_types::models::api_endpoint::ApiEndpoint;
use dozer_types::models::flags::Flags;
use tokio::runtime::Runtime;
use tokio::sync::Mutex;

use std::sync::{atomic::AtomicBool, Arc};

use dozer_types::models::source::Source;
use dozer_types::models::udf_config::UdfConfig;

use crate::pipeline::PipelineBuilder;
use crate::shutdown::ShutdownReceiver;
use dozer_core::executor::{DagExecutor, ExecutorOptions};

use dozer_types::models::connection::Connection;

use crate::errors::{BuildError, OrchestrationError};

use super::Contract;

pub struct Executor<'a> {
    connections: &'a [Connection],
    sources: &'a [Source],
    sql: Option<&'a str>,
    checkpoint: OptionCheckpoint,
    /// `ApiEndpoint` and its log.
    endpoint_and_logs: Vec<(ApiEndpoint, LogEndpoint)>,
    labels: LabelsAndProgress,
    udfs: &'a [UdfConfig],
}

impl<'a> Executor<'a> {
    // TODO: Refactor this to not require both `contract` and all of
    // connections, sources and sql
    #[allow(clippy::too_many_arguments)]
    pub async fn new(
        home_dir: &'a HomeDir,
        contract: &Contract,
        connections: &'a [Connection],
        sources: &'a [Source],
        sql: Option<&'a str>,
        api_endpoints: &'a [ApiEndpoint],
        checkpoint_options: CheckpointOptions,
        labels: LabelsAndProgress,
        udfs: &'a [UdfConfig],
    ) -> Result<Executor<'a>, OrchestrationError> {
        // Find the build path.
        let build_path = home_dir
            .find_latest_build_path()
            .map_err(|(path, error)| OrchestrationError::FileSystem(path.into(), error))?
            .ok_or(OrchestrationError::NoBuildFound)?;

        // Load pipeline checkpoint.
        let checkpoint =
            OptionCheckpoint::new(build_path.data_dir.to_string(), checkpoint_options).await?;

        let mut endpoint_and_logs = vec![];
        for endpoint in api_endpoints {
            let log_endpoint = create_log_endpoint(
                contract,
                &build_path,
                &endpoint.name,
                &checkpoint,
                checkpoint.num_slices(),
            )
            .await?;
            endpoint_and_logs.push((endpoint.clone(), log_endpoint));
        }

        Ok(Executor {
            connections,
            sources,
            sql,
            checkpoint,
            endpoint_and_logs,
            labels,
            udfs,
        })
    }

    pub fn endpoint_and_logs(&self) -> &[(ApiEndpoint, LogEndpoint)] {
        &self.endpoint_and_logs
    }

    pub async fn create_dag_executor(
        self,
        runtime: &Arc<Runtime>,
        executor_options: ExecutorOptions,
        shutdown: ShutdownReceiver,
        flags: Flags,
    ) -> Result<DagExecutor, OrchestrationError> {
        let builder = PipelineBuilder::new(
            self.connections,
            self.sources,
            self.sql,
            self.endpoint_and_logs
                .into_iter()
                .map(|(endpoint, log)| (endpoint, Some(log.log)))
                .collect(),
            self.labels.clone(),
            flags,
            self.udfs,
        );

        let dag = builder.build(runtime, shutdown).await?;
        let exec = DagExecutor::new(dag, self.checkpoint, executor_options).await?;

        Ok(exec)
    }
}

pub fn run_dag_executor(
    dag_executor: DagExecutor,
    running: Arc<AtomicBool>,
    labels: LabelsAndProgress,
) -> Result<(), OrchestrationError> {
    let join_handle = dag_executor.start(running, labels)?;
    join_handle
        .join()
        .map_err(OrchestrationError::ExecutionError)
}

async fn create_log_endpoint(
    contract: &Contract,
    build_path: &BuildPath,
    endpoint_name: &str,
    checkpoint: &OptionCheckpoint,
    num_persisted_entries_to_keep: usize,
) -> Result<LogEndpoint, OrchestrationError> {
    let endpoint_path = build_path.get_endpoint_path(endpoint_name);

    let schema = contract
        .endpoints
        .get(endpoint_name)
        .ok_or_else(|| BuildError::MissingEndpoint(endpoint_name.to_owned()))?;
    let schema_string =
        dozer_types::serde_json::to_string(schema).map_err(BuildError::SerdeJson)?;

    let descriptor_bytes = tokio::fs::read(&build_path.descriptor_path)
        .await
        .map_err(|e| {
            OrchestrationError::FileSystem(build_path.descriptor_path.clone().into(), e)
        })?;

    let log_prefix = AsRef::<Utf8Path>::as_ref(checkpoint.prefix())
        .join(&endpoint_path.log_dir_relative_to_data_dir);
    let log = Log::new(
        checkpoint.storage(),
        log_prefix.into(),
        num_persisted_entries_to_keep,
    )
    .await?;
    let log = Arc::new(Mutex::new(log));

    Ok(LogEndpoint {
        build_id: build_path.id.clone(),
        schema_string,
        descriptor_bytes,
        log,
    })
}
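
For orientation, the sketch below shows how the three public entry points in this file fit together: build an `Executor` against the latest build and its checkpoint, turn it into a `DagExecutor`, then run the DAG to completion. This is a hypothetical illustration, not code from dozer-cli: `run_pipeline`, its parameter list, and driving the async steps with `Runtime::block_on` from a synchronous caller are all assumptions. Only `Executor::new`, `Executor::create_dag_executor`, and `run_dag_executor` come from the file above, and the sketch relies on the same imports.

// Hypothetical wiring sketch; assumes the same imports as executor.rs above and
// that the caller already has the config slices, a Tokio `Runtime`, a
// `ShutdownReceiver`, and the shared `running` flag.
#[allow(clippy::too_many_arguments)]
fn run_pipeline(
    home_dir: &HomeDir,
    contract: &Contract,
    connections: &[Connection],
    sources: &[Source],
    sql: Option<&str>,
    api_endpoints: &[ApiEndpoint],
    udfs: &[UdfConfig],
    checkpoint_options: CheckpointOptions,
    executor_options: ExecutorOptions,
    flags: Flags,
    labels: LabelsAndProgress,
    runtime: Arc<Runtime>,
    shutdown: ShutdownReceiver,
    running: Arc<AtomicBool>,
) -> Result<(), OrchestrationError> {
    // Locate the latest build, load its checkpoint, and open one replication log
    // per API endpoint.
    let executor = runtime.block_on(Executor::new(
        home_dir,
        contract,
        connections,
        sources,
        sql,
        api_endpoints,
        checkpoint_options,
        labels.clone(),
        udfs,
    ))?;

    // Consume the executor to build the DAG executor for the pipeline.
    let dag_executor = runtime.block_on(executor.create_dag_executor(
        &runtime,
        executor_options,
        shutdown,
        flags,
    ))?;

    // Start the DAG and block the calling thread until its join handle returns.
    run_dag_executor(dag_executor, running, labels)
}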