getdozer / dozer · build 4382731818 · pending completion

Triggered by: push (GitHub)
Commit: fix: Fix publication slot creation (#1202)

60 of 60 new or added lines in 24 files covered (100.0%)
27881 of 39907 relevant lines covered (69.86%)
58951.75 hits per line

Source File

/dozer-orchestrator/src/simple/executor.rs · 40.82% covered
use dozer_api::grpc::internal::internal_pipeline_server::PipelineEventSenders;
use dozer_cache::cache::CacheManagerOptions;
use dozer_core::app::{App, AppPipeline};
use dozer_sql::pipeline::builder::{statement_to_pipeline, SchemaSQLContext};
use dozer_types::models::api_endpoint::ApiEndpoint;
use dozer_types::types::{Operation, SourceSchema};
use std::collections::HashMap;
use std::path::Path;
use std::sync::atomic::AtomicBool;
use std::sync::Arc;

use dozer_types::models::source::Source;

use crate::pipeline::validate::validate;
use crate::pipeline::{CacheSinkSettings, PipelineBuilder, StreamingSinkFactory};
use dozer_core::executor::{DagExecutor, ExecutorOptions};
use dozer_core::DEFAULT_PORT_HANDLE;
use dozer_ingestion::connectors::get_connector;

use dozer_types::crossbeam;

use dozer_types::models::connection::Connection;
use OrchestrationError::ExecutionError;

use crate::errors::OrchestrationError;
use crate::pipeline::source_builder::SourceBuilder;

pub struct Executor<'a> {
    sources: &'a [Source],
    sql: Option<&'a str>,
    api_endpoints: &'a [ApiEndpoint],
    pipeline_dir: &'a Path,
    running: Arc<AtomicBool>,
}

impl<'a> Executor<'a> {
    // Coverage: ✔ 8 hits per line
    pub fn new(
        sources: &'a [Source],
        sql: Option<&'a str>,
        api_endpoints: &'a [ApiEndpoint],
        pipeline_dir: &'a Path,
        running: Arc<AtomicBool>,
    ) -> Self {
        Self {
            sources,
            sql,
            api_endpoints,
            pipeline_dir,
            running,
        }
    }

    // This function is used to run a query using a temporary pipeline
    // Coverage: × uncovered
    pub fn query(
        &self,
        sql: String,
        sender: crossbeam::channel::Sender<Operation>,
    ) -> Result<dozer_core::Dag<SchemaSQLContext>, OrchestrationError> {
        let grouped_connections = SourceBuilder::group_connections(self.sources);

        let mut pipeline = AppPipeline::new();
        let transform_response = statement_to_pipeline(&sql, &mut pipeline, None)
            .map_err(OrchestrationError::PipelineError)?;
        pipeline.add_sink(
            Arc::new(StreamingSinkFactory::new(sender)),
            "streaming_sink",
        );

        let table_info = transform_response
            .output_tables_map
            .values()
            .next()
            .unwrap();
        pipeline
            .connect_nodes(
                &table_info.node,
                Some(table_info.port),
                "streaming_sink",
                Some(DEFAULT_PORT_HANDLE),
                true,
            )
            .map_err(OrchestrationError::ExecutionError)?;

        let used_sources = pipeline.get_entry_points_sources_names();

        let source_builder = SourceBuilder::new(&used_sources, grouped_connections, None);
        let asm = source_builder.build_source_manager()?;

        let mut app = App::new(asm);
        app.add_pipeline(pipeline);

        let dag = app.get_dag().map_err(OrchestrationError::ExecutionError)?;
        let exec = DagExecutor::new(
            dag.clone(),
            self.pipeline_dir.to_path_buf(),
            ExecutorOptions::default(),
        )?;

        exec.start(self.running.clone())?;
        Ok(dag)
    }

    // Coverage: × uncovered
    pub fn get_tables(
        connections: &Vec<Connection>,
    ) -> Result<HashMap<String, Vec<SourceSchema>>, OrchestrationError> {
        let mut schema_map = HashMap::new();
        for connection in connections {
            validate(connection.to_owned(), None)?;

            let connector = get_connector(connection.to_owned(), None)?;
            let schema_tuples = connector.get_schemas(None)?;
            schema_map.insert(connection.name.to_owned(), schema_tuples);
        }

        Ok(schema_map)
    }

    // Coverage: ✔ 8 hits per line, except the uncovered (×) error branch below
    pub fn create_dag_executor(
        &self,
        notifier: Option<PipelineEventSenders>,
        cache_manager_options: CacheManagerOptions,
        settings: CacheSinkSettings,
        executor_options: ExecutorOptions,
    ) -> Result<DagExecutor, OrchestrationError> {
        let builder = PipelineBuilder::new(
            self.sources,
            self.sql,
            self.api_endpoints,
            self.pipeline_dir,
        );

        let dag = builder.build(notifier, cache_manager_options, settings)?;
        let path = &self.pipeline_dir;

        if !path.exists() {
            // × uncovered
            return Err(OrchestrationError::PipelineDirectoryNotFound(
                path.to_string_lossy().to_string(),
            ));
        }

        let exec = DagExecutor::new(dag, path.to_path_buf(), executor_options)?;

        Ok(exec)
    }

    // Coverage: ✔ 8 hits per line
    pub fn run_dag_executor(&self, dag_executor: DagExecutor) -> Result<(), OrchestrationError> {
        let join_handle = dag_executor.start(self.running.clone())?;
        join_handle.join().map_err(ExecutionError)
    }
}
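
The three covered functions trace the flow the 8-hit lines exercise: construct an Executor over the configured sources and endpoints, build a DagExecutor from it, then block on the executor until it finishes. The sketch below is a minimal illustration of that flow based only on the signatures in the listing above; the run_pipeline helper is hypothetical, the module path for Executor is assumed from the file location, and the already-built CacheManagerOptions, CacheSinkSettings, and ExecutorOptions values are taken as parameters rather than constructed here.

use std::path::Path;
use std::sync::atomic::AtomicBool;
use std::sync::Arc;

use dozer_cache::cache::CacheManagerOptions;
use dozer_core::executor::ExecutorOptions;
use dozer_types::models::api_endpoint::ApiEndpoint;
use dozer_types::models::source::Source;

use crate::errors::OrchestrationError;
use crate::pipeline::CacheSinkSettings;
// Assumed module path, based on the file living at src/simple/executor.rs.
use crate::simple::executor::Executor;

// Hypothetical helper (not part of the file above) wiring the covered path:
// new -> create_dag_executor -> run_dag_executor.
fn run_pipeline(
    sources: &[Source],
    sql: Option<&str>,
    api_endpoints: &[ApiEndpoint],
    pipeline_dir: &Path,
    cache_manager_options: CacheManagerOptions,
    settings: CacheSinkSettings,
    executor_options: ExecutorOptions,
) -> Result<(), OrchestrationError> {
    // The flag handed to DagExecutor::start; presumably a clone of it can be
    // flipped from elsewhere (e.g. a shutdown handler) to stop the run.
    let running = Arc::new(AtomicBool::new(true));

    let executor = Executor::new(sources, sql, api_endpoints, pipeline_dir, running.clone());

    // Build the DAG without an event notifier, then block until it completes.
    let dag_executor =
        executor.create_dag_executor(None, cache_manager_options, settings, executor_options)?;
    executor.run_dag_executor(dag_executor)
}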