• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 4104551083

pending completion
4104551083

Pull #813

github

GitHub
Merge 6205174b9 into 42fcca9f7
Pull Request #813: fix: Rephrase error when primary key is missing

1 of 1 new or added line in 1 file covered (100.0%).

23334 of 37485 relevant lines covered (62.25%)

34732.75 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/dozer-ingestion/src/connectors/postgres/schema_helper.rs
1
use std::collections::HashMap;
2

3
use crate::errors::{ConnectorError, PostgresConnectorError, PostgresSchemaError};
4
use dozer_types::types::{
5
    FieldDefinition, ReplicationChangesTrackingType, Schema, SchemaIdentifier,
6
    SchemaWithChangesType, SourceDefinition,
7
};
8

9
use crate::connectors::{TableInfo, ValidationResults};
10

11
use crate::connectors::postgres::connection::helper;
12
use crate::connectors::postgres::helper::postgres_type_to_dozer_type;
13
use crate::errors::PostgresSchemaError::{
14
    InvalidColumnType, PrimaryKeyIsMissingInSchema, ValueConversionError,
15
};
16

17
use postgres_types::Type;
18
use std::collections::hash_map::DefaultHasher;
19
use std::hash::{Hash, Hasher};
20
use tokio_postgres::Row;
21

22
pub struct SchemaHelper {
23
    conn_config: tokio_postgres::Config,
24
    schema: String,
25
}
26

27
type RowsWithColumnsMap = (Vec<Row>, HashMap<String, Vec<String>>);
28

29
impl SchemaHelper {
30
    pub fn new(conn_config: tokio_postgres::Config, schema: Option<String>) -> SchemaHelper {
×
31
        let schema = schema.map_or("public".to_string(), |s| s);
×
32
        Self {
×
33
            conn_config,
×
34
            schema,
×
35
        }
×
36
    }
×
37

38
    pub fn get_tables(
×
39
        &self,
×
40
        tables: Option<Vec<TableInfo>>,
×
41
    ) -> Result<Vec<TableInfo>, ConnectorError> {
×
42
        Ok(self
×
43
            .get_schemas(tables)?
×
44
            .iter()
×
45
            .map(|(name, schema, _)| {
×
46
                let columns = Some(schema.fields.iter().map(|f| f.name.clone()).collect());
×
47
                TableInfo {
×
48
                    name: name.clone(),
×
49
                    table_name: name.clone(),
×
50
                    id: schema.identifier.unwrap().id,
×
51
                    columns,
×
52
                }
×
53
            })
×
54
            .collect())
×
55
    }
×
56

57
    fn get_columns(
×
58
        &self,
×
59
        table_name: Option<&[TableInfo]>,
×
60
    ) -> Result<RowsWithColumnsMap, PostgresConnectorError> {
×
61
        let mut tables_columns_map: HashMap<String, Vec<String>> = HashMap::new();
×
62
        let mut client = helper::connect(self.conn_config.clone())?;
×
63
        let schema = self.schema.clone();
×
64
        let query = if let Some(tables) = table_name {
×
65
            tables.iter().for_each(|t| {
×
66
                if let Some(columns) = t.columns.clone() {
×
67
                    tables_columns_map.insert(t.table_name.clone(), columns);
×
68
                }
×
69
            });
×
70
            let table_names: Vec<String> = tables.iter().map(|t| t.table_name.clone()).collect();
×
71
            let sql = str::replace(SQL, ":tables_condition", "= ANY($1) AND table_schema = $2");
×
72
            client.query(&sql, &[&table_names, &schema])
×
73
        } else {
74
            let sql = str::replace(SQL, ":tables_condition", TABLES_CONDITION);
×
75
            client.query(&sql, &[&schema])
×
76
        };
77

78
        query
×
79
            .map_err(PostgresConnectorError::InvalidQueryError)
×
80
            .map(|rows| (rows, tables_columns_map))
×
81
    }
×
82

83
    pub fn get_schemas(
×
84
        &self,
×
85
        tables: Option<Vec<TableInfo>>,
×
86
    ) -> Result<Vec<SchemaWithChangesType>, PostgresConnectorError> {
×
87
        let (results, tables_columns_map) = self.get_columns(tables.as_deref())?;
×
88

89
        let mut columns_map: HashMap<String, (Vec<FieldDefinition>, Vec<bool>, u32, String)> =
×
90
            HashMap::new();
×
91
        results
×
92
            .iter()
×
93
            .filter(|row| {
×
94
                let table_name: String = row.get(0);
×
95
                let column_name: String = row.get(1);
×
96

×
97
                tables_columns_map.get(&table_name).map_or(true, |columns| {
×
98
                    columns.is_empty() || columns.contains(&column_name)
×
99
                })
×
100
            })
×
101
            .map(|r| self.convert_row(r))
×
102
            .try_for_each(|row| -> Result<(), PostgresSchemaError> {
×
103
                let (table_name, field_def, is_primary_key, table_id, replication_type) = row?;
×
104
                let vals = columns_map.get(&table_name);
×
105
                let (mut fields, mut primary_keys, table_id, replication_type) = match vals {
×
106
                    Some((fields, primary_keys, table_id, replication_type)) => (
×
107
                        fields.clone(),
×
108
                        primary_keys.clone(),
×
109
                        *table_id,
×
110
                        replication_type,
×
111
                    ),
×
112
                    None => (vec![], vec![], table_id, &replication_type),
×
113
                };
114

115
                fields.push(field_def);
×
116
                primary_keys.push(is_primary_key);
×
117
                columns_map.insert(
×
118
                    table_name,
×
119
                    (fields, primary_keys, table_id, replication_type.clone()),
×
120
                );
×
121

×
122
                Ok(())
×
123
            })?;
×
124

125
        Self::map_columns_to_schemas(columns_map)
×
126
            .map_err(PostgresConnectorError::PostgresSchemaError)
×
127
    }
×
128

129
    pub fn map_columns_to_schemas(
×
130
        map: HashMap<String, (Vec<FieldDefinition>, Vec<bool>, u32, String)>,
×
131
    ) -> Result<Vec<SchemaWithChangesType>, PostgresSchemaError> {
×
132
        let mut schemas: Vec<SchemaWithChangesType> = Vec::new();
×
133
        for (table_name, (fields, primary_keys, table_id, replication_type)) in map.into_iter() {
×
134
            let primary_index: Vec<usize> = primary_keys
×
135
                .iter()
×
136
                .enumerate()
×
137
                .filter(|(_, b)| **b)
×
138
                .map(|(idx, _)| idx)
×
139
                .collect();
×
140

×
141
            let schema = Schema {
×
142
                identifier: Some(SchemaIdentifier {
×
143
                    id: table_id,
×
144
                    version: 1,
×
145
                }),
×
146
                fields: fields.clone(),
×
147
                primary_index,
×
148
            };
×
149

150
            let replication_type = match replication_type.as_str() {
×
151
                "d" => Ok(ReplicationChangesTrackingType::OnlyPK),
×
152
                "i" => Ok(ReplicationChangesTrackingType::OnlyPK),
×
153
                "n" => Ok(ReplicationChangesTrackingType::Nothing),
×
154
                "f" => Ok(ReplicationChangesTrackingType::FullChanges),
×
155
                _ => Err(PostgresSchemaError::UnsupportedReplicationType(
×
156
                    replication_type,
×
157
                )),
×
158
            }?;
×
159

160
            schemas.push((table_name, schema, replication_type));
×
161
        }
162

163
        Self::validate_schema_replication_identity(&schemas)?;
×
164

165
        Ok(schemas)
×
166
    }
×
167

168
    pub fn validate_schema_replication_identity(
×
169
        schemas: &[SchemaWithChangesType],
×
170
    ) -> Result<(), PostgresSchemaError> {
×
171
        let table_without_primary_index = schemas
×
172
            .iter()
×
173
            .find(|(_table_name, schema, _)| schema.primary_index.is_empty());
×
174

×
175
        match table_without_primary_index {
×
176
            Some((table_name, _, _)) => Err(PrimaryKeyIsMissingInSchema(table_name.clone())),
×
177
            None => Ok(()),
×
178
        }
179
    }
×
180

181
    pub fn validate(
×
182
        &self,
×
183
        tables: &[TableInfo],
×
184
    ) -> Result<ValidationResults, PostgresConnectorError> {
×
185
        let (results, tables_columns_map) = self.get_columns(Some(tables))?;
×
186

187
        let mut validation_result: ValidationResults = HashMap::new();
×
188
        for row in results {
×
189
            let table_name: String = row.get(0);
×
190
            let column_name: String = row.get(1);
×
191

×
192
            let column_should_be_validated = tables_columns_map
×
193
                .get(&table_name)
×
194
                .map_or(true, |table_info| table_info.contains(&column_name));
×
195

×
196
            if column_should_be_validated {
×
197
                let row_result = self.convert_row(&row).map_or_else(
×
198
                    |e| {
×
199
                        Err(ConnectorError::PostgresConnectorError(
×
200
                            PostgresConnectorError::PostgresSchemaError(e),
×
201
                        ))
×
202
                    },
×
203
                    |_| Ok(()),
×
204
                );
×
205

×
206
                validation_result.entry(table_name.clone()).or_default();
×
207
                validation_result
×
208
                    .entry(table_name)
×
209
                    .and_modify(|r| r.push((Some(column_name), row_result)));
×
210
            }
×
211
        }
212

213
        for table in tables {
×
214
            if let Some(columns) = &table.columns {
×
215
                let mut existing_columns = HashMap::new();
×
216
                if let Some(res) = validation_result.get(&table.table_name) {
×
217
                    for (col_name, _) in res {
×
218
                        if let Some(name) = col_name {
×
219
                            existing_columns.insert(name.clone(), ());
×
220
                        }
×
221
                    }
222
                }
×
223

224
                for column_name in columns {
×
225
                    if existing_columns.get(column_name).is_none() {
×
226
                        validation_result
×
227
                            .entry(table.table_name.clone())
×
228
                            .and_modify(|r| {
×
229
                                r.push((
×
230
                                    None,
×
231
                                    Err(ConnectorError::PostgresConnectorError(
×
232
                                        PostgresConnectorError::ColumnNotFound(
×
233
                                            column_name.to_string(),
×
234
                                            table.table_name.clone(),
×
235
                                        ),
×
236
                                    )),
×
237
                                ))
×
238
                            })
×
239
                            .or_default();
×
240
                    }
×
241
                }
242
            }
×
243
        }
244

245
        Ok(validation_result)
×
246
    }
×
247

248
    fn convert_row(
×
249
        &self,
×
250
        row: &Row,
×
251
    ) -> Result<(String, FieldDefinition, bool, u32, String), PostgresSchemaError> {
×
252
        let table_name: String = row.get(0);
×
253
        let column_name: String = row.get(1);
×
254
        let is_nullable: bool = row.get(2);
×
255
        let is_primary_index: bool = row.get(3);
×
256
        let table_id: u32 = if let Some(rel_id) = row.get(4) {
×
257
            rel_id
×
258
        } else {
259
            let mut s = DefaultHasher::new();
×
260
            table_name.hash(&mut s);
×
261
            s.finish() as u32
×
262
        };
263
        let replication_type_int: i8 = row.get(5);
×
264
        let type_oid: u32 = row.get(6);
×
265
        let typ = Type::from_oid(type_oid);
×
266

267
        let typ = typ.map_or(Err(InvalidColumnType), postgres_type_to_dozer_type)?;
×
268

269
        let replication_type = String::from_utf8(vec![replication_type_int as u8])
×
270
            .map_err(|_e| ValueConversionError("Replication type".to_string()))?;
×
271
        Ok((
×
272
            table_name,
×
273
            FieldDefinition::new(column_name, typ, is_nullable, SourceDefinition::Dynamic),
×
274
            is_primary_index,
×
275
            table_id,
×
276
            replication_type,
×
277
        ))
×
278
    }
×
279
}
280

281
/// Substituted for `:tables_condition` in `SQL` (right after `WHERE table_name`) when no
/// explicit table list is given: every base table of the schema bound to `$1`.
///
/// The trailing `table_schema = $1` keeps columns of equally named tables in other schemas
/// out of the result; the `IN` subquery alone only matches on the table name.
const TABLES_CONDITION: &str = "IN (SELECT table_name
                           FROM information_schema.tables
                           WHERE table_schema = $1 AND table_type = 'BASE TABLE')
      AND table_schema = $1";
285

286
/// Column-metadata query run by `get_columns`.
///
/// `:tables_condition` is replaced before execution with the predicate that follows
/// `WHERE table_name`. Output columns, in order: table name, column name, nullability,
/// whether the column counts as a key under the table's `relreplident`, `relid` from
/// `pg_statio_user_tables`, `pg_class.relreplident`, and the column type's `pg_type` oid.
///
/// Every join is keyed on the schema/namespace as well as on the name, so equally named
/// tables or types elsewhere cannot duplicate rows. Rows are ordered by `ordinal_position`
/// so fields come out in the table's column order.
const SQL: &str = "
SELECT table_info.table_name,
       table_info.column_name,
       CASE WHEN table_info.is_nullable = 'NO' THEN false ELSE true END AS is_nullable,
       CASE
           WHEN pc.relreplident = 'd' THEN constraint_info.constraint_type IS NOT NULL
           WHEN pc.relreplident = 'i' THEN pa.attname IS NOT NULL
           WHEN pc.relreplident = 'n' THEN false
           WHEN pc.relreplident = 'f' THEN true
           ELSE false
           END                                                          AS is_primary_index,
       st_user_table.relid,
       pc.relreplident,
       pt.oid                                                           AS type_oid
FROM (SELECT table_schema,
             table_catalog,
             table_name,
             column_name,
             ordinal_position,
             is_nullable,
             data_type,
             numeric_precision,
             udt_schema,
             udt_name,
             character_maximum_length
      FROM information_schema.columns
      WHERE table_name :tables_condition) table_info
         LEFT JOIN pg_catalog.pg_statio_user_tables st_user_table
                   ON st_user_table.relname = table_info.table_name
                       AND st_user_table.schemaname = table_info.table_schema
         LEFT JOIN (SELECT constraintUsage.table_schema,
                           constraintUsage.table_name,
                           constraintUsage.column_name,
                           table_constraints.constraint_name,
                           table_constraints.constraint_type
                    FROM information_schema.constraint_column_usage constraintUsage
                             JOIN information_schema.table_constraints table_constraints
                                  ON constraintUsage.table_schema = table_constraints.table_schema
                                      AND constraintUsage.table_name = table_constraints.table_name
                                      AND constraintUsage.constraint_name = table_constraints.constraint_name
                                      AND table_constraints.constraint_type = 'PRIMARY KEY') constraint_info
                   ON table_info.table_schema = constraint_info.table_schema
                       AND table_info.table_name = constraint_info.table_name
                       AND table_info.column_name = constraint_info.column_name
         LEFT JOIN pg_class pc ON st_user_table.relid = pc.oid
         LEFT JOIN pg_namespace type_ns ON type_ns.nspname = table_info.udt_schema
         LEFT JOIN pg_type pt ON table_info.udt_name = pt.typname AND pt.typnamespace = type_ns.oid
         LEFT JOIN pg_index pi ON st_user_table.relid = pi.indrelid AND pi.indisreplident = true
         LEFT JOIN pg_attribute pa ON pa.attrelid = pi.indrelid AND pa.attnum = ANY (pi.indkey) AND pa.attnum > 0 AND
                                      pa.attname = table_info.column_name
ORDER BY table_info.table_schema,
         table_info.table_catalog,
         table_info.table_name,
         table_info.ordinal_position;";
332

333
#[cfg(test)]
mod tests {
    use crate::connectors::postgres::schema_helper::SchemaHelper;
    use crate::connectors::postgres::test_utils::get_client;
    use crate::connectors::TableInfo;
    use rand::Rng;

    /// Returns `true` when `a` and `b` contain the same elements with the same
    /// multiplicities, ignoring order. Unlike a set comparison, a column reported twice
    /// (e.g. through a join that matched another schema) makes this return `false`.
    fn assert_vec_eq<T>(mut a: Vec<T>, mut b: Vec<T>) -> bool
    where
        T: Ord,
    {
        a.sort();
        b.sort();
        a == b
    }

    // The tests below are `#[ignore]`d: they create and drop a randomly named schema through
    // the client returned by `get_client`.

    #[test]
    #[ignore]
    fn connector_disabled_test_e2e_get_tables() {
        let mut client = get_client();

        let mut rng = rand::thread_rng();

        let schema = format!("schema_helper_test_{}", rng.gen::<u32>());
        let table_name = format!("products_test_{}", rng.gen::<u32>());

        client.create_schema(&schema);
        client.create_simple_table(&schema, &table_name);

        let schema_helper = SchemaHelper::new(client.postgres_config.clone(), Some(schema.clone()));
        let result = schema_helper.get_tables(None).unwrap();

        let table = result.first().expect("the test schema contains one table");
        assert_eq!(table_name, table.table_name);
        assert!(assert_vec_eq(
            vec![
                "name".to_string(),
                "description".to_string(),
                "weight".to_string(),
                "id".to_string()
            ],
            table.columns.clone().unwrap()
        ));

        client.drop_schema(&schema);
    }

    #[test]
    #[ignore]
    fn connector_disabled_test_e2e_get_schema_with_selected_columns() {
        let mut client = get_client();

        let mut rng = rand::thread_rng();

        let schema = format!("schema_helper_test_{}", rng.gen::<u32>());
        let table_name = format!("products_test_{}", rng.gen::<u32>());

        client.create_schema(&schema);
        client.create_simple_table(&schema, &table_name);

        let schema_helper = SchemaHelper::new(client.postgres_config.clone(), Some(schema.clone()));
        let table_info = TableInfo {
            name: table_name.clone(),
            table_name: table_name.clone(),
            id: 0,
            columns: Some(vec!["name".to_string(), "id".to_string()]),
        };
        let result = schema_helper.get_tables(Some(vec![table_info])).unwrap();

        let table = result.first().expect("the requested table is returned");
        assert_eq!(table_name, table.table_name);
        assert!(assert_vec_eq(
            vec!["name".to_string(), "id".to_string()],
            table.columns.clone().unwrap()
        ));

        client.drop_schema(&schema);
    }

    #[test]
    #[ignore]
    fn connector_disabled_test_e2e_get_schema_without_selected_columns() {
        let mut client = get_client();

        let mut rng = rand::thread_rng();

        let schema = format!("schema_helper_test_{}", rng.gen::<u32>());
        let table_name = format!("products_test_{}", rng.gen::<u32>());

        client.create_schema(&schema);
        client.create_simple_table(&schema, &table_name);

        let schema_helper = SchemaHelper::new(client.postgres_config.clone(), Some(schema.clone()));
        // An empty column list selects every column of the table.
        let table_info = TableInfo {
            name: table_name.clone(),
            table_name: table_name.clone(),
            id: 0,
            columns: Some(vec![]),
        };
        let result = schema_helper.get_tables(Some(vec![table_info])).unwrap();

        let table = result.first().expect("the requested table is returned");
        assert_eq!(table_name, table.table_name);
        assert!(assert_vec_eq(
            vec![
                "id".to_string(),
                "name".to_string(),
                "description".to_string(),
                "weight".to_string()
            ],
            table.columns.clone().unwrap()
        ));

        client.drop_schema(&schema);
    }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc