• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 6013595925

29 Aug 2023 02:22PM UTC coverage: 76.363% (-1.6%) from 77.986%
6013595925

push

github

web-flow
chore: Increase e2e test timeout to 90 minutes (#1936)

Signed-off-by: Bei Chu <914745487@qq.com>

49036 of 64214 relevant lines covered (76.36%)

48121.67 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

75.69
/dozer-cli/src/pipeline/connector_source.rs
1
use dozer_core::channels::SourceChannelForwarder;
2
use dozer_core::node::{OutputPortDef, OutputPortType, PortHandle, Source, SourceFactory};
3
use dozer_ingestion::connectors::{get_connector, CdcType, Connector, TableInfo};
4
use dozer_ingestion::errors::ConnectorError;
5
use dozer_ingestion::ingestion::{IngestionConfig, Ingestor};
6

7
use dozer_types::errors::internal::BoxedError;
8
use dozer_types::indicatif::{MultiProgress, ProgressBar, ProgressStyle};
9
use dozer_types::ingestion_types::{IngestionMessage, IngestionMessageKind, IngestorError};
10
use dozer_types::log::info;
11
use dozer_types::models::connection::Connection;
12
use dozer_types::parking_lot::Mutex;
13
use dozer_types::thiserror::{self, Error};
14
use dozer_types::tracing::{span, Level};
15
use dozer_types::types::{Operation, Schema, SourceDefinition};
16
use futures::stream::{AbortHandle, Abortable, Aborted};
17
use metrics::{describe_counter, increment_counter};
18
use std::collections::HashMap;
19
use std::sync::Arc;
20
use std::thread;
21
use tokio::runtime::Runtime;
22

23
use crate::shutdown::ShutdownReceiver;
24

25
fn attach_progress(multi_pb: Option<MultiProgress>) -> ProgressBar {
40✔
26
    let pb = ProgressBar::new_spinner();
40✔
27
    multi_pb.as_ref().map(|m| m.add(pb.clone()));
40✔
28
    pb.set_style(
40✔
29
        ProgressStyle::with_template("{spinner:.red} {msg}: {pos}: {per_sec}")
40✔
30
            .unwrap()
40✔
31
            // For more spinners check out the cli-spinners project:
40✔
32
            // https://github.com/sindresorhus/cli-spinners/blob/master/spinners.json
40✔
33
            .tick_strings(&[
40✔
34
                "▹▹▹▹▹",
40✔
35
                "▸▹▹▹▹",
40✔
36
                "▹▸▹▹▹",
40✔
37
                "▹▹▸▹▹",
40✔
38
                "▹▹▹▸▹",
40✔
39
                "▹▹▹▹▸",
40✔
40
                "▪▪▪▪▪",
40✔
41
            ]),
40✔
42
    );
40✔
43
    pb
40✔
44
}
40✔
45

×
46
#[derive(Debug)]
×
47
struct Table {
×
48
    schema_name: Option<String>,
49
    name: String,
50
    columns: Vec<String>,
51
    schema: Schema,
52
    cdc_type: CdcType,
53
    port: PortHandle,
54
}
55

56
#[derive(Debug, Error)]
×
57
pub enum ConnectorSourceFactoryError {
×
58
    #[error("Connector error: {0}")]
×
59
    Connector(#[from] ConnectorError),
60
    #[error("Port not found for source: {0}")]
61
    PortNotFoundInSource(PortHandle),
62
    #[error("Schema not initialized")]
63
    SchemaNotInitialized,
64
}
65

66
#[derive(Debug)]
×
67
pub struct ConnectorSourceFactory {
×
68
    connection_name: String,
69
    tables: Vec<Table>,
70
    /// Will be moved to `ConnectorSource` in `build`.
71
    connector: Mutex<Option<Box<dyn Connector>>>,
72
    runtime: Arc<Runtime>,
73
    progress: Option<MultiProgress>,
74
    shutdown: ShutdownReceiver,
75
}
76

77
fn map_replication_type_to_output_port_type(typ: &CdcType) -> OutputPortType {
400✔
78
    match typ {
400✔
79
        CdcType::FullChanges => OutputPortType::Stateless,
105✔
80
        CdcType::OnlyPK => OutputPortType::StatefulWithPrimaryKeyLookup,
×
81
        CdcType::Nothing => OutputPortType::Stateless,
295✔
82
    }
×
83
}
400✔
84

×
85
impl ConnectorSourceFactory {
86
    pub async fn new(
61✔
87
        table_and_ports: Vec<(TableInfo, PortHandle)>,
61✔
88
        connection: Connection,
61✔
89
        runtime: Arc<Runtime>,
61✔
90
        progress: Option<MultiProgress>,
61✔
91
        shutdown: ShutdownReceiver,
61✔
92
    ) -> Result<Self, ConnectorSourceFactoryError> {
61✔
93
        let connection_name = connection.name.clone();
61✔
94

×
95
        let connector = get_connector(connection)?;
61✔
96
        let tables: Vec<TableInfo> = table_and_ports
61✔
97
            .iter()
61✔
98
            .map(|(table, _)| table.clone())
82✔
99
            .collect();
61✔
100
        let source_schemas = connector.get_schemas(&tables).await?;
121✔
101

×
102
        let mut tables = vec![];
61✔
103
        for ((table, port), source_schema) in table_and_ports.into_iter().zip(source_schemas) {
82✔
104
            let name = table.name;
82✔
105
            let columns = table.column_names;
82✔
106
            let source_schema = source_schema?;
82✔
107
            let schema = source_schema.schema;
82✔
108
            let cdc_type = source_schema.cdc_type;
82✔
109

82✔
110
            let table = Table {
82✔
111
                name,
82✔
112
                schema_name: table.schema.clone(),
82✔
113
                columns,
82✔
114
                schema,
82✔
115
                cdc_type,
82✔
116
                port,
82✔
117
            };
82✔
118

82✔
119
            tables.push(table);
82✔
120
        }
×
121

122
        Ok(Self {
61✔
123
            connection_name,
61✔
124
            tables,
61✔
125
            connector: Mutex::new(Some(connector)),
61✔
126
            runtime,
61✔
127
            progress,
61✔
128
            shutdown,
61✔
129
        })
61✔
130
    }
61✔
131
}
×
132

133
impl SourceFactory for ConnectorSourceFactory {
134
    fn get_output_schema(&self, port: &PortHandle) -> Result<Schema, BoxedError> {
90✔
135
        let table = self
90✔
136
            .tables
90✔
137
            .iter()
90✔
138
            .find(|table| table.port == *port)
120✔
139
            .ok_or(ConnectorSourceFactoryError::PortNotFoundInSource(*port))?;
90✔
140
        let mut schema = table.schema.clone();
90✔
141
        let table_name = &table.name;
90✔
142

×
143
        // Add source information to the schema.
×
144
        for field in &mut schema.fields {
510✔
145
            field.source = SourceDefinition::Table {
420✔
146
                connection: self.connection_name.clone(),
420✔
147
                name: table_name.clone(),
420✔
148
            };
420✔
149
        }
420✔
150

×
151
        info!(
90✔
152
            "Source: Initializing input schema: {}\n{}",
45✔
153
            table_name,
45✔
154
            schema.print()
45✔
155
        );
×
156

×
157
        Ok(schema)
90✔
158
    }
90✔
159

160
    fn get_output_port_name(&self, port: &PortHandle) -> String {
130✔
161
        let table = self
130✔
162
            .tables
130✔
163
            .iter()
130✔
164
            .find(|table| table.port == *port)
170✔
165
            .unwrap_or_else(|| panic!("Port {} not found", port));
130✔
166
        table.name.clone()
130✔
167
    }
130✔
168

×
169
    fn get_output_ports(&self) -> Vec<OutputPortDef> {
270✔
170
        self.tables
270✔
171
            .iter()
270✔
172
            .map(|table| {
400✔
173
                let typ = map_replication_type_to_output_port_type(&table.cdc_type);
400✔
174
                OutputPortDef::new(table.port, typ)
400✔
175
            })
400✔
176
            .collect()
270✔
177
    }
270✔
178

×
179
    fn build(
30✔
180
        &self,
30✔
181
        _output_schemas: HashMap<PortHandle, Schema>,
30✔
182
    ) -> Result<Box<dyn Source>, BoxedError> {
30✔
183
        let tables = self
30✔
184
            .tables
30✔
185
            .iter()
30✔
186
            .map(|table| TableInfo {
40✔
187
                schema: table.schema_name.clone(),
40✔
188
                name: table.name.clone(),
40✔
189
                column_names: table.columns.clone(),
40✔
190
            })
40✔
191
            .collect();
30✔
192
        let ports = self.tables.iter().map(|table| table.port).collect();
40✔
193

30✔
194
        let connector = self
30✔
195
            .connector
30✔
196
            .lock()
30✔
197
            .take()
30✔
198
            .expect("ConnectorSource was already built");
30✔
199

30✔
200
        let mut bars = vec![];
30✔
201
        for table in &self.tables {
70✔
202
            let pb = attach_progress(self.progress.clone());
40✔
203
            pb.set_message(table.name.clone());
40✔
204
            bars.push(pb);
40✔
205
        }
40✔
206

×
207
        Ok(Box::new(ConnectorSource {
30✔
208
            tables,
30✔
209
            ports,
30✔
210
            connector,
30✔
211
            runtime: self.runtime.clone(),
30✔
212
            connection_name: self.connection_name.clone(),
30✔
213
            bars,
30✔
214
            shutdown: self.shutdown.clone(),
30✔
215
            ingestion_config: IngestionConfig::default(),
30✔
216
        }))
30✔
217
    }
30✔
218
}
×
219

×
220
#[derive(Debug)]
×
221
pub struct ConnectorSource {
×
222
    tables: Vec<TableInfo>,
223
    ports: Vec<PortHandle>,
224
    connector: Box<dyn Connector>,
×
225
    runtime: Arc<Runtime>,
226
    connection_name: String,
227
    bars: Vec<ProgressBar>,
228
    shutdown: ShutdownReceiver,
229
    ingestion_config: IngestionConfig,
230
}
231

232
/// Name of the metrics counter incremented once per operation forwarded by a source.
const SOURCE_OPERATION_COUNTER_NAME: &str = "source_operation";
233

234
impl Source for ConnectorSource {
235
    fn can_start_from(&self, _last_checkpoint: (u64, u64)) -> Result<bool, BoxedError> {
×
236
        Ok(false)
×
237
    }
×
238

239
    fn start(
30✔
240
        &self,
30✔
241
        fw: &mut dyn SourceChannelForwarder,
30✔
242
        _last_checkpoint: Option<(u64, u64)>,
30✔
243
    ) -> Result<(), BoxedError> {
30✔
244
        thread::scope(|scope| {
30✔
245
            describe_counter!(
30✔
246
                SOURCE_OPERATION_COUNTER_NAME,
×
247
                "Number of operation processed by source"
×
248
            );
×
249

×
250
            let mut counter = vec![0; self.tables.len()];
30✔
251

30✔
252
            let (ingestor, mut iterator) =
30✔
253
                Ingestor::initialize_channel(self.ingestion_config.clone());
30✔
254
            let t = scope.spawn(|| {
30✔
255
                self.runtime.block_on(async move {
30✔
256
                    let ingestor = ingestor;
30✔
257
                    let shutdown_future = self.shutdown.create_shutdown_future();
30✔
258
                    let tables = self.tables.clone();
30✔
259
                    let (abort_handle, abort_registration) = AbortHandle::new_pair();
30✔
260

30✔
261
                    // Abort the connector when we shut down
30✔
262
                    // TODO: pass a `CancellationToken` to the connector to allow
30✔
263
                    // it to gracefully shut down.
30✔
264
                    tokio::spawn(async move {
30✔
265
                        shutdown_future.await;
30✔
266
                        abort_handle.abort();
30✔
267
                    });
30✔
268
                    let result =
30✔
269
                        Abortable::new(self.connector.start(&ingestor, tables), abort_registration)
30✔
270
                            .await;
65✔
271
                    match result {
×
272
                        Ok(Ok(_)) => {}
×
273
                        // If we get a channel error, it means the source sender thread has quit.
×
274
                        // Any error handling is done in that thread.
×
275
                        Ok(Err(ConnectorError::IngestorError(IngestorError::ChannelError(_)))) => {}
×
276
                        Ok(Err(e)) => std::panic::panic_any(e),
×
277
                        // Aborted means we are shutting down
278
                        Err(Aborted) => (),
30✔
279
                    }
×
280
                })
30✔
281
            });
30✔
282

×
283
            for IngestionMessage { identifier, kind } in iterator.by_ref() {
1,385✔
284
                let span = span!(
1,385✔
285
                    Level::TRACE,
2,770✔
286
                    "pipeline_source_start",
287
                    self.connection_name,
×
288
                    identifier.txid,
×
289
                    identifier.seq_in_tx
×
290
                );
291
                let _enter = span.enter();
1,385✔
292

1,385✔
293
                match kind {
1,385✔
294
                    IngestionMessageKind::OperationEvent { table_index, op } => {
1,375✔
295
                        let port = self.ports[table_index];
1,375✔
296
                        let table_name = &self.tables[table_index].name;
1,375✔
297

1,375✔
298
                        // Update metrics
1,375✔
299
                        let mut labels = vec![
1,375✔
300
                            ("connection", self.connection_name.clone()),
1,375✔
301
                            ("table", table_name.clone()),
1,375✔
302
                        ];
1,375✔
303
                        const OPERATION_TYPE_LABEL: &str = "operation_type";
1,375✔
304
                        match &op {
1,375✔
305
                            Operation::Delete { .. } => {
×
306
                                labels.push((OPERATION_TYPE_LABEL, "delete".to_string()));
×
307
                            }
×
308
                            Operation::Insert { .. } => {
1,375✔
309
                                labels.push((OPERATION_TYPE_LABEL, "insert".to_string()));
1,375✔
310
                            }
1,375✔
311
                            Operation::Update { .. } => {
×
312
                                labels.push((OPERATION_TYPE_LABEL, "update".to_string()));
×
313
                            }
×
314
                        }
×
315
                        increment_counter!(SOURCE_OPERATION_COUNTER_NAME, &labels);
1,375✔
316

×
317
                        // Send message to the pipeline
×
318
                        fw.send(
1,375✔
319
                            IngestionMessage {
1,375✔
320
                                identifier,
1,375✔
321
                                kind: IngestionMessageKind::OperationEvent { table_index, op },
1,375✔
322
                            },
1,375✔
323
                            port,
1,375✔
324
                        )?;
1,375✔
325

×
326
                        // Update counter
×
327
                        let counter = &mut counter[table_index];
1,375✔
328
                        *counter += 1;
1,375✔
329
                        if *counter % 1000 == 0 {
1,375✔
330
                            self.bars[table_index].set_position(*counter);
×
331
                        }
1,375✔
332
                    }
×
333
                    IngestionMessageKind::SnapshottingDone
×
334
                    | IngestionMessageKind::SnapshottingStarted => {
×
335
                        for port in &self.ports {
30✔
336
                            fw.send(
20✔
337
                                IngestionMessage {
20✔
338
                                    identifier,
20✔
339
                                    kind: kind.clone(),
20✔
340
                                },
20✔
341
                                *port,
20✔
342
                            )?;
20✔
343
                        }
×
344
                    }
×
345
                }
×
346
            }
×
347

348
            // If we reach here, it means the connector thread has quit and the `ingestor` has been dropped.
349
            // `join` will not block.
350
            if let Err(e) = t.join() {
30✔
351
                std::panic::panic_any(e);
×
352
            }
30✔
353

30✔
354
            Ok(())
30✔
355
        })
30✔
356
    }
30✔
357
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc