• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 5978430793

25 Aug 2023 04:54PM UTC coverage: 75.575% (-0.7%) from 76.279%
5978430793

push

github

web-flow
Bump ordered-float from 3.4.0 to 3.9.1 (#1919)

Bumps [ordered-float](https://github.com/reem/rust-ordered-float) from 3.4.0 to 3.9.1.
- [Release notes](https://github.com/reem/rust-ordered-float/releases)
- [Commits](https://github.com/reem/rust-ordered-float/compare/v3.4.0...v3.9.1)

---
updated-dependencies:
- dependency-name: ordered-float
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>

47272 of 62550 relevant lines covered (75.57%)

49425.38 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

86.92
/dozer-cli/src/simple/executor.rs
1
use dozer_api::grpc::internal::internal_pipeline_server::LogEndpoint;
2
use dozer_cache::dozer_log::camino::Utf8Path;
3
use dozer_cache::dozer_log::home_dir::{BuildPath, HomeDir};
4
use dozer_cache::dozer_log::replication::Log;
5
use dozer_core::checkpoint::{CheckpointFactory, CheckpointFactoryOptions};
6
use dozer_core::processor_record::ProcessorRecordStore;
7
use dozer_types::models::api_endpoint::ApiEndpoint;
8
use dozer_types::models::flags::Flags;
9
use dozer_types::parking_lot::Mutex;
10
use tokio::runtime::Runtime;
11

12
use std::sync::{atomic::AtomicBool, Arc};
13

14
use dozer_types::models::source::Source;
15

16
use crate::pipeline::PipelineBuilder;
17
use crate::shutdown::ShutdownReceiver;
18
use dozer_core::executor::{DagExecutor, ExecutorOptions};
19

20
use dozer_types::indicatif::MultiProgress;
21

22
use dozer_types::models::connection::Connection;
23

24
use crate::errors::OrchestrationError;
25

26
pub struct Executor<'a> {
27
    connections: &'a [Connection],
28
    sources: &'a [Source],
29
    sql: Option<&'a str>,
30
    checkpoint_factory: Arc<CheckpointFactory>,
31
    /// `ApiEndpoint` and its log.
32
    endpoint_and_logs: Vec<(ApiEndpoint, LogEndpoint)>,
33
    multi_pb: MultiProgress,
34
}
35

36
impl<'a> Executor<'a> {
×
37
    pub async fn new(
30✔
38
        home_dir: &'a HomeDir,
30✔
39
        connections: &'a [Connection],
30✔
40
        sources: &'a [Source],
30✔
41
        sql: Option<&'a str>,
30✔
42
        api_endpoints: &'a [ApiEndpoint],
30✔
43
        checkpoint_factory_options: CheckpointFactoryOptions,
30✔
44
        multi_pb: MultiProgress,
30✔
45
    ) -> Result<Executor<'a>, OrchestrationError> {
30✔
46
        // Find the build path.
×
47
        let build_path = home_dir
30✔
48
            .find_latest_build_path()
30✔
49
            .map_err(|(path, error)| OrchestrationError::FileSystem(path.into(), error))?
30✔
50
            .ok_or(OrchestrationError::NoBuildFound)?;
30✔
51

52
        // Load pipeline checkpoint.
×
53
        let record_store = ProcessorRecordStore::new()?;
30✔
54
        let checkpoint_factory = CheckpointFactory::new(
30✔
55
            Arc::new(record_store),
30✔
56
            build_path.data_dir.to_string(),
30✔
57
            checkpoint_factory_options,
30✔
58
        )
30✔
59
        .await?
30✔
60
        .0;
61

×
62
        let mut endpoint_and_logs = vec![];
30✔
63
        for endpoint in api_endpoints {
60✔
64
            let log_endpoint =
30✔
65
                create_log_endpoint(&build_path, &endpoint.name, &checkpoint_factory).await?;
120✔
66
            endpoint_and_logs.push((endpoint.clone(), log_endpoint));
30✔
67
        }
68

×
69
        Ok(Executor {
30✔
70
            connections,
30✔
71
            sources,
30✔
72
            sql,
30✔
73
            checkpoint_factory: Arc::new(checkpoint_factory),
30✔
74
            endpoint_and_logs,
30✔
75
            multi_pb,
30✔
76
        })
30✔
77
    }
30✔
78

×
79
    pub fn endpoint_and_logs(&self) -> &[(ApiEndpoint, LogEndpoint)] {
30✔
80
        &self.endpoint_and_logs
30✔
81
    }
30✔
82

×
83
    pub async fn create_dag_executor(
30✔
84
        &self,
30✔
85
        runtime: &Arc<Runtime>,
30✔
86
        executor_options: ExecutorOptions,
30✔
87
        shutdown: ShutdownReceiver,
30✔
88
        flags: Flags,
30✔
89
    ) -> Result<DagExecutor, OrchestrationError> {
30✔
90
        let builder = PipelineBuilder::new(
30✔
91
            self.connections,
30✔
92
            self.sources,
30✔
93
            self.sql,
30✔
94
            self.endpoint_and_logs
30✔
95
                .iter()
30✔
96
                .map(|(endpoint, log)| (endpoint.clone(), Some(log.log.clone())))
30✔
97
                .collect(),
30✔
98
            self.multi_pb.clone(),
30✔
99
            flags,
30✔
100
        );
30✔
101

102
        let dag = builder.build(runtime, shutdown).await?;
60✔
103
        let exec = DagExecutor::new(dag, self.checkpoint_factory.clone(), executor_options)?;
30✔
104

105
        Ok(exec)
30✔
106
    }
30✔
107
}
×
108

×
109
pub fn run_dag_executor(
30✔
110
    dag_executor: DagExecutor,
30✔
111
    running: Arc<AtomicBool>,
30✔
112
) -> Result<(), OrchestrationError> {
30✔
113
    let join_handle = dag_executor.start(running)?;
30✔
114
    join_handle
30✔
115
        .join()
30✔
116
        .map_err(OrchestrationError::ExecutionError)
30✔
117
}
30✔
118

×
119
async fn create_log_endpoint(
30✔
120
    build_path: &BuildPath,
30✔
121
    endpoint_name: &str,
30✔
122
    checkpoint_factory: &CheckpointFactory,
30✔
123
) -> Result<LogEndpoint, OrchestrationError> {
30✔
124
    let endpoint_path = build_path.get_endpoint_path(endpoint_name);
30✔
125

×
126
    let schema_string = tokio::fs::read_to_string(&endpoint_path.schema_path)
30✔
127
        .await
30✔
128
        .map_err(|e| OrchestrationError::FileSystem(endpoint_path.schema_path.into(), e))?;
30✔
129

×
130
    let descriptor_bytes = tokio::fs::read(&build_path.descriptor_path)
30✔
131
        .await
30✔
132
        .map_err(|e| {
30✔
133
            OrchestrationError::FileSystem(build_path.descriptor_path.clone().into(), e)
×
134
        })?;
30✔
135

×
136
    let log_prefix = AsRef::<Utf8Path>::as_ref(checkpoint_factory.prefix())
30✔
137
        .join(&endpoint_path.log_dir_relative_to_data_dir);
30✔
138
    let log = Log::new(checkpoint_factory.storage(), log_prefix.into(), false).await?;
60✔
139
    let log = Arc::new(Mutex::new(log));
30✔
140

30✔
141
    Ok(LogEndpoint {
30✔
142
        build_id: build_path.id.clone(),
30✔
143
        schema_string,
30✔
144
        descriptor_bytes,
30✔
145
        log,
30✔
146
    })
30✔
147
}
30✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc