• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 4112409903

pending completion
4112409903

Pull #818

github

GitHub
Merge 0e6d61bff into c160ec41f
Pull Request #818: chore: fix dag issues

212 of 212 new or added lines in 23 files covered. (100.0%)

23352 of 37718 relevant lines covered (61.91%)

31647.45 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/dozer-orchestrator/src/simple/executor.rs
1
use dozer_api::grpc::internal_grpc::PipelineResponse;
2
use dozer_core::dag::app::{App, AppPipeline};
3
use dozer_sql::pipeline::builder::{statement_to_pipeline, SchemaSQLContext};
4
use dozer_types::models::app_config::Config;
5
use dozer_types::types::{Operation, SchemaWithChangesType};
6
use std::collections::HashMap;
7
use std::path::PathBuf;
8
use std::sync::atomic::AtomicBool;
9
use std::sync::Arc;
10

11
use dozer_api::CacheEndpoint;
12
use dozer_types::models::source::Source;
13

14
use crate::pipeline::validate::validate;
15
use crate::pipeline::{CacheSinkSettings, PipelineBuilder, StreamingSinkFactory};
16
use dozer_core::dag::executor::{DagExecutor, ExecutorOptions};
17
use dozer_core::dag::DEFAULT_PORT_HANDLE;
18
use dozer_ingestion::connectors::get_connector;
19

20
use dozer_types::crossbeam;
21

22
use dozer_types::models::connection::Connection;
23
use OrchestrationError::ExecutionError;
24

25
use crate::errors::OrchestrationError;
26
use crate::pipeline::source_builder::{IngestorVec, SourceBuilder};
27

28
pub struct Executor {
29
    config: Config,
30
    cache_endpoints: Vec<CacheEndpoint>,
31
    pipeline_dir: PathBuf,
32
    running: Arc<AtomicBool>,
33
}
34
impl Executor {
35
    pub fn new(
×
36
        config: Config,
×
37
        cache_endpoints: Vec<CacheEndpoint>,
×
38
        running: Arc<AtomicBool>,
×
39
        pipeline_dir: PathBuf,
×
40
    ) -> Self {
×
41
        Self {
×
42
            config,
×
43
            cache_endpoints,
×
44
            pipeline_dir,
×
45
            running,
×
46
        }
×
47
    }
×
48

×
49
    pub fn get_connection_groups(&self) -> HashMap<String, Vec<Source>> {
×
50
        SourceBuilder::group_connections(self.config.sources.clone())
×
51
    }
×
52

×
53
    // This function is used to run a query using a temporary pipeline
×
54
    pub fn query(
×
55
        &self,
×
56
        sql: String,
×
57
        sender: crossbeam::channel::Sender<Operation>,
×
58
    ) -> Result<dozer_core::dag::Dag<SchemaSQLContext>, OrchestrationError> {
×
59
        let grouped_connections = self.get_connection_groups();
×
60

×
61
        let mut pipeline = AppPipeline::new();
×
62
        let transform_response = statement_to_pipeline(&sql, &mut pipeline, None)
×
63
            .map_err(OrchestrationError::PipelineError)?;
×
64
        pipeline.add_sink(
×
65
            Arc::new(StreamingSinkFactory::new(sender)),
×
66
            "streaming_sink",
×
67
        );
×
68

×
69
        let table_info = transform_response
×
70
            .output_tables_map
×
71
            .values()
×
72
            .next()
×
73
            .unwrap();
×
74
        pipeline
×
75
            .connect_nodes(
×
76
                &table_info.node,
×
77
                Some(table_info.port),
×
78
                "streaming_sink",
×
79
                Some(DEFAULT_PORT_HANDLE),
×
80
                true,
×
81
            )
×
82
            .map_err(OrchestrationError::ExecutionError)?;
×
83

×
84
        let used_sources: Vec<String> = pipeline.get_entry_points_sources_names();
×
85

×
86
        let source_builder = SourceBuilder::new(used_sources, grouped_connections);
×
87
        let asm = source_builder.build_source_manager(self.running.clone())?.0;
×
88
        let mut app = App::new(asm);
×
89
        app.add_pipeline(pipeline);
×
90

×
91
        let dag = app.get_dag().map_err(OrchestrationError::ExecutionError)?;
×
92
        let path = &self.pipeline_dir;
×
93
        let mut exec = DagExecutor::new(
×
94
            dag.clone(),
×
95
            path.as_path(),
×
96
            ExecutorOptions::default(),
×
97
            self.running.clone(),
×
98
        )?;
×
99

×
100
        exec.start()?;
×
101
        Ok(dag)
×
102
    }
×
103

×
104
    pub fn get_tables(
×
105
        connections: &Vec<Connection>,
×
106
    ) -> Result<HashMap<String, Vec<SchemaWithChangesType>>, OrchestrationError> {
×
107
        let mut schema_map = HashMap::new();
×
108
        for connection in connections {
×
109
            validate(connection.to_owned(), None)?;
×
110

×
111
            let connector = get_connector(connection.to_owned())?;
×
112
            let schema_tuples = connector.get_schemas(None)?;
×
113
            schema_map.insert(connection.name.to_owned(), schema_tuples);
×
114
        }
×
115

×
116
        Ok(schema_map)
×
117
    }
×
118

×
119
    pub fn create_dag_executor(
×
120
        &self,
×
121
        notifier: Option<crossbeam::channel::Sender<PipelineResponse>>,
×
122
        settings: CacheSinkSettings,
×
123
    ) -> Result<(DagExecutor<SchemaSQLContext>, IngestorVec), OrchestrationError> {
×
124
        let running_wait = self.running.clone();
×
125

×
126
        let builder = PipelineBuilder::new(
×
127
            self.config.clone(),
×
128
            self.cache_endpoints.clone(),
×
129
            self.running.clone(),
×
130
            self.pipeline_dir.clone(),
×
131
        );
×
132

×
133
        let (parent_dag, ingestors) = builder.build(notifier, PathBuf::default(), settings)?;
×
134
        let path = &self.pipeline_dir;
×
135

×
136
        if !path.exists() {
×
137
            return Err(OrchestrationError::PipelineDirectoryNotFound(
×
138
                path.to_string_lossy().to_string(),
×
139
            ));
×
140
        }
×
141

×
142
        let exec = DagExecutor::new(
×
143
            parent_dag,
×
144
            path.as_path(),
×
145
            ExecutorOptions::default(),
×
146
            running_wait,
×
147
        )?;
×
148

×
149
        Ok((exec, ingestors))
×
150
    }
×
151

×
152
    pub fn run_dag_executor(
×
153
        mut dag_executor: DagExecutor<SchemaSQLContext>,
×
154
    ) -> Result<(), OrchestrationError> {
×
155
        dag_executor.start()?;
×
156
        dag_executor.join().map_err(ExecutionError)
×
157
    }
×
158
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc