• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 3978628498

pending completion
3978628498

Pull #705

github

GitHub
Merge 8775fcda7 into e2f9ad287
Pull Request #705: chore: support for generic schema context in `Sink`, `Processor` and `Source` factories

572 of 572 new or added lines in 35 files covered. (100.0%)

22294 of 34850 relevant lines covered (63.97%)

40332.28 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

22.06
/dozer-orchestrator/src/pipeline/connector_source.rs
1
use dozer_core::dag::channels::SourceChannelForwarder;
2
use dozer_core::dag::errors::ExecutionError::ReplicationTypeNotFound;
3
use dozer_core::dag::errors::{ExecutionError, SourceError};
4
use dozer_core::dag::node::{OutputPortDef, OutputPortType, PortHandle, Source, SourceFactory};
5
use dozer_ingestion::connectors::{get_connector, TableInfo};
6
use dozer_ingestion::errors::ConnectorError;
7
use dozer_ingestion::ingestion::{IngestionIterator, Ingestor};
8
use dozer_sql::pipeline::builder::SchemaSQLContext;
9
use dozer_types::ingestion_types::IngestionOperation;
10
use dozer_types::log::info;
11
use dozer_types::models::connection::Connection;
12
use dozer_types::parking_lot::RwLock;
13
use dozer_types::types::{Operation, ReplicationChangesTrackingType, Schema, SchemaIdentifier};
14
use std::collections::HashMap;
15
use std::sync::atomic::{AtomicBool, Ordering};
16
use std::sync::Arc;
17
use std::thread;
18

×
19
#[derive(Debug)]
×
20
pub struct ConnectorSourceFactory {
21
    pub ingestor: Arc<RwLock<Ingestor>>,
22
    pub iterator: Arc<RwLock<IngestionIterator>>,
23
    pub ports: HashMap<String, u16>,
24
    pub schema_port_map: HashMap<u32, u16>,
25
    pub schema_map: HashMap<u16, Schema>,
26
    pub replication_changes_type_map: HashMap<u16, ReplicationChangesTrackingType>,
27
    pub tables: Vec<TableInfo>,
28
    pub connection: Connection,
29
    pub running: Arc<AtomicBool>,
30
}
31

×
32
fn map_replication_type_to_output_port_type(
×
33
    typ: &ReplicationChangesTrackingType,
×
34
) -> OutputPortType {
×
35
    match typ {
×
36
        ReplicationChangesTrackingType::FullChanges => {
×
37
            OutputPortType::StatefulWithPrimaryKeyLookup {
×
38
                retr_old_records_for_deletes: false,
×
39
                retr_old_records_for_updates: false,
×
40
            }
×
41
        }
×
42
        ReplicationChangesTrackingType::OnlyPK => OutputPortType::StatefulWithPrimaryKeyLookup {
×
43
            retr_old_records_for_deletes: true,
×
44
            retr_old_records_for_updates: true,
×
45
        },
×
46
        ReplicationChangesTrackingType::Nothing => OutputPortType::AutogenRowKeyLookup,
×
47
    }
×
48
}
×
49

50
impl ConnectorSourceFactory {
×
51
    pub fn new(
4✔
52
        ingestor: Arc<RwLock<Ingestor>>,
4✔
53
        iterator: Arc<RwLock<IngestionIterator>>,
4✔
54
        ports: HashMap<String, u16>,
4✔
55
        tables: Vec<TableInfo>,
4✔
56
        connection: Connection,
4✔
57
        running: Arc<AtomicBool>,
4✔
58
    ) -> Self {
4✔
59
        let (schema_map, schema_port_map, replication_changes_type_map) =
4✔
60
            Self::get_schema_map(connection.clone(), tables.clone(), ports.clone());
4✔
61
        Self {
4✔
62
            ingestor,
4✔
63
            iterator,
4✔
64
            ports,
4✔
65
            schema_port_map,
4✔
66
            schema_map,
4✔
67
            replication_changes_type_map,
4✔
68
            tables,
4✔
69
            connection,
4✔
70
            running,
4✔
71
        }
4✔
72
    }
4✔
73

×
74
    fn get_schema_map(
4✔
75
        connection: Connection,
4✔
76
        tables: Vec<TableInfo>,
4✔
77
        ports: HashMap<String, u16>,
4✔
78
    ) -> (
4✔
79
        HashMap<u16, Schema>,
4✔
80
        HashMap<u32, u16>,
4✔
81
        HashMap<u16, ReplicationChangesTrackingType>,
4✔
82
    ) {
4✔
83
        let mut tables_map = HashMap::new();
4✔
84
        for t in &tables {
9✔
85
            tables_map.insert(t.table_name.clone(), t.name.clone());
5✔
86
        }
5✔
87

×
88
        let connector = get_connector(connection).unwrap();
4✔
89
        let schema_tuples = connector.get_schemas(Some(tables)).unwrap();
4✔
90

4✔
91
        let mut schema_map = HashMap::new();
4✔
92
        let mut schema_port_map: HashMap<u32, u16> = HashMap::new();
4✔
93
        let mut replication_changes_type_map: HashMap<u16, ReplicationChangesTrackingType> =
4✔
94
            HashMap::new();
4✔
95

×
96
        for (table_name, schema, replication_changes_type) in schema_tuples {
4✔
97
            let source_name = tables_map.get(&table_name).unwrap();
×
98
            let port: u16 = *ports
×
99
                .get(source_name)
×
100
                .map_or(Err(ExecutionError::PortNotFound(table_name.clone())), Ok)
×
101
                .unwrap();
×
102
            let schema_id = get_schema_id(schema.identifier.as_ref()).unwrap();
×
103

×
104
            schema_port_map.insert(schema_id, port);
×
105
            schema_map.insert(port, schema);
×
106
            replication_changes_type_map.insert(port, replication_changes_type);
×
107
        }
×
108

×
109
        (schema_map, schema_port_map, replication_changes_type_map)
4✔
110
    }
4✔
111
}
112

113
impl SourceFactory<SchemaSQLContext> for ConnectorSourceFactory {
×
114
    fn get_output_schema(
×
115
        &self,
×
116
        port: &PortHandle,
×
117
    ) -> Result<(Schema, SchemaSQLContext), ExecutionError> {
×
118
        self.schema_map.get(port).map_or(
×
119
            Err(ExecutionError::PortNotFoundInSource(*port)),
×
120
            |schema_name| Ok((schema_name.clone(), SchemaSQLContext {})),
×
121
        )
×
122
    }
×
123

×
124
    fn get_output_ports(&self) -> Result<Vec<OutputPortDef>, ExecutionError> {
×
125
        self.ports
×
126
            .values()
×
127
            .map(|e| {
×
128
                self.replication_changes_type_map.get(e).map_or(
×
129
                    Err(ReplicationTypeNotFound),
×
130
                    |typ| {
×
131
                        Ok(OutputPortDef::new(
×
132
                            *e,
×
133
                            map_replication_type_to_output_port_type(typ),
×
134
                        ))
×
135
                    },
×
136
                )
×
137
            })
×
138
            .collect()
×
139
    }
×
140

×
141
    fn prepare(
×
142
        &self,
×
143
        output_schemas: HashMap<PortHandle, (Schema, SchemaSQLContext)>,
×
144
    ) -> Result<(), ExecutionError> {
×
145
        use std::println as info;
×
146
        for (port, schema) in output_schemas {
×
147
            let (name, _) = self
×
148
                .ports
×
149
                .iter()
×
150
                .find(|(_, p)| **p == port)
×
151
                .map_or(Err(ExecutionError::PortNotFound(port.to_string())), Ok)?;
×
152
            info!("Source: Initializing input schema: {}", name);
×
153
            schema.0.print().printstd();
×
154
        }
×
155
        Ok(())
×
156
    }
×
157

×
158
    fn build(
×
159
        &self,
×
160
        _output_schemas: HashMap<PortHandle, Schema>,
×
161
    ) -> Result<Box<dyn Source>, ExecutionError> {
×
162
        Ok(Box::new(ConnectorSource {
×
163
            ingestor: self.ingestor.clone(),
×
164
            iterator: self.iterator.clone(),
×
165
            schema_port_map: self.schema_port_map.clone(),
×
166
            tables: self.tables.clone(),
×
167
            connection: self.connection.clone(),
×
168
            running: self.running.clone(),
×
169
        }))
×
170
    }
×
171
}
172

173
#[derive(Debug)]
×
174
pub struct ConnectorSource {
175
    ingestor: Arc<RwLock<Ingestor>>,
176
    iterator: Arc<RwLock<IngestionIterator>>,
177
    schema_port_map: HashMap<u32, u16>,
×
178
    tables: Vec<TableInfo>,
×
179
    connection: Connection,
×
180
    running: Arc<AtomicBool>,
×
181
}
×
182

×
183
impl Source for ConnectorSource {
×
184
    fn start(
×
185
        &self,
×
186
        fw: &mut dyn SourceChannelForwarder,
×
187
        from_seq: Option<(u64, u64)>,
×
188
    ) -> Result<(), ExecutionError> {
×
189
        let mut connector = get_connector(self.connection.to_owned())
×
190
            .map_err(|e| ExecutionError::ConnectorError(Box::new(e)))?;
×
191

×
192
        let ingestor = self.ingestor.clone();
×
193
        let tables = self.tables.clone();
×
194
        let con_fn = move || -> Result<(), ConnectorError> {
×
195
            connector.initialize(ingestor, Some(tables))?;
×
196
            connector.start(from_seq)?;
×
197
            Ok(())
×
198
        };
×
199
        let running = self.running.clone();
×
200
        let t = thread::spawn(move || {
×
201
            if let Err(e) = con_fn() {
×
202
                if running.load(Ordering::Relaxed) {
×
203
                    std::panic::panic_any(e);
×
204
                }
×
205
            }
×
206
        });
×
207

×
208
        loop {
×
209
            let msg = self.iterator.write().next();
×
210
            if let Some(msg) = msg {
×
211
                match msg {
×
212
                    ((lsn, seq_no), IngestionOperation::OperationEvent(op)) => {
×
213
                        let identifier = match &op.operation {
×
214
                            Operation::Delete { old } => old.schema_id.to_owned(),
×
215
                            Operation::Insert { new } => new.schema_id.to_owned(),
×
216
                            Operation::Update { old: _, new } => new.schema_id.to_owned(),
×
217
                        };
×
218
                        let schema_id = get_schema_id(identifier.as_ref())?;
×
219
                        let port = self.schema_port_map.get(&schema_id).map_or(
×
220
                            Err(ExecutionError::SourceError(SourceError::PortError(
×
221
                                schema_id.to_string(),
×
222
                            ))),
×
223
                            Ok,
×
224
                        )?;
×
225
                        fw.send(lsn, seq_no, op.operation.to_owned(), port.to_owned())?
×
226
                    }
×
227
                }
×
228
            } else {
×
229
                break;
×
230
            }
×
231
        }
×
232

×
233
        t.join().unwrap();
×
234

×
235
        Ok(())
×
236
    }
×
237
}
238

239
fn get_schema_id(op_schema_id: Option<&SchemaIdentifier>) -> Result<u32, ExecutionError> {
×
240
    Ok(op_schema_id
×
241
        .map_or(Err(ExecutionError::SchemaNotInitialized), Ok)?
×
242
        .id)
243
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc