
getdozer / dozer · build 4112409903 (pending completion)

Pull Request #818 (GitHub): chore: fix dag issues
Merge 0e6d61bff into c160ec41f

212 of 212 new or added lines in 23 files covered. (100.0%)

23352 of 37718 relevant lines covered (61.91%)

31647.45 hits per line
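
As a sanity check, the overall percentage is just covered lines divided by relevant lines: 23352 / 37718 ≈ 0.6191, which matches the 61.91% reported above.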

Source File: /dozer-orchestrator/src/pipeline/builder.rs (0.0% covered)
use std::collections::HashMap;
use std::sync::Arc;

use dozer_core::dag::app::AppPipeline;
use dozer_core::dag::executor::DagExecutor;
use dozer_core::dag::DEFAULT_PORT_HANDLE;
use dozer_sql::pipeline::builder::{QueryTableInfo, SchemaSQLContext};

use dozer_api::grpc::internal_grpc::PipelineResponse;
use dozer_core::dag::app::App;
use dozer_sql::pipeline::builder::statement_to_pipeline;
use dozer_types::indicatif::MultiProgress;
use dozer_types::models::app_config::Config;
use std::path::PathBuf;
use std::sync::atomic::AtomicBool;

use dozer_api::CacheEndpoint;

use crate::pipeline::{CacheSinkFactory, CacheSinkSettings};

use super::source_builder::{IngestorVec, SourceBuilder};
use super::validate::validate_grouped_connections;
use crate::errors::OrchestrationError;
use dozer_types::crossbeam;
use dozer_types::log::{error, info};
use OrchestrationError::ExecutionError;

pub enum OutputTableInfo {
    Transformed(QueryTableInfo),
    Original(OriginalTableInfo),
}

pub struct OriginalTableInfo {
    pub table_name: String,
    pub connection_name: String,
}

pub struct PipelineBuilder {
    config: Config,
    cache_endpoints: Vec<CacheEndpoint>,
    pipeline_dir: PathBuf,
    running: Arc<AtomicBool>,
    progress: MultiProgress,
}

impl PipelineBuilder {
    pub fn new(
        config: Config,
        cache_endpoints: Vec<CacheEndpoint>,
        running: Arc<AtomicBool>,
        pipeline_dir: PathBuf,
    ) -> Self {
        Self {
            config,
            cache_endpoints,
            pipeline_dir,
            running,
            progress: MultiProgress::new(),
        }
    }

    // This function is used by both migrate and actual execution.
    pub fn build(
        &self,
        notifier: Option<crossbeam::channel::Sender<PipelineResponse>>,
        api_dir: PathBuf,
        settings: CacheSinkSettings,
    ) -> Result<(dozer_core::dag::Dag<SchemaSQLContext>, IngestorVec), OrchestrationError> {
        let sources = self.config.sources.clone();

        let grouped_connections = SourceBuilder::group_connections(sources);

        validate_grouped_connections(&grouped_connections)?;

        let mut pipelines: Vec<AppPipeline<SchemaSQLContext>> = vec![];
        let mut used_sources = vec![];

        let mut pipeline = AppPipeline::new();

        let mut available_output_tables: HashMap<String, OutputTableInfo> = HashMap::new();

        // Add all source tables to available output tables.
        for (connection_name, sources) in grouped_connections.clone() {
            for source in sources {
                available_output_tables.insert(
                    source.name.clone(),
                    OutputTableInfo::Original(OriginalTableInfo {
                        connection_name: connection_name.clone(),
                        table_name: source.name.clone(),
                    }),
                );
            }
        }

        if let Some(sql) = self.config.sql.clone() {
            let query_context = statement_to_pipeline(&sql, &mut pipeline, None)
                .map_err(OrchestrationError::PipelineError)?;

            // Register every SQL output table, rejecting names that collide
            // with source tables.
            for (name, table_info) in query_context.output_tables_map {
                if available_output_tables.contains_key(name.as_str()) {
                    return Err(OrchestrationError::DuplicateTable(name));
                }
                available_output_tables
                    .insert(name.clone(), OutputTableInfo::Transformed(table_info));
            }

            // Every source table referenced by the SQL becomes an input.
            for name in query_context.used_sources {
                used_sources.push(name);
            }
        }

        // Add used sources for endpoints that read directly from a source table.
        for cache_endpoint in self.cache_endpoints.iter().cloned() {
            let api_endpoint = cache_endpoint.endpoint.clone();

            let table_name = api_endpoint.table_name.clone();

            let table_info = available_output_tables
                .get(&table_name)
                .ok_or_else(|| OrchestrationError::EndpointTableNotFound(table_name.clone()))?;

            if let OutputTableInfo::Original(table_info) = table_info {
                used_sources.push(table_info.table_name.clone());
            }
        }

        let source_builder = SourceBuilder::new(used_sources, grouped_connections);

        let conn_ports = source_builder.get_ports();

        let pipeline_ref = &mut pipeline;

        for cache_endpoint in self.cache_endpoints.iter().cloned() {
            let api_endpoint = cache_endpoint.endpoint.clone();

            let cache = cache_endpoint.cache;

            let table_name = api_endpoint.table_name.clone();

            let table_info = available_output_tables
                .get(&table_name)
                .ok_or_else(|| OrchestrationError::EndpointTableNotFound(table_name.clone()))?;

            let snk_factory = Arc::new(CacheSinkFactory::new(
                vec![DEFAULT_PORT_HANDLE],
                cache,
                api_endpoint,
                notifier.clone(),
                api_dir.clone(),
                self.progress.clone(),
                settings.to_owned(),
            ));

            match table_info {
                // Endpoint fed by a SQL transformation: connect the sink to
                // the transform's output node and port.
                OutputTableInfo::Transformed(table_info) => {
                    pipeline_ref.add_sink(snk_factory, cache_endpoint.endpoint.name.as_str());

                    pipeline_ref
                        .connect_nodes(
                            &table_info.node,
                            Some(table_info.port),
                            cache_endpoint.endpoint.name.as_str(),
                            Some(DEFAULT_PORT_HANDLE),
                            true,
                        )
                        .map_err(ExecutionError)?;
                }
                // Endpoint fed directly by a source: connect the sink to the
                // connection node on the port assigned by the source builder.
                OutputTableInfo::Original(table_info) => {
                    pipeline_ref.add_sink(snk_factory, cache_endpoint.endpoint.name.as_str());

                    let conn_port = conn_ports
                        .get(&(
                            table_info.connection_name.clone(),
                            table_info.table_name.clone(),
                        ))
                        .expect("port should be present based on source mapping");

                    pipeline_ref
                        .connect_nodes(
                            &table_info.connection_name,
                            Some(*conn_port),
                            cache_endpoint.endpoint.name.as_str(),
                            Some(DEFAULT_PORT_HANDLE),
                            false,
                        )
                        .map_err(ExecutionError)?;
                }
            }
        }

        pipelines.push(pipeline);

        let (asm, ingestors) = source_builder.build_source_manager(self.running.clone())?;
        let mut app = App::new(asm);

        for pipeline in pipelines {
            app.add_pipeline(pipeline);
        }

        let dag = app.get_dag().map_err(ExecutionError)?;

        DagExecutor::validate(&dag, &self.pipeline_dir)
            .map(|_| {
                info!("[pipeline] Validation completed");
            })
            .map_err(|e| {
                error!("[pipeline] Validation error: {}", e);
                OrchestrationError::PipelineValidationError
            })?;

        Ok((dag, ingestors))
    }
}
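
For orientation, here is a minimal sketch of how a caller might drive this builder. Only PipelineBuilder::new and build are taken from the file above; the Config, CacheEndpoint list, CacheSinkSettings value, and both directory paths are assumed to be supplied by the caller and are placeholders here.

fn build_dag(
    config: Config,
    cache_endpoints: Vec<CacheEndpoint>,
    settings: CacheSinkSettings,
) -> Result<(), OrchestrationError> {
    // Shared flag that other threads can flip to request shutdown.
    let running = Arc::new(AtomicBool::new(true));

    // Hypothetical directories, for illustration only.
    let pipeline_dir = PathBuf::from("./pipeline");
    let api_dir = PathBuf::from("./api");

    let builder = PipelineBuilder::new(config, cache_endpoints, running, pipeline_dir);

    // No gRPC notifier in this sketch; build() validates the DAG and
    // returns it together with the ingestors that feed it.
    let (_dag, _ingestors) = builder.build(None, api_dir, settings)?;

    Ok(())
}

The two OutputTableInfo variants determine the wiring: a Transformed table connects the sink to a SQL transform's output node and port, while an Original table connects it directly to the connection node on the port assigned by SourceBuilder.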