• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 5640948961

pending completion
5640948961

push

github

web-flow
chore: Remove `Schema::identifier` (#1776)

1575 of 1575 new or added lines in 103 files covered. (100.0%)

42852 of 55649 relevant lines covered (77.0%)

21742.22 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

93.08
/dozer-ingestion/src/connectors/postgres/schema/helper.rs
1
use std::collections::HashMap;
2

3
use crate::connectors::{CdcType, ListOrFilterColumns, SourceSchema, SourceSchemaResult};
4
use crate::errors::{ConnectorError, PostgresConnectorError, PostgresSchemaError};
5
use dozer_types::types::{FieldDefinition, Schema, SourceDefinition};
6

7
use crate::connectors::postgres::connection::helper;
8
use crate::connectors::postgres::helper::postgres_type_to_dozer_type;
9
use crate::errors::PostgresSchemaError::{InvalidColumnType, ValueConversionError};
10

11
use postgres_types::Type;
12

13
use crate::connectors::postgres::schema::sorter::sort_schemas;
14
use tokio_postgres::Row;
15

16
use PostgresSchemaError::TableTypeNotFound;
17

18
#[derive(Debug)]
×
19
pub struct SchemaHelper {
20
    conn_config: tokio_postgres::Config,
21
}
22

23
struct PostgresTableRow {
24
    schema: String,
25
    table_name: String,
26
    field: FieldDefinition,
27
    is_column_used_in_index: bool,
28
    replication_type: String,
29
}
30

31
#[derive(Clone, Debug)]
4✔
32
pub struct PostgresTable {
33
    fields: Vec<FieldDefinition>,
34
    // Indexes of fields, which are used for replication identity
35
    // Default - uses PK for identity
36
    // Index - uses selected index fields for identity
37
    // Full - all fields are used for identity
38
    // Nothing - no fields can be used for identity.
39
    //  Postgres will not return old values in update and delete replication messages
40
    index_keys: Vec<bool>,
41
    replication_type: String,
42
}
43

44
pub(crate) type SchemaTableIdentifier = (String, String);
45
impl PostgresTable {
46
    pub fn new(replication_type: String) -> Self {
68✔
47
        Self {
68✔
48
            fields: vec![],
68✔
49
            index_keys: vec![],
68✔
50
            replication_type,
68✔
51
        }
68✔
52
    }
68✔
53

54
    pub fn add_field(&mut self, field: FieldDefinition, is_column_used_in_index: bool) {
748✔
55
        self.fields.push(field);
748✔
56
        self.index_keys.push(is_column_used_in_index);
748✔
57
    }
748✔
58

59
    pub fn fields(&self) -> &Vec<FieldDefinition> {
384✔
60
        &self.fields
384✔
61
    }
384✔
62

63
    pub fn is_index_field(&self, index: usize) -> Option<&bool> {
375✔
64
        self.index_keys.get(index)
375✔
65
    }
375✔
66

67
    pub fn get_field(&self, index: usize) -> Option<&FieldDefinition> {
372✔
68
        self.fields.get(index)
372✔
69
    }
372✔
70

71
    pub fn replication_type(&self) -> &String {
32✔
72
        &self.replication_type
32✔
73
    }
32✔
74
}
75

76
/// Identity and selected column names of a table, as listed by `SchemaHelper::get_tables`.
#[derive(Clone, Debug)]
pub struct PostgresTableInfo {
    /// Schema (namespace) name.
    pub schema: String,
    /// Table name.
    pub name: String,
    /// The table's `pg_class.oid`.
    pub relation_id: u32,
    /// Column names that passed the requested column filter, in query order.
    pub columns: Vec<String>,
}
83

84
type RowsWithColumnsMap = (Vec<Row>, HashMap<SchemaTableIdentifier, Vec<String>>);
85

86
impl SchemaHelper {
87
    pub fn new(conn_config: tokio_postgres::Config) -> SchemaHelper {
37✔
88
        Self { conn_config }
37✔
89
    }
37✔
90

91
    pub async fn get_tables(
35✔
92
        &self,
35✔
93
        tables: Option<&[ListOrFilterColumns]>,
35✔
94
    ) -> Result<Vec<PostgresTableInfo>, ConnectorError> {
35✔
95
        let (results, tables_columns_map) = self.get_columns(tables).await?;
242✔
96

97
        let mut table_columns_map: HashMap<SchemaTableIdentifier, (u32, Vec<String>)> =
35✔
98
            HashMap::new();
35✔
99
        for row in results {
571✔
100
            let schema: String = row.get(8);
536✔
101
            let table_name: String = row.get(0);
536✔
102
            let column_name: String = row.get(1);
536✔
103
            let relation_id: u32 = row.get(4);
536✔
104

536✔
105
            let schema_table_tuple = (schema, table_name);
536✔
106
            let add_column_table = tables_columns_map
536✔
107
                .get(&schema_table_tuple)
536✔
108
                .map_or(true, |columns| {
536✔
109
                    columns.is_empty() || columns.contains(&column_name)
184✔
110
                });
536✔
111

536✔
112
            if add_column_table {
536✔
113
                match table_columns_map.get_mut(&schema_table_tuple) {
534✔
114
                    Some((existing_relation_id, columns)) => {
499✔
115
                        columns.push(column_name);
499✔
116
                        assert_eq!(*existing_relation_id, relation_id);
499✔
117
                    }
118
                    None => {
35✔
119
                        table_columns_map
35✔
120
                            .insert(schema_table_tuple, (relation_id, vec![column_name]));
35✔
121
                    }
35✔
122
                }
123
            }
2✔
124
        }
125

126
        Ok(if let Some(tables) = tables {
35✔
127
            let mut result = vec![];
30✔
128
            for table in tables {
60✔
129
                result.push(find_table(
30✔
130
                    &table_columns_map,
30✔
131
                    table.schema.as_deref(),
30✔
132
                    &table.name,
30✔
133
                )?);
30✔
134
            }
135
            result
30✔
136
        } else {
137
            table_columns_map
5✔
138
                .into_iter()
5✔
139
                .map(
5✔
140
                    |((schema, name), (relation_id, columns))| PostgresTableInfo {
5✔
141
                        name,
5✔
142
                        relation_id,
5✔
143
                        columns,
5✔
144
                        schema,
5✔
145
                    },
5✔
146
                )
5✔
147
                .collect()
5✔
148
        })
149
    }
35✔
150

151
    async fn get_columns(
68✔
152
        &self,
68✔
153
        tables: Option<&[ListOrFilterColumns]>,
68✔
154
    ) -> Result<RowsWithColumnsMap, PostgresConnectorError> {
68✔
155
        let mut tables_columns_map: HashMap<SchemaTableIdentifier, Vec<String>> = HashMap::new();
68✔
156
        let client = helper::connect(self.conn_config.clone()).await?;
332✔
157
        let query = if let Some(tables) = tables {
68✔
158
            tables.iter().for_each(|t| {
63✔
159
                if let Some(columns) = t.columns.clone() {
63✔
160
                    tables_columns_map.insert(
44✔
161
                        (
44✔
162
                            t.schema
44✔
163
                                .as_ref()
44✔
164
                                .map_or(DEFAULT_SCHEMA_NAME.to_string(), |s| s.to_string()),
44✔
165
                            t.name.clone(),
44✔
166
                        ),
44✔
167
                        columns,
44✔
168
                    );
44✔
169
                }
44✔
170
            });
63✔
171

63✔
172
            let schemas: Vec<String> = tables
63✔
173
                .iter()
63✔
174
                .map(|t| {
63✔
175
                    t.schema
63✔
176
                        .as_ref()
63✔
177
                        .map_or_else(|| DEFAULT_SCHEMA_NAME.to_string(), |s| s.clone())
63✔
178
                })
63✔
179
                .collect();
63✔
180
            let table_names: Vec<String> = tables.iter().map(|t| t.name.clone()).collect();
63✔
181
            let sql = str::replace(
63✔
182
                SQL,
63✔
183
                ":tables_name_condition",
63✔
184
                "t.table_schema = ANY($1) AND t.table_name = ANY($2)",
63✔
185
            );
63✔
186
            client.query(&sql, &[&schemas, &table_names]).await
126✔
187
        } else {
188
            let sql = str::replace(SQL, ":tables_name_condition", "t.table_type = 'BASE TABLE'");
5✔
189
            client.query(&sql, &[]).await
10✔
190
        };
191

192
        query
68✔
193
            .map_err(PostgresConnectorError::InvalidQueryError)
68✔
194
            .map(|rows| (rows, tables_columns_map))
68✔
195
    }
68✔
196

197
    pub async fn get_schemas(
33✔
198
        &self,
33✔
199
        tables: &[ListOrFilterColumns],
33✔
200
    ) -> Result<Vec<SourceSchemaResult>, PostgresConnectorError> {
33✔
201
        let (results, tables_columns_map) = self.get_columns(Some(tables)).await?;
226✔
202

203
        let mut columns_map: HashMap<SchemaTableIdentifier, PostgresTable> = HashMap::new();
33✔
204
        results
33✔
205
            .iter()
33✔
206
            .filter(|row| {
365✔
207
                let schema: String = row.get(8);
365✔
208
                let table_name: String = row.get(0);
365✔
209
                let column_name: String = row.get(1);
365✔
210

365✔
211
                tables_columns_map
365✔
212
                    .get(&(schema, table_name))
365✔
213
                    .map_or(true, |columns| {
365✔
214
                        columns.is_empty() || columns.contains(&column_name)
361✔
215
                    })
365✔
216
            })
365✔
217
            .map(|r| self.convert_row(r))
365✔
218
            .try_for_each(|table_row| -> Result<(), PostgresSchemaError> {
365✔
219
                let row = table_row?;
365✔
220
                columns_map
364✔
221
                    .entry((row.schema, row.table_name))
364✔
222
                    .and_modify(|table| {
364✔
223
                        table.add_field(row.field.clone(), row.is_column_used_in_index)
334✔
224
                    })
364✔
225
                    .or_insert_with(|| {
364✔
226
                        let mut table = PostgresTable::new(row.replication_type);
30✔
227
                        table.add_field(row.field, row.is_column_used_in_index);
30✔
228
                        table
30✔
229
                    });
364✔
230

364✔
231
                Ok(())
364✔
232
            })?;
365✔
233

234
        let columns_map = sort_schemas(tables, &columns_map)?;
32✔
235

236
        Ok(Self::map_columns_to_schemas(columns_map))
30✔
237
    }
33✔
238

239
    fn map_columns_to_schemas(
30✔
240
        postgres_tables: Vec<(SchemaTableIdentifier, PostgresTable)>,
30✔
241
    ) -> Vec<SourceSchemaResult> {
30✔
242
        postgres_tables
30✔
243
            .into_iter()
30✔
244
            .map(|((_, table_name), table)| {
30✔
245
                Self::map_schema(&table_name, table).map_err(|e| {
30✔
246
                    ConnectorError::PostgresConnectorError(
×
247
                        PostgresConnectorError::PostgresSchemaError(e),
×
248
                    )
×
249
                })
30✔
250
            })
30✔
251
            .collect()
30✔
252
    }
30✔
253

254
    fn map_schema(
30✔
255
        table_name: &str,
30✔
256
        table: PostgresTable,
30✔
257
    ) -> Result<SourceSchema, PostgresSchemaError> {
30✔
258
        let primary_index: Vec<usize> = table
30✔
259
            .index_keys
30✔
260
            .iter()
30✔
261
            .enumerate()
30✔
262
            .filter(|(_, b)| **b)
364✔
263
            .map(|(idx, _)| idx)
30✔
264
            .collect();
30✔
265

30✔
266
        let schema = Schema {
30✔
267
            fields: table.fields.clone(),
30✔
268
            primary_index,
30✔
269
        };
30✔
270

271
        let cdc_type = match table.replication_type.as_str() {
30✔
272
            "d" => {
30✔
273
                if schema.primary_index.is_empty() {
30✔
274
                    Ok(CdcType::Nothing)
28✔
275
                } else {
276
                    Ok(CdcType::OnlyPK)
2✔
277
                }
278
            }
279
            "i" => Ok(CdcType::OnlyPK),
×
280
            "n" => Ok(CdcType::Nothing),
×
281
            "f" => Ok(CdcType::FullChanges),
×
282
            typ => Err(PostgresSchemaError::UnsupportedReplicationType(
×
283
                typ.to_string(),
×
284
            )),
×
285
        }?;
×
286

287
        let source_schema = SourceSchema::new(schema, cdc_type);
30✔
288
        Self::validate_schema_replication_identity(table_name, &source_schema)?;
30✔
289

290
        Ok(source_schema)
30✔
291
    }
30✔
292

293
    fn validate_schema_replication_identity(
30✔
294
        table_name: &str,
30✔
295
        schema: &SourceSchema,
30✔
296
    ) -> Result<(), PostgresSchemaError> {
30✔
297
        if schema.cdc_type == CdcType::OnlyPK && schema.schema.primary_index.is_empty() {
30✔
298
            Err(PostgresSchemaError::PrimaryKeyIsMissingInSchema(
×
299
                table_name.to_string(),
×
300
            ))
×
301
        } else {
302
            Ok(())
30✔
303
        }
304
    }
30✔
305
    fn convert_row(&self, row: &Row) -> Result<PostgresTableRow, PostgresSchemaError> {
365✔
306
        let schema: String = row.get(8);
365✔
307
        let table_name: String = row.get(0);
365✔
308
        let table_type: Option<String> = row.get(7);
365✔
309
        if let Some(typ) = table_type {
365✔
310
            if typ != *"BASE TABLE" {
365✔
311
                return Err(PostgresSchemaError::UnsupportedTableType(typ, table_name));
1✔
312
            }
364✔
313
        } else {
314
            return Err(TableTypeNotFound);
×
315
        }
316

317
        let column_name: String = row.get(1);
364✔
318
        let is_nullable: bool = row.get(2);
364✔
319
        let is_column_used_in_index: bool = row.get(3);
364✔
320
        let replication_type_int: i8 = row.get(5);
364✔
321
        let type_oid: u32 = row.get(6);
364✔
322
        let typ = Type::from_oid(type_oid);
364✔
323

324
        let typ = typ.map_or(Err(InvalidColumnType), postgres_type_to_dozer_type)?;
364✔
325

326
        let replication_type = String::from_utf8(vec![replication_type_int as u8])
364✔
327
            .map_err(|_e| ValueConversionError("Replication type".to_string()))?;
364✔
328

329
        Ok(PostgresTableRow {
364✔
330
            schema,
364✔
331
            table_name,
364✔
332
            field: FieldDefinition::new(column_name, typ, is_nullable, SourceDefinition::Dynamic),
364✔
333
            is_column_used_in_index,
364✔
334
            replication_type,
364✔
335
        })
364✔
336
    }
365✔
337
}
338

339
/// Schema assumed when a table reference does not name one explicitly.
pub const DEFAULT_SCHEMA_NAME: &str = "public";
340

341
fn find_table(
30✔
342
    table_columns_map: &HashMap<SchemaTableIdentifier, (u32, Vec<String>)>,
30✔
343
    schema_name: Option<&str>,
30✔
344
    table_name: &str,
30✔
345
) -> Result<PostgresTableInfo, PostgresConnectorError> {
30✔
346
    let schema_name = schema_name.unwrap_or(DEFAULT_SCHEMA_NAME);
30✔
347
    let schema_table_identifier = (schema_name.to_string(), table_name.to_string());
30✔
348
    if let Some((relation_id, columns)) = table_columns_map.get(&schema_table_identifier) {
30✔
349
        Ok(PostgresTableInfo {
30✔
350
            schema: schema_table_identifier.0,
30✔
351
            name: schema_table_identifier.1,
30✔
352
            relation_id: *relation_id,
30✔
353
            columns: columns.clone(),
30✔
354
        })
30✔
355
    } else {
356
        Err(PostgresConnectorError::TablesNotFound(vec![
×
357
            schema_table_identifier,
×
358
        ]))
×
359
    }
360
}
30✔
361

362
/// Column metadata query. `:tables_name_condition` is replaced by the caller with the
/// table filter.
///
/// Selected columns (by position): 0 table_name, 1 column_name, 2 is_nullable,
/// 3 is_column_used_in_index, 4 pg_class oid, 5 relreplident, 6 type_oid, 7 table_type,
/// 8 table_schema.
///
/// `pg_type.typname` is only unique within a namespace. The type join is therefore also
/// constrained by the column's `udt_schema`. Otherwise a same-named type in another schema
/// would duplicate the column's row.
const SQL: &str = "
SELECT table_info.table_name,
       table_info.column_name,
       CASE WHEN table_info.is_nullable = 'NO' THEN false ELSE true END AS is_nullable,
       CASE
           WHEN pc.relreplident = 'd' OR pc.relreplident = 'i'
               THEN pa.attrelid IS NOT NULL
           WHEN pc.relreplident = 'n' THEN false
           WHEN pc.relreplident = 'f' THEN true
           ELSE false
           END                                                          AS is_column_used_in_index,
       pc.oid,
       pc.relreplident,
       pt.oid                                                           AS type_oid,
       t.table_type,
       t.table_schema
FROM information_schema.columns table_info
         LEFT JOIN information_schema.tables t ON t.table_name = table_info.table_name AND t.table_schema = table_info.table_schema
         LEFT JOIN pg_namespace ns ON t.table_schema = ns.nspname
         LEFT JOIN pg_class pc ON t.table_name = pc.relname AND ns.oid = pc.relnamespace
         LEFT JOIN pg_namespace pt_ns ON table_info.udt_schema = pt_ns.nspname
         LEFT JOIN pg_type pt ON table_info.udt_name = pt.typname AND pt.typnamespace = pt_ns.oid
         LEFT JOIN pg_index pi ON pc.oid = pi.indrelid AND
                                  ((pi.indisreplident = true AND pc.relreplident = 'i') OR (pi.indisprimary AND pc.relreplident = 'd'))
         LEFT JOIN pg_attribute pa ON
             pa.attrelid = pi.indrelid
                 AND pa.attnum = ANY (pi.indkey)
                 AND pa.attnum > 0
                 AND pa.attname = table_info.column_name
WHERE :tables_name_condition AND ns.nspname not in ('information_schema', 'pg_catalog')
      and ns.nspname not like 'pg_toast%'
      and ns.nspname not like 'pg_temp_%'
ORDER BY table_info.table_schema,
         table_info.table_catalog,
         table_info.table_name,
         table_info.ordinal_position;";
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc