• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 3965135367

pending completion
3965135367

Pull #680

github

GitHub
Merge 1add77327 into 56c0cf2b3
Pull Request #680: feat: Implement nested queries and CTE.

506 of 506 new or added lines in 18 files covered. (100.0%)

21999 of 33062 relevant lines covered (66.54%)

50489.13 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

23.2
/dozer-orchestrator/src/pipeline/connector_source.rs
1
use dozer_core::dag::channels::SourceChannelForwarder;
2
use dozer_core::dag::errors::ExecutionError::ReplicationTypeNotFound;
3
use dozer_core::dag::errors::{ExecutionError, SourceError};
4
use dozer_core::dag::node::{OutputPortDef, OutputPortType, PortHandle, Source, SourceFactory};
5
use dozer_ingestion::connectors::{get_connector, TableInfo};
6
use dozer_ingestion::errors::ConnectorError;
7
use dozer_ingestion::ingestion::{IngestionIterator, Ingestor};
8
use dozer_types::ingestion_types::IngestionOperation;
9
use dozer_types::log::info;
10
use dozer_types::models::connection::Connection;
11
use dozer_types::parking_lot::RwLock;
12
use dozer_types::types::{Operation, ReplicationChangesTrackingType, Schema, SchemaIdentifier};
13
use std::collections::HashMap;
14
use std::sync::atomic::{AtomicBool, Ordering};
15
use std::sync::Arc;
16
use std::thread;
17

18
#[derive(Debug)]
×
19
pub struct ConnectorSourceFactory {
20
    pub ingestor: Arc<RwLock<Ingestor>>,
21
    pub iterator: Arc<RwLock<IngestionIterator>>,
22
    pub ports: HashMap<String, u16>,
23
    pub schema_port_map: HashMap<u32, u16>,
24
    pub schema_map: HashMap<u16, Schema>,
25
    pub replication_changes_type_map: HashMap<u16, ReplicationChangesTrackingType>,
26
    pub tables: Vec<TableInfo>,
27
    pub connection: Connection,
28
    pub running: Arc<AtomicBool>,
29
}
30

31
fn map_replication_type_to_output_port_type(
×
32
    typ: &ReplicationChangesTrackingType,
×
33
) -> OutputPortType {
×
34
    match typ {
×
35
        ReplicationChangesTrackingType::FullChanges => {
36
            OutputPortType::StatefulWithPrimaryKeyLookup {
×
37
                retr_old_records_for_deletes: false,
×
38
                retr_old_records_for_updates: false,
×
39
            }
×
40
        }
41
        ReplicationChangesTrackingType::OnlyPK => OutputPortType::StatefulWithPrimaryKeyLookup {
×
42
            retr_old_records_for_deletes: true,
×
43
            retr_old_records_for_updates: true,
×
44
        },
×
45
        ReplicationChangesTrackingType::Nothing => OutputPortType::AutogenRowKeyLookup,
×
46
    }
47
}
×
48

49
impl ConnectorSourceFactory {
50
    pub fn new(
4✔
51
        ingestor: Arc<RwLock<Ingestor>>,
4✔
52
        iterator: Arc<RwLock<IngestionIterator>>,
4✔
53
        ports: HashMap<String, u16>,
4✔
54
        tables: Vec<TableInfo>,
4✔
55
        connection: Connection,
4✔
56
        running: Arc<AtomicBool>,
4✔
57
    ) -> Self {
4✔
58
        let (schema_map, schema_port_map, replication_changes_type_map) =
4✔
59
            Self::get_schema_map(connection.clone(), tables.clone(), ports.clone());
4✔
60
        Self {
4✔
61
            ingestor,
4✔
62
            iterator,
4✔
63
            ports,
4✔
64
            schema_port_map,
4✔
65
            schema_map,
4✔
66
            replication_changes_type_map,
4✔
67
            tables,
4✔
68
            connection,
4✔
69
            running,
4✔
70
        }
4✔
71
    }
4✔
72

73
    fn get_schema_map(
4✔
74
        connection: Connection,
4✔
75
        tables: Vec<TableInfo>,
4✔
76
        ports: HashMap<String, u16>,
4✔
77
    ) -> (
4✔
78
        HashMap<u16, Schema>,
4✔
79
        HashMap<u32, u16>,
4✔
80
        HashMap<u16, ReplicationChangesTrackingType>,
4✔
81
    ) {
4✔
82
        let mut tables_map = HashMap::new();
4✔
83
        for t in &tables {
9✔
84
            tables_map.insert(t.table_name.clone(), t.name.clone());
5✔
85
        }
5✔
86

×
87
        let connector = get_connector(connection).unwrap();
4✔
88
        let schema_tuples = connector.get_schemas(Some(tables)).unwrap();
4✔
89

4✔
90
        let mut schema_map = HashMap::new();
4✔
91
        let mut schema_port_map: HashMap<u32, u16> = HashMap::new();
4✔
92
        let mut replication_changes_type_map: HashMap<u16, ReplicationChangesTrackingType> =
4✔
93
            HashMap::new();
4✔
94

×
95
        for (table_name, schema, replication_changes_type) in schema_tuples {
4✔
96
            let source_name = tables_map.get(&table_name).unwrap();
×
97
            let port: u16 = *ports
×
98
                .get(source_name)
×
99
                .map_or(Err(ExecutionError::PortNotFound(table_name.clone())), Ok)
×
100
                .unwrap();
×
101
            let schema_id = get_schema_id(schema.identifier.as_ref()).unwrap();
×
102

×
103
            schema_port_map.insert(schema_id, port);
×
104
            schema_map.insert(port, schema);
×
105
            replication_changes_type_map.insert(port, replication_changes_type);
×
106
        }
×
107

×
108
        (schema_map, schema_port_map, replication_changes_type_map)
4✔
109
    }
4✔
110
}
×
111

×
112
impl SourceFactory for ConnectorSourceFactory {
×
113
    fn get_output_schema(&self, port: &PortHandle) -> Result<Schema, ExecutionError> {
×
114
        self.schema_map.get(port).map_or(
×
115
            Err(ExecutionError::PortNotFoundInSource(*port)),
×
116
            |schema_name| Ok(schema_name.clone()),
×
117
        )
×
118
    }
×
119

×
120
    fn get_output_ports(&self) -> Result<Vec<OutputPortDef>, ExecutionError> {
×
121
        self.ports
×
122
            .values()
×
123
            .map(|e| {
×
124
                self.replication_changes_type_map.get(e).map_or(
×
125
                    Err(ReplicationTypeNotFound),
×
126
                    |typ| {
×
127
                        Ok(OutputPortDef::new(
×
128
                            *e,
×
129
                            map_replication_type_to_output_port_type(typ),
×
130
                        ))
×
131
                    },
×
132
                )
×
133
            })
×
134
            .collect()
×
135
    }
×
136

×
137
    fn prepare(&self, output_schemas: HashMap<PortHandle, Schema>) -> Result<(), ExecutionError> {
×
138
        use std::println as info;
×
139
        for (port, schema) in output_schemas {
×
140
            let (name, _) = self
×
141
                .ports
×
142
                .iter()
×
143
                .find(|(_, p)| **p == port)
×
144
                .map_or(Err(ExecutionError::PortNotFound(port.to_string())), Ok)?;
×
145
            info!("Source: Initializing input schema: {}", name);
×
146
            schema.print().printstd();
×
147
        }
×
148
        Ok(())
×
149
    }
×
150

×
151
    fn build(
×
152
        &self,
×
153
        _output_schemas: HashMap<PortHandle, Schema>,
×
154
    ) -> Result<Box<dyn Source>, ExecutionError> {
×
155
        Ok(Box::new(ConnectorSource {
×
156
            ingestor: self.ingestor.clone(),
×
157
            iterator: self.iterator.clone(),
×
158
            schema_port_map: self.schema_port_map.clone(),
×
159
            tables: self.tables.clone(),
×
160
            connection: self.connection.clone(),
×
161
            running: self.running.clone(),
×
162
        }))
×
163
    }
×
164
}
165

166
#[derive(Debug)]
×
167
pub struct ConnectorSource {
168
    ingestor: Arc<RwLock<Ingestor>>,
169
    iterator: Arc<RwLock<IngestionIterator>>,
170
    schema_port_map: HashMap<u32, u16>,
171
    tables: Vec<TableInfo>,
×
172
    connection: Connection,
×
173
    running: Arc<AtomicBool>,
×
174
}
×
175

×
176
impl Source for ConnectorSource {
×
177
    fn start(
×
178
        &self,
×
179
        fw: &mut dyn SourceChannelForwarder,
×
180
        from_seq: Option<(u64, u64)>,
×
181
    ) -> Result<(), ExecutionError> {
×
182
        let mut connector = get_connector(self.connection.to_owned())
×
183
            .map_err(|e| ExecutionError::ConnectorError(Box::new(e)))?;
×
184

×
185
        let ingestor = self.ingestor.clone();
×
186
        let tables = self.tables.clone();
×
187
        let con_fn = move || -> Result<(), ConnectorError> {
×
188
            connector.initialize(ingestor, Some(tables))?;
×
189
            connector.start(from_seq)?;
×
190
            Ok(())
×
191
        };
×
192
        let running = self.running.clone();
×
193
        let t = thread::spawn(move || {
×
194
            if let Err(e) = con_fn() {
×
195
                if running.load(Ordering::Relaxed) {
×
196
                    std::panic::panic_any(e);
×
197
                }
×
198
            }
×
199
        });
×
200

×
201
        loop {
×
202
            let msg = self.iterator.write().next();
×
203
            if let Some(msg) = msg {
×
204
                match msg {
×
205
                    ((lsn, seq_no), IngestionOperation::OperationEvent(op)) => {
×
206
                        let identifier = match &op.operation {
×
207
                            Operation::Delete { old } => old.schema_id.to_owned(),
×
208
                            Operation::Insert { new } => new.schema_id.to_owned(),
×
209
                            Operation::Update { old: _, new } => new.schema_id.to_owned(),
×
210
                        };
×
211
                        let schema_id = get_schema_id(identifier.as_ref())?;
×
212
                        let port = self.schema_port_map.get(&schema_id).map_or(
×
213
                            Err(ExecutionError::SourceError(SourceError::PortError(
×
214
                                schema_id.to_string(),
×
215
                            ))),
×
216
                            Ok,
×
217
                        )?;
×
218
                        fw.send(lsn, seq_no, op.operation.to_owned(), port.to_owned())?
×
219
                    }
×
220
                }
×
221
            } else {
×
222
                break;
×
223
            }
×
224
        }
×
225

×
226
        t.join().unwrap();
×
227

×
228
        Ok(())
×
229
    }
×
230
}
×
231

×
232
fn get_schema_id(op_schema_id: Option<&SchemaIdentifier>) -> Result<u32, ExecutionError> {
×
233
    Ok(op_schema_id
×
234
        .map_or(Err(ExecutionError::SchemaNotInitialized), Ok)?
×
235
        .id)
236
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc