• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 6472204957

09 Oct 2023 06:57PM UTC coverage: 77.582%. First build
6472204957

push

github

web-flow
chore: init schema option at connection level optionally (#2145)

* chore: init schema option at connection level

* chore: fix clippy

40 of 40 new or added lines in 11 files covered. (100.0%)

51962 of 66977 relevant lines covered (77.58%)

165501.95 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

90.91
/dozer-ingestion/src/connectors/postgres/schema/helper.rs
1
use std::collections::HashMap;
2

3
use crate::connectors::{CdcType, ListOrFilterColumns, SourceSchema, SourceSchemaResult};
4
use crate::errors::{ConnectorError, PostgresConnectorError, PostgresSchemaError};
5
use dozer_types::types::{FieldDefinition, FieldType, Schema, SourceDefinition};
6

7
use crate::connectors::postgres::connection::helper;
8
use crate::connectors::postgres::helper::postgres_type_to_dozer_type;
9
use crate::errors::PostgresSchemaError::{InvalidColumnType, ValueConversionError};
10

11
use postgres_types::Type;
12

13
use crate::connectors::postgres::schema::sorter::sort_schemas;
14
use tokio_postgres::Row;
15

16
use PostgresSchemaError::TableTypeNotFound;
17

18
/// Reads table, column and replication-identity metadata from a Postgres server.
#[derive(Debug)]
×
19
pub struct SchemaHelper {
20
    /// Connection settings; a new client is opened from a clone of these for every column
    /// query.
    conn_config: tokio_postgres::Config,
21
    // Optional connection-level Postgres schema. It only narrows table discovery when no
    // explicit table list is requested; explicitly listed tables without a schema fall back
    // to `DEFAULT_SCHEMA_NAME` instead.
22
    schema: Option<String>,
23
}
24

25
/// One column of one table, as converted from a single row of the schema query.
struct PostgresTableRow {
26
    schema: String,
27
    table_name: String,
28
    field: FieldDefinition,
29
    // Whether this column is part of the table's replication identity.
    is_column_used_in_index: bool,
30
    // Single-character `relreplident` code of the owning table.
    replication_type: String,
31
}
32

33
/// Columns of one Postgres table together with its replication-identity information.
#[derive(Clone, Debug)]
4✔
34
pub struct PostgresTable {
35
    fields: Vec<FieldDefinition>,
36
    // One flag per entry in `fields` (same position): `true` when that column is used for the
    // replication identity. Which columns get flagged depends on the table's replica identity:
37
    // Default ('d') - uses PK for identity (primary-key columns are flagged)
38
    // Index ('i') - uses selected index fields for identity (replica-identity index columns)
39
    // Full ('f') - all fields are used for identity (every column is flagged)
40
    // Nothing ('n') - no fields can be used for identity (no column is flagged).
41
    //  Postgres will not return old values in update and delete replication messages
42
    index_keys: Vec<bool>,
43
    // Single-character `relreplident` code: "d" (default), "i" (index), "f" (full) or
    // "n" (nothing).
    replication_type: String,
44
}
45

46
/// `(schema, table name)` pair identifying a table.
pub(crate) type SchemaTableIdentifier = (String, String);
47
impl PostgresTable {
48
    pub fn new(replication_type: String) -> Self {
108✔
49
        Self {
108✔
50
            fields: vec![],
108✔
51
            index_keys: vec![],
108✔
52
            replication_type,
108✔
53
        }
108✔
54
    }
108✔
55

56
    pub fn add_field(&mut self, field: FieldDefinition, is_column_used_in_index: bool) {
1,116✔
57
        self.fields.push(field);
1,116✔
58
        self.index_keys.push(is_column_used_in_index);
1,116✔
59
    }
1,116✔
60

61
    pub fn fields(&self) -> &Vec<FieldDefinition> {
568✔
62
        &self.fields
568✔
63
    }
568✔
64

65
    pub fn is_index_field(&self, index: usize) -> Option<&bool> {
559✔
66
        self.index_keys.get(index)
559✔
67
    }
559✔
68

69
    pub fn get_field(&self, index: usize) -> Option<&FieldDefinition> {
556✔
70
        self.fields.get(index)
556✔
71
    }
556✔
72

73
    pub fn replication_type(&self) -> &String {
52✔
74
        &self.replication_type
52✔
75
    }
52✔
76
}
77

78
/// A table discovered by `SchemaHelper::get_tables`.
#[derive(Debug, Clone)]
24✔
79
pub struct PostgresTableInfo {
80
    pub schema: String,
81
    pub name: String,
82
    /// `pg_class.oid` of the table.
    pub relation_id: u32,
83
    /// Selected column names, in the order the schema query returned them.
    pub columns: Vec<String>,
84
}
85

86
/// Raw schema-query rows plus the per-table column filter (requested column names; a table
/// that is absent from the map, or maps to an empty list, keeps all of its columns).
type RowsWithColumnsMap = (Vec<Row>, HashMap<SchemaTableIdentifier, Vec<String>>);
87

88
impl SchemaHelper {
89
    pub fn new(conn_config: tokio_postgres::Config, schema: Option<String>) -> SchemaHelper {
57✔
90
        Self {
57✔
91
            conn_config,
57✔
92
            schema,
57✔
93
        }
57✔
94
    }
57✔
95

96
    pub async fn get_tables(
57✔
97
        &self,
57✔
98
        tables: Option<&[ListOrFilterColumns]>,
57✔
99
    ) -> Result<Vec<PostgresTableInfo>, ConnectorError> {
57✔
100
        let (results, tables_columns_map) = self.get_columns(tables).await?;
396✔
101

102
        let mut table_columns_map: HashMap<SchemaTableIdentifier, (u32, Vec<String>)> =
57✔
103
            HashMap::new();
57✔
104
        for row in results {
861✔
105
            let schema: String = row.get(8);
804✔
106
            let table_name: String = row.get(0);
804✔
107
            let column_name: String = row.get(1);
804✔
108
            let relation_id: u32 = row.get(4);
804✔
109

804✔
110
            let schema_table_tuple = (schema, table_name);
804✔
111
            let add_column_table = tables_columns_map
804✔
112
                .get(&schema_table_tuple)
804✔
113
                .map_or(true, |columns| {
804✔
114
                    columns.is_empty() || columns.contains(&column_name)
278✔
115
                });
804✔
116

804✔
117
            if add_column_table {
804✔
118
                match table_columns_map.get_mut(&schema_table_tuple) {
802✔
119
                    Some((existing_relation_id, columns)) => {
745✔
120
                        columns.push(column_name);
745✔
121
                        assert_eq!(*existing_relation_id, relation_id);
745✔
122
                    }
123
                    None => {
57✔
124
                        table_columns_map
57✔
125
                            .insert(schema_table_tuple, (relation_id, vec![column_name]));
57✔
126
                    }
57✔
127
                }
128
            }
2✔
129
        }
130

131
        Ok(if let Some(tables) = tables {
57✔
132
            let mut result = vec![];
50✔
133
            for table in tables {
100✔
134
                result.push(find_table(
50✔
135
                    &table_columns_map,
50✔
136
                    table.schema.as_deref(),
50✔
137
                    &table.name,
50✔
138
                )?);
50✔
139
            }
140
            result
50✔
141
        } else {
142
            table_columns_map
7✔
143
                .into_iter()
7✔
144
                .map(
7✔
145
                    |((schema, name), (relation_id, columns))| PostgresTableInfo {
7✔
146
                        name,
7✔
147
                        relation_id,
7✔
148
                        columns,
7✔
149
                        schema,
7✔
150
                    },
7✔
151
                )
7✔
152
                .collect()
7✔
153
        })
154
    }
57✔
155

156
    async fn get_columns(
110✔
157
        &self,
110✔
158
        tables: Option<&[ListOrFilterColumns]>,
110✔
159
    ) -> Result<RowsWithColumnsMap, PostgresConnectorError> {
110✔
160
        let mut tables_columns_map: HashMap<SchemaTableIdentifier, Vec<String>> = HashMap::new();
110✔
161
        let mut client = helper::connect(self.conn_config.clone()).await?;
542✔
162
        let query = if let Some(tables) = tables {
110✔
163
            tables.iter().for_each(|t| {
103✔
164
                if let Some(columns) = t.columns.clone() {
103✔
165
                    tables_columns_map.insert(
76✔
166
                        (
76✔
167
                            t.schema
76✔
168
                                .as_ref()
76✔
169
                                .map_or(DEFAULT_SCHEMA_NAME.to_string(), |s| s.to_string()),
76✔
170
                            t.name.clone(),
76✔
171
                        ),
76✔
172
                        columns,
76✔
173
                    );
76✔
174
                }
76✔
175
            });
103✔
176

103✔
177
            let schemas: Vec<String> = tables
103✔
178
                .iter()
103✔
179
                .map(|t| {
103✔
180
                    t.schema
103✔
181
                        .as_ref()
103✔
182
                        .map_or_else(|| DEFAULT_SCHEMA_NAME.to_string(), |s| s.clone())
103✔
183
                })
103✔
184
                .collect();
103✔
185
            let table_names: Vec<String> = tables.iter().map(|t| t.name.clone()).collect();
103✔
186
            let sql = str::replace(
103✔
187
                SQL,
103✔
188
                ":tables_name_condition",
103✔
189
                "t.table_schema = ANY($1) AND t.table_name = ANY($2)",
103✔
190
            );
103✔
191
            client.query(&sql, &[&schemas, &table_names]).await
206✔
192
        } else if let Some(schema) = &self.schema {
7✔
193
            let sql = str::replace(
×
194
                SQL,
×
195
                ":tables_name_condition",
×
196
                "t.table_schema = $1 AND t.table_type = 'BASE TABLE'",
×
197
            );
×
198
            client.query(&sql, &[&schema]).await
×
199
        } else {
200
            let sql = str::replace(SQL, ":tables_name_condition", "t.table_type = 'BASE TABLE'");
7✔
201
            client.query(&sql, &[]).await
14✔
202
        };
203

204
        query
110✔
205
            .map_err(PostgresConnectorError::InvalidQueryError)
110✔
206
            .map(|rows| (rows, tables_columns_map))
110✔
207
    }
110✔
208

209
    pub async fn get_schemas(
53✔
210
        &self,
53✔
211
        tables: &[ListOrFilterColumns],
53✔
212
    ) -> Result<Vec<SourceSchemaResult>, PostgresConnectorError> {
53✔
213
        let (results, tables_columns_map) = self.get_columns(Some(tables)).await?;
366✔
214

215
        let mut columns_map: HashMap<SchemaTableIdentifier, PostgresTable> = HashMap::new();
53✔
216
        results
53✔
217
            .iter()
53✔
218
            .filter(|row| {
549✔
219
                let schema: String = row.get(8);
549✔
220
                let table_name: String = row.get(0);
549✔
221
                let column_name: String = row.get(1);
549✔
222

549✔
223
                tables_columns_map
549✔
224
                    .get(&(schema, table_name))
549✔
225
                    .map_or(true, |columns| {
549✔
226
                        columns.is_empty() || columns.contains(&column_name)
545✔
227
                    })
549✔
228
            })
549✔
229
            .map(|r| self.convert_row(r))
549✔
230
            .try_for_each(|table_row| -> Result<(), PostgresSchemaError> {
549✔
231
                let row = table_row?;
549✔
232
                columns_map
548✔
233
                    .entry((row.schema, row.table_name))
548✔
234
                    .and_modify(|table| {
548✔
235
                        table.add_field(row.field.clone(), row.is_column_used_in_index)
498✔
236
                    })
548✔
237
                    .or_insert_with(|| {
548✔
238
                        let mut table = PostgresTable::new(row.replication_type);
50✔
239
                        table.add_field(row.field, row.is_column_used_in_index);
50✔
240
                        table
50✔
241
                    });
548✔
242

548✔
243
                Ok(())
548✔
244
            })?;
549✔
245

246
        let columns_map = sort_schemas(tables, &columns_map)?;
52✔
247

248
        Ok(Self::map_columns_to_schemas(columns_map))
50✔
249
    }
53✔
250

251
    fn map_columns_to_schemas(
50✔
252
        postgres_tables: Vec<(SchemaTableIdentifier, PostgresTable)>,
50✔
253
    ) -> Vec<SourceSchemaResult> {
50✔
254
        postgres_tables
50✔
255
            .into_iter()
50✔
256
            .map(|((_, table_name), table)| {
50✔
257
                Self::map_schema(&table_name, table).map_err(|e| {
50✔
258
                    ConnectorError::PostgresConnectorError(
×
259
                        PostgresConnectorError::PostgresSchemaError(e),
×
260
                    )
×
261
                })
50✔
262
            })
50✔
263
            .collect()
50✔
264
    }
50✔
265

266
    fn map_schema(
50✔
267
        table_name: &str,
50✔
268
        table: PostgresTable,
50✔
269
    ) -> Result<SourceSchema, PostgresSchemaError> {
50✔
270
        let primary_index: Vec<usize> = table
50✔
271
            .index_keys
50✔
272
            .iter()
50✔
273
            .enumerate()
50✔
274
            .filter(|(_, b)| **b)
548✔
275
            .map(|(idx, _)| idx)
50✔
276
            .collect();
50✔
277

50✔
278
        let schema = Schema {
50✔
279
            fields: table.fields.clone(),
50✔
280
            primary_index,
50✔
281
        };
50✔
282

283
        let cdc_type = match table.replication_type.as_str() {
50✔
284
            "d" => {
50✔
285
                if schema.primary_index.is_empty() {
50✔
286
                    Ok(CdcType::Nothing)
24✔
287
                } else {
288
                    Ok(CdcType::OnlyPK)
26✔
289
                }
290
            }
291
            "i" => Ok(CdcType::OnlyPK),
×
292
            "n" => Ok(CdcType::Nothing),
×
293
            "f" => Ok(CdcType::FullChanges),
×
294
            typ => Err(PostgresSchemaError::UnsupportedReplicationType(
×
295
                typ.to_string(),
×
296
            )),
×
297
        }?;
×
298

299
        let source_schema = SourceSchema::new(schema, cdc_type);
50✔
300
        Self::validate_schema_replication_identity(table_name, &source_schema)?;
50✔
301

302
        Ok(source_schema)
50✔
303
    }
50✔
304

305
    fn validate_schema_replication_identity(
50✔
306
        table_name: &str,
50✔
307
        schema: &SourceSchema,
50✔
308
    ) -> Result<(), PostgresSchemaError> {
50✔
309
        if schema.cdc_type == CdcType::OnlyPK && schema.schema.primary_index.is_empty() {
50✔
310
            Err(PostgresSchemaError::PrimaryKeyIsMissingInSchema(
×
311
                table_name.to_string(),
×
312
            ))
×
313
        } else {
314
            Ok(())
50✔
315
        }
316
    }
50✔
317
    fn convert_row(&self, row: &Row) -> Result<PostgresTableRow, PostgresSchemaError> {
549✔
318
        let schema: String = row.get(8);
549✔
319
        let table_name: String = row.get(0);
549✔
320
        let table_type: Option<String> = row.get(7);
549✔
321
        if let Some(typ) = table_type {
549✔
322
            if typ != *"BASE TABLE" {
549✔
323
                return Err(PostgresSchemaError::UnsupportedTableType(typ, table_name));
1✔
324
            }
548✔
325
        } else {
326
            return Err(TableTypeNotFound);
×
327
        }
328

329
        let column_name: String = row.get(1);
548✔
330
        let is_nullable: bool = row.get(2);
548✔
331
        let is_column_used_in_index: bool = row.get(3);
548✔
332
        let replication_type_int: i8 = row.get(5);
548✔
333
        let type_oid: u32 = row.get(6);
548✔
334

335
        // TODO: workaround - in case of custom enum
336
        let typ = if type_oid == 28862 {
548✔
337
            FieldType::String
×
338
        } else {
339
            let oid_typ = Type::from_oid(type_oid);
548✔
340
            oid_typ.map_or_else(
548✔
341
                || Err(InvalidColumnType(column_name.clone())),
548✔
342
                postgres_type_to_dozer_type,
548✔
343
            )?
548✔
344
        };
345

346
        let replication_type = String::from_utf8(vec![replication_type_int as u8])
548✔
347
            .map_err(|_e| ValueConversionError("Replication type".to_string()))?;
548✔
348

349
        Ok(PostgresTableRow {
548✔
350
            schema,
548✔
351
            table_name,
548✔
352
            field: FieldDefinition::new(column_name, typ, is_nullable, SourceDefinition::Dynamic),
548✔
353
            is_column_used_in_index,
548✔
354
            replication_type,
548✔
355
        })
548✔
356
    }
549✔
357
}
358

359
/// Schema used for requested tables that do not name one explicitly.
pub const DEFAULT_SCHEMA_NAME: &str = "public";
360

361
fn find_table(
50✔
362
    table_columns_map: &HashMap<SchemaTableIdentifier, (u32, Vec<String>)>,
50✔
363
    schema_name: Option<&str>,
50✔
364
    table_name: &str,
50✔
365
) -> Result<PostgresTableInfo, PostgresConnectorError> {
50✔
366
    let schema_name = schema_name.unwrap_or(DEFAULT_SCHEMA_NAME);
50✔
367
    let schema_table_identifier = (schema_name.to_string(), table_name.to_string());
50✔
368
    if let Some((relation_id, columns)) = table_columns_map.get(&schema_table_identifier) {
50✔
369
        Ok(PostgresTableInfo {
50✔
370
            schema: schema_table_identifier.0,
50✔
371
            name: schema_table_identifier.1,
50✔
372
            relation_id: *relation_id,
50✔
373
            columns: columns.clone(),
50✔
374
        })
50✔
375
    } else {
376
        Err(PostgresConnectorError::TablesNotFound(vec![
×
377
            schema_table_identifier,
×
378
        ]))
×
379
    }
380
}
50✔
381

382
/// Column metadata query shared by all table-discovery paths.
///
/// `:tables_name_condition` is replaced with a predicate on `t` (`information_schema.tables`)
/// before execution. Result columns, by index: 0 `table_name`, 1 `column_name`,
/// 2 `is_nullable`, 3 `is_column_used_in_index`, 4 `pc.oid` (relation id),
/// 5 `pc.relreplident`, 6 `type_oid`, 7 `table_type`, 8 `table_schema`.
///
/// `pg_type` is joined on both type name and type namespace. Type names are only unique per
/// schema (every table also defines a composite type with its own name), so joining on the
/// name alone can return several type rows and duplicate the column.
const SQL: &str = "
SELECT table_info.table_name,
       table_info.column_name,
       CASE WHEN table_info.is_nullable = 'NO' THEN false ELSE true END AS is_nullable,
       CASE
           WHEN pc.relreplident = 'd' OR pc.relreplident = 'i'
               THEN pa.attrelid IS NOT NULL
           WHEN pc.relreplident = 'n' THEN false
           WHEN pc.relreplident = 'f' THEN true
           ELSE false
           END                                                          AS is_column_used_in_index,
       pc.oid,
       pc.relreplident,
       pt.oid                                                           AS type_oid,
       t.table_type,
       t.table_schema
FROM information_schema.columns table_info
         LEFT JOIN information_schema.tables t ON t.table_name = table_info.table_name AND t.table_schema = table_info.table_schema
         LEFT JOIN pg_namespace ns ON t.table_schema = ns.nspname
         LEFT JOIN pg_class pc ON t.table_name = pc.relname AND ns.oid = pc.relnamespace
         LEFT JOIN pg_namespace tns ON table_info.udt_schema = tns.nspname
         LEFT JOIN pg_type pt ON table_info.udt_name = pt.typname AND pt.typnamespace = tns.oid
         LEFT JOIN pg_index pi ON pc.oid = pi.indrelid AND
                                  ((pi.indisreplident = true AND pc.relreplident = 'i') OR (pi.indisprimary AND pc.relreplident = 'd'))
         LEFT JOIN pg_attribute pa ON
             pa.attrelid = pi.indrelid
                 AND pa.attnum = ANY (pi.indkey)
                 AND pa.attnum > 0
                 AND pa.attname = table_info.column_name
WHERE :tables_name_condition AND ns.nspname not in ('information_schema', 'pg_catalog')
      and ns.nspname not like 'pg_toast%'
      and ns.nspname not like 'pg_temp_%'
ORDER BY table_info.table_schema,
         table_info.table_catalog,
         table_info.table_name,
         table_info.ordinal_position;";
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc