• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 4007818786

pending completion
4007818786

Pull #733

github

GitHub
Merge baf5c38aa into 6c0ac2b2c
Pull Request #733: Bump diesel from 2.0.2 to 2.0.3

23389 of 34432 relevant lines covered (67.93%)

40326.78 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

19.82
/dozer-orchestrator/src/pipeline/connector_source.rs
1
use dozer_core::dag::channels::SourceChannelForwarder;
2
use dozer_core::dag::errors::ExecutionError::ReplicationTypeNotFound;
3
use dozer_core::dag::errors::{ExecutionError, SourceError};
4
use dozer_core::dag::node::{OutputPortDef, OutputPortType, PortHandle, Source, SourceFactory};
5
use dozer_ingestion::connectors::{get_connector, TableInfo};
6
use dozer_ingestion::errors::ConnectorError;
7
use dozer_ingestion::ingestion::{IngestionIterator, Ingestor};
8
use dozer_sql::pipeline::builder::SchemaSQLContext;
9
use dozer_types::ingestion_types::IngestionOperation;
10
use dozer_types::log::info;
11
use dozer_types::models::connection::Connection;
12
use dozer_types::parking_lot::RwLock;
13
use dozer_types::types::{
14
    Operation, ReplicationChangesTrackingType, Schema, SchemaIdentifier, SourceDefinition,
15
};
16
use std::collections::HashMap;
17
use std::sync::atomic::{AtomicBool, Ordering};
18
use std::sync::Arc;
19
use std::thread;
×
20

21
#[derive(Debug)]
×
22
pub struct ConnectorSourceFactory {
23
    pub ingestor: Arc<RwLock<Ingestor>>,
24
    pub iterator: Arc<RwLock<IngestionIterator>>,
25
    pub ports: HashMap<String, u16>,
26
    pub schema_port_map: HashMap<u32, u16>,
27
    pub schema_map: HashMap<u16, Schema>,
28
    pub replication_changes_type_map: HashMap<u16, ReplicationChangesTrackingType>,
29
    pub tables: Vec<TableInfo>,
30
    pub connection: Connection,
31
    pub running: Arc<AtomicBool>,
32
}
×
33

×
34
fn map_replication_type_to_output_port_type(
×
35
    typ: &ReplicationChangesTrackingType,
×
36
) -> OutputPortType {
×
37
    match typ {
×
38
        ReplicationChangesTrackingType::FullChanges => {
×
39
            OutputPortType::StatefulWithPrimaryKeyLookup {
×
40
                retr_old_records_for_deletes: false,
×
41
                retr_old_records_for_updates: false,
×
42
            }
×
43
        }
×
44
        ReplicationChangesTrackingType::OnlyPK => OutputPortType::StatefulWithPrimaryKeyLookup {
×
45
            retr_old_records_for_deletes: true,
×
46
            retr_old_records_for_updates: true,
×
47
        },
×
48
        ReplicationChangesTrackingType::Nothing => OutputPortType::AutogenRowKeyLookup,
×
49
    }
50
}
×
51

×
52
impl ConnectorSourceFactory {
×
53
    pub fn new(
4✔
54
        ingestor: Arc<RwLock<Ingestor>>,
4✔
55
        iterator: Arc<RwLock<IngestionIterator>>,
4✔
56
        ports: HashMap<String, u16>,
4✔
57
        tables: Vec<TableInfo>,
4✔
58
        connection: Connection,
4✔
59
        running: Arc<AtomicBool>,
4✔
60
    ) -> Self {
4✔
61
        let (schema_map, schema_port_map, replication_changes_type_map) =
4✔
62
            Self::get_schema_map(connection.clone(), tables.clone(), ports.clone());
4✔
63
        Self {
4✔
64
            ingestor,
4✔
65
            iterator,
4✔
66
            ports,
4✔
67
            schema_port_map,
4✔
68
            schema_map,
4✔
69
            replication_changes_type_map,
4✔
70
            tables,
4✔
71
            connection,
4✔
72
            running,
4✔
73
        }
4✔
74
    }
4✔
75

×
76
    fn get_schema_map(
4✔
77
        connection: Connection,
4✔
78
        tables: Vec<TableInfo>,
4✔
79
        ports: HashMap<String, u16>,
4✔
80
    ) -> (
4✔
81
        HashMap<u16, Schema>,
4✔
82
        HashMap<u32, u16>,
4✔
83
        HashMap<u16, ReplicationChangesTrackingType>,
4✔
84
    ) {
4✔
85
        let mut tables_map = HashMap::new();
4✔
86
        for t in &tables {
9✔
87
            tables_map.insert(t.table_name.clone(), t.name.clone());
5✔
88
        }
5✔
89

×
90
        let connector = get_connector(connection).unwrap();
4✔
91
        let schema_tuples = connector.get_schemas(Some(tables)).unwrap();
4✔
92

4✔
93
        let mut schema_map = HashMap::new();
4✔
94
        let mut schema_port_map: HashMap<u32, u16> = HashMap::new();
4✔
95
        let mut replication_changes_type_map: HashMap<u16, ReplicationChangesTrackingType> =
4✔
96
            HashMap::new();
4✔
97

×
98
        for (table_name, schema, replication_changes_type) in schema_tuples {
4✔
99
            let source_name = tables_map.get(&table_name).unwrap();
×
100
            let port: u16 = *ports
×
101
                .get(source_name)
×
102
                .map_or(Err(ExecutionError::PortNotFound(table_name.clone())), Ok)
×
103
                .unwrap();
×
104
            let schema_id = get_schema_id(schema.identifier.as_ref()).unwrap();
×
105

×
106
            schema_port_map.insert(schema_id, port);
×
107
            schema_map.insert(port, schema);
×
108
            replication_changes_type_map.insert(port, replication_changes_type);
×
109
        }
×
110

×
111
        (schema_map, schema_port_map, replication_changes_type_map)
4✔
112
    }
4✔
113
}
114

×
115
impl SourceFactory<SchemaSQLContext> for ConnectorSourceFactory {
×
116
    fn get_output_schema(
×
117
        &self,
×
118
        port: &PortHandle,
×
119
    ) -> Result<(Schema, SchemaSQLContext), ExecutionError> {
×
120
        let mut schema = self
×
121
            .schema_map
×
122
            .get(port)
×
123
            .map_or(Err(ExecutionError::PortNotFoundInSource(*port)), |s| {
×
124
                Ok(s.clone())
×
125
            })?;
×
126

×
127
        let table_name = self
×
128
            .ports
×
129
            .iter()
×
130
            .find(|(_, p)| **p == *port)
×
131
            .unwrap()
×
132
            .0
×
133
            .clone();
×
134
        // Add source information to the schema.
×
135
        let mut fields = vec![];
×
136
        for field in schema.fields {
×
137
            let mut f = field.clone();
×
138
            f.source = SourceDefinition::Table {
×
139
                connection: self.connection.name.clone(),
×
140
                name: table_name.clone(),
×
141
            };
×
142
            fields.push(f);
×
143
        }
×
144
        schema.fields = fields;
×
145

×
146
        Ok((schema, SchemaSQLContext::default()))
×
147
    }
×
148

×
149
    fn get_output_ports(&self) -> Result<Vec<OutputPortDef>, ExecutionError> {
×
150
        self.ports
×
151
            .values()
×
152
            .map(|e| {
×
153
                self.replication_changes_type_map.get(e).map_or(
×
154
                    Err(ReplicationTypeNotFound),
×
155
                    |typ| {
×
156
                        Ok(OutputPortDef::new(
×
157
                            *e,
×
158
                            map_replication_type_to_output_port_type(typ),
×
159
                        ))
×
160
                    },
×
161
                )
×
162
            })
×
163
            .collect()
×
164
    }
×
165

×
166
    fn prepare(
×
167
        &self,
×
168
        output_schemas: HashMap<PortHandle, (Schema, SchemaSQLContext)>,
×
169
    ) -> Result<(), ExecutionError> {
×
170
        use std::println as info;
×
171
        for (port, schema) in output_schemas {
×
172
            let (name, _) = self
×
173
                .ports
×
174
                .iter()
×
175
                .find(|(_, p)| **p == port)
×
176
                .map_or(Err(ExecutionError::PortNotFound(port.to_string())), Ok)?;
×
177
            info!("Source: Initializing input schema: {}", name);
×
178
            schema.0.print().printstd();
×
179
        }
180
        Ok(())
×
181
    }
×
182

183
    fn build(
×
184
        &self,
×
185
        _output_schemas: HashMap<PortHandle, Schema>,
×
186
    ) -> Result<Box<dyn Source>, ExecutionError> {
×
187
        Ok(Box::new(ConnectorSource {
×
188
            ingestor: self.ingestor.clone(),
×
189
            iterator: self.iterator.clone(),
×
190
            schema_port_map: self.schema_port_map.clone(),
×
191
            tables: self.tables.clone(),
×
192
            connection: self.connection.clone(),
×
193
            running: self.running.clone(),
×
194
        }))
×
195
    }
×
196
}
×
197

×
198
#[derive(Debug)]
×
199
pub struct ConnectorSource {
×
200
    ingestor: Arc<RwLock<Ingestor>>,
×
201
    iterator: Arc<RwLock<IngestionIterator>>,
×
202
    schema_port_map: HashMap<u32, u16>,
×
203
    tables: Vec<TableInfo>,
×
204
    connection: Connection,
×
205
    running: Arc<AtomicBool>,
×
206
}
×
207

208
impl Source for ConnectorSource {
×
209
    fn start(
×
210
        &self,
×
211
        fw: &mut dyn SourceChannelForwarder,
×
212
        from_seq: Option<(u64, u64)>,
×
213
    ) -> Result<(), ExecutionError> {
×
214
        let mut connector = get_connector(self.connection.to_owned())
×
215
            .map_err(|e| ExecutionError::ConnectorError(Box::new(e)))?;
×
216

×
217
        let ingestor = self.ingestor.clone();
×
218
        let tables = self.tables.clone();
×
219
        let con_fn = move || -> Result<(), ConnectorError> {
×
220
            connector.initialize(ingestor, Some(tables))?;
×
221
            connector.start(from_seq)?;
×
222
            Ok(())
×
223
        };
×
224
        let running = self.running.clone();
×
225
        let t = thread::spawn(move || {
×
226
            if let Err(e) = con_fn() {
×
227
                if running.load(Ordering::Relaxed) {
×
228
                    std::panic::panic_any(e);
×
229
                }
×
230
            }
×
231
        });
×
232

×
233
        loop {
×
234
            let msg = self.iterator.write().next();
×
235
            if let Some(msg) = msg {
×
236
                match msg {
×
237
                    ((lsn, seq_no), IngestionOperation::OperationEvent(op)) => {
×
238
                        let identifier = match &op.operation {
×
239
                            Operation::Delete { old } => old.schema_id.to_owned(),
×
240
                            Operation::Insert { new } => new.schema_id.to_owned(),
×
241
                            Operation::Update { old: _, new } => new.schema_id.to_owned(),
×
242
                        };
243
                        let schema_id = get_schema_id(identifier.as_ref())?;
×
244
                        let port = self.schema_port_map.get(&schema_id).map_or(
×
245
                            Err(ExecutionError::SourceError(SourceError::PortError(
×
246
                                schema_id.to_string(),
×
247
                            ))),
×
248
                            Ok,
×
249
                        )?;
×
250
                        fw.send(lsn, seq_no, op.operation.to_owned(), port.to_owned())?
×
251
                    }
252
                }
253
            } else {
254
                break;
×
255
            }
×
256
        }
×
257

×
258
        t.join().unwrap();
×
259

×
260
        Ok(())
×
261
    }
×
262
}
263

264
fn get_schema_id(op_schema_id: Option<&SchemaIdentifier>) -> Result<u32, ExecutionError> {
×
265
    Ok(op_schema_id
×
266
        .map_or(Err(ExecutionError::SchemaNotInitialized), Ok)?
×
267
        .id)
268
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc