• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 5725329819

pending completion
5725329819

push

github

web-flow
chore: Add `SourceFactory::get_output_port_name` to simplify ui graph generation (#1812)

132 of 132 new or added lines in 13 files covered. (100.0%)

45518 of 60084 relevant lines covered (75.76%)

39014.64 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

78.04
/dozer-cli/src/pipeline/connector_source.rs
1
use dozer_core::channels::SourceChannelForwarder;
2
use dozer_core::node::{OutputPortDef, OutputPortType, PortHandle, Source, SourceFactory};
3
use dozer_ingestion::connectors::{get_connector, CdcType, Connector, TableInfo};
4
use dozer_ingestion::errors::ConnectorError;
5
use dozer_ingestion::ingestion::{IngestionConfig, IngestionIterator, Ingestor};
6
use dozer_sql::pipeline::builder::SchemaSQLContext;
7

8
use dozer_types::errors::internal::BoxedError;
9
use dozer_types::indicatif::{MultiProgress, ProgressBar, ProgressStyle};
10
use dozer_types::ingestion_types::{IngestionMessage, IngestionMessageKind, IngestorError};
11
use dozer_types::log::info;
12
use dozer_types::models::connection::Connection;
13
use dozer_types::parking_lot::Mutex;
14
use dozer_types::thiserror::{self, Error};
15
use dozer_types::tracing::{span, Level};
16
use dozer_types::types::{Operation, Schema, SourceDefinition};
17
use metrics::{describe_counter, increment_counter};
18
use std::collections::HashMap;
19
use std::sync::Arc;
20
use std::thread;
21
use tokio::runtime::Runtime;
22

23
fn attach_progress(multi_pb: Option<MultiProgress>) -> ProgressBar {
10✔
24
    let pb = ProgressBar::new_spinner();
10✔
25
    multi_pb.as_ref().map(|m| m.add(pb.clone()));
10✔
26
    pb.set_style(
10✔
27
        ProgressStyle::with_template("{spinner:.red} {msg}: {pos}: {per_sec}")
10✔
28
            .unwrap()
10✔
29
            // For more spinners check out the cli-spinners project:
10✔
30
            // https://github.com/sindresorhus/cli-spinners/blob/master/spinners.json
10✔
31
            .tick_strings(&[
10✔
32
                "▹▹▹▹▹",
10✔
33
                "▸▹▹▹▹",
10✔
34
                "▹▸▹▹▹",
10✔
35
                "▹▹▸▹▹",
10✔
36
                "▹▹▹▸▹",
10✔
37
                "▹▹▹▹▸",
10✔
38
                "▪▪▪▪▪",
10✔
39
            ]),
10✔
40
    );
10✔
41
    pb
10✔
42
}
10✔
43

44
#[derive(Debug)]
×
45
struct Table {
46
    schema_name: Option<String>,
47
    name: String,
48
    columns: Vec<String>,
49
    schema: Schema,
50
    cdc_type: CdcType,
51
    port: PortHandle,
52
}
53

54
#[derive(Debug, Error)]
×
55
pub enum ConnectorSourceFactoryError {
56
    #[error("Connector error: {0}")]
57
    Connector(#[from] ConnectorError),
58
    #[error("Port not found for source: {0}")]
59
    PortNotFoundInSource(PortHandle),
60
    #[error("Schema not initialized")]
61
    SchemaNotInitialized,
62
}
63

64
#[derive(Debug)]
×
65
pub struct ConnectorSourceFactory {
66
    connection_name: String,
67
    tables: Vec<Table>,
68
    /// Will be moved to `ConnectorSource` in `build`.
69
    connector: Mutex<Option<Box<dyn Connector>>>,
70
    runtime: Arc<Runtime>,
71
    progress: Option<MultiProgress>,
72
}
73

74
fn map_replication_type_to_output_port_type(typ: &CdcType) -> OutputPortType {
108✔
75
    match typ {
108✔
76
        CdcType::FullChanges => OutputPortType::StatefulWithPrimaryKeyLookup,
×
77
        CdcType::OnlyPK => OutputPortType::StatefulWithPrimaryKeyLookup,
×
78
        CdcType::Nothing => OutputPortType::Stateless,
108✔
79
    }
80
}
108✔
81

82
impl ConnectorSourceFactory {
83
    pub async fn new(
13✔
84
        table_and_ports: Vec<(TableInfo, PortHandle)>,
13✔
85
        connection: Connection,
13✔
86
        runtime: Arc<Runtime>,
13✔
87
        progress: Option<MultiProgress>,
13✔
88
    ) -> Result<Self, ConnectorSourceFactoryError> {
13✔
89
        let connection_name = connection.name.clone();
13✔
90

91
        let connector = get_connector(connection)?;
13✔
92
        let tables: Vec<TableInfo> = table_and_ports
13✔
93
            .iter()
13✔
94
            .map(|(table, _)| table.clone())
22✔
95
            .collect();
13✔
96
        let source_schemas = connector.get_schemas(&tables).await?;
49✔
97

98
        let mut tables = vec![];
13✔
99
        for ((table, port), source_schema) in table_and_ports.into_iter().zip(source_schemas) {
22✔
100
            let name = table.name;
22✔
101
            let columns = table.column_names;
22✔
102
            let source_schema = source_schema?;
22✔
103
            let schema = source_schema.schema;
22✔
104
            let cdc_type = source_schema.cdc_type;
22✔
105

22✔
106
            let table = Table {
22✔
107
                name,
22✔
108
                schema_name: table.schema.clone(),
22✔
109
                columns,
22✔
110
                schema,
22✔
111
                cdc_type,
22✔
112
                port,
22✔
113
            };
22✔
114

22✔
115
            tables.push(table);
22✔
116
        }
117

118
        Ok(Self {
13✔
119
            connection_name,
13✔
120
            tables,
13✔
121
            connector: Mutex::new(Some(connector)),
13✔
122
            runtime,
13✔
123
            progress,
13✔
124
        })
13✔
125
    }
13✔
126
}
127

128
impl SourceFactory<SchemaSQLContext> for ConnectorSourceFactory {
129
    fn get_output_schema(
24✔
130
        &self,
24✔
131
        port: &PortHandle,
24✔
132
    ) -> Result<(Schema, SchemaSQLContext), BoxedError> {
24✔
133
        let table = self
24✔
134
            .tables
24✔
135
            .iter()
24✔
136
            .find(|table| table.port == *port)
36✔
137
            .ok_or(ConnectorSourceFactoryError::PortNotFoundInSource(*port))?;
24✔
138
        let mut schema = table.schema.clone();
24✔
139
        let table_name = &table.name;
24✔
140

141
        // Add source information to the schema.
142
        for field in &mut schema.fields {
168✔
143
            field.source = SourceDefinition::Table {
144✔
144
                connection: self.connection_name.clone(),
144✔
145
                name: table_name.clone(),
144✔
146
            };
144✔
147
        }
144✔
148

149
        info!(
24✔
150
            "Source: Initializing input schema: {}\n{}",
18✔
151
            table_name,
18✔
152
            schema.print()
18✔
153
        );
154

155
        Ok((schema, SchemaSQLContext::default()))
24✔
156
    }
24✔
157

158
    fn get_output_port_name(&self, port: &PortHandle) -> String {
24✔
159
        let table = self
24✔
160
            .tables
24✔
161
            .iter()
24✔
162
            .find(|table| table.port == *port)
36✔
163
            .unwrap_or_else(|| panic!("Port {} not found", port));
24✔
164
        table.name.clone()
24✔
165
    }
24✔
166

×
167
    fn get_output_ports(&self) -> Vec<OutputPortDef> {
60✔
168
        self.tables
60✔
169
            .iter()
60✔
170
            .map(|table| {
108✔
171
                let typ = map_replication_type_to_output_port_type(&table.cdc_type);
108✔
172
                OutputPortDef::new(table.port, typ)
108✔
173
            })
108✔
174
            .collect()
60✔
175
    }
60✔
176

×
177
    fn build(
6✔
178
        &self,
6✔
179
        _output_schemas: HashMap<PortHandle, Schema>,
6✔
180
    ) -> Result<Box<dyn Source>, BoxedError> {
6✔
181
        let (ingestor, iterator) = Ingestor::initialize_channel(IngestionConfig::default());
6✔
182

6✔
183
        let tables = self
6✔
184
            .tables
6✔
185
            .iter()
6✔
186
            .map(|table| TableInfo {
10✔
187
                schema: table.schema_name.clone(),
10✔
188
                name: table.name.clone(),
10✔
189
                column_names: table.columns.clone(),
10✔
190
            })
10✔
191
            .collect();
6✔
192
        let ports = self.tables.iter().map(|table| table.port).collect();
10✔
193

6✔
194
        let connector = self
6✔
195
            .connector
6✔
196
            .lock()
6✔
197
            .take()
6✔
198
            .expect("ConnectorSource was already built");
6✔
199

6✔
200
        let mut bars = vec![];
6✔
201
        for table in &self.tables {
16✔
202
            let pb = attach_progress(self.progress.clone());
10✔
203
            pb.set_message(table.name.clone());
10✔
204
            bars.push(pb);
10✔
205
        }
10✔
206

×
207
        Ok(Box::new(ConnectorSource {
6✔
208
            ingestor,
6✔
209
            iterator: Mutex::new(iterator),
6✔
210
            tables,
6✔
211
            ports,
6✔
212
            connector,
6✔
213
            runtime: self.runtime.clone(),
6✔
214
            connection_name: self.connection_name.clone(),
6✔
215
            bars,
6✔
216
        }))
6✔
217
    }
6✔
218
}
219

220
#[derive(Debug)]
×
221
pub struct ConnectorSource {
222
    ingestor: Ingestor,
223
    iterator: Mutex<IngestionIterator>,
224
    tables: Vec<TableInfo>,
225
    ports: Vec<PortHandle>,
226
    connector: Box<dyn Connector>,
×
227
    runtime: Arc<Runtime>,
×
228
    connection_name: String,
×
229
    bars: Vec<ProgressBar>,
230
}
×
231

×
232
/// Metrics counter incremented by `ConnectorSource::start` once per operation event. It is
/// labelled with the connection, the table and the operation type.
const SOURCE_OPERATION_COUNTER_NAME: &str = "source_operation";
×
233

×
234
impl Source for ConnectorSource {
×
235
    fn can_start_from(&self, _last_checkpoint: (u64, u64)) -> Result<bool, BoxedError> {
×
236
        Ok(false)
×
237
    }
×
238

×
239
    fn start(
6✔
240
        &self,
6✔
241
        fw: &mut dyn SourceChannelForwarder,
6✔
242
        _last_checkpoint: Option<(u64, u64)>,
6✔
243
    ) -> Result<(), BoxedError> {
6✔
244
        thread::scope(|scope| {
6✔
245
            describe_counter!(
6✔
246
                SOURCE_OPERATION_COUNTER_NAME,
×
247
                "Number of operation processed by source"
×
248
            );
249

250
            let mut counter = vec![0; self.tables.len()];
6✔
251
            let t = scope.spawn(|| {
6✔
252
                match self
253
                    .runtime
×
254
                    .block_on(self.connector.start(&self.ingestor, self.tables.clone()))
6✔
255
                {
×
256
                    Ok(_) => {}
6✔
257
                    // If we get a channel error, it means the source sender thread has quit.
×
258
                    // Any error handling is done in that thread.
×
259
                    Err(ConnectorError::IngestorError(IngestorError::ChannelError(_))) => (),
×
260
                    Err(e) => std::panic::panic_any(e),
×
261
                }
262
            });
6✔
263

6✔
264
            let mut iterator = self.iterator.lock();
6✔
265

×
266
            for IngestionMessage { identifier, kind } in iterator.by_ref() {
548✔
267
                let span = span!(
548✔
268
                    Level::TRACE,
1,096✔
269
                    "pipeline_source_start",
×
270
                    self.connection_name,
×
271
                    identifier.txid,
×
272
                    identifier.seq_in_tx
×
273
                );
×
274
                let _enter = span.enter();
548✔
275

548✔
276
                match kind {
548✔
277
                    IngestionMessageKind::OperationEvent { table_index, op } => {
544✔
278
                        let port = self.ports[table_index];
544✔
279
                        let table_name = &self.tables[table_index].name;
544✔
280

544✔
281
                        // Update metrics
544✔
282
                        let mut labels = vec![
544✔
283
                            ("connection", self.connection_name.clone()),
544✔
284
                            ("table", table_name.clone()),
544✔
285
                        ];
544✔
286
                        const OPERATION_TYPE_LABEL: &str = "operation_type";
544✔
287
                        match &op {
544✔
288
                            Operation::Delete { .. } => {
×
289
                                labels.push((OPERATION_TYPE_LABEL, "delete".to_string()));
×
290
                            }
×
291
                            Operation::Insert { .. } => {
544✔
292
                                labels.push((OPERATION_TYPE_LABEL, "insert".to_string()));
544✔
293
                            }
544✔
294
                            Operation::Update { .. } => {
×
295
                                labels.push((OPERATION_TYPE_LABEL, "update".to_string()));
×
296
                            }
×
297
                        }
×
298
                        increment_counter!(SOURCE_OPERATION_COUNTER_NAME, &labels);
544✔
299

300
                        // Send message to the pipeline
301
                        fw.send(
544✔
302
                            IngestionMessage {
544✔
303
                                identifier,
544✔
304
                                kind: IngestionMessageKind::OperationEvent { table_index, op },
544✔
305
                            },
544✔
306
                            port,
544✔
307
                        )?;
544✔
308

309
                        // Update counter
×
310
                        let counter = &mut counter[table_index];
544✔
311
                        *counter += 1;
544✔
312
                        if *counter % 1000 == 0 {
544✔
313
                            self.bars[table_index].set_position(*counter);
×
314
                        }
544✔
315
                    }
×
316
                    IngestionMessageKind::SnapshottingDone
×
317
                    | IngestionMessageKind::SnapshottingStarted => {
318
                        for port in &self.ports {
12✔
319
                            fw.send(
8✔
320
                                IngestionMessage {
8✔
321
                                    identifier,
8✔
322
                                    kind: kind.clone(),
8✔
323
                                },
8✔
324
                                *port,
8✔
325
                            )?;
8✔
326
                        }
×
327
                    }
×
328
                }
×
329
            }
×
330

×
331
            // If we reach here, it means the connector thread has quit and the `ingestor` has been dropped.
332
            // `join` will not block.
333
            if let Err(e) = t.join() {
×
334
                std::panic::panic_any(e);
×
335
            }
×
336

×
337
            Ok(())
×
338
        })
6✔
339
    }
6✔
340
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc