• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 5611876656

pending completion
5611876656

Pull #1776

github

chubei
chore: Remove `Schema::identifier`
Pull Request #1776: chore: Remove `Schema::identifier`

1470 of 1470 new or added lines in 98 files covered. (100.0%)

43423 of 57848 relevant lines covered (75.06%)

33086.85 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

2.54
/dozer-ingestion/src/connectors/postgres/xlog_mapper.rs
1
use crate::connectors::postgres::helper;
2
use crate::errors::{PostgresConnectorError, PostgresSchemaError};
3
use dozer_types::node::OpIdentifier;
4
use dozer_types::types::{Field, Operation, Record};
5
use helper::postgres_type_to_dozer_type;
6
use postgres_protocol::message::backend::LogicalReplicationMessage::{
7
    Begin, Commit, Delete, Insert, Relation, Update,
8
};
9
use postgres_protocol::message::backend::{
10
    LogicalReplicationMessage, RelationBody, ReplicaIdentity, TupleData, UpdateBody, XLogDataBody,
11
};
12
use postgres_types::Type;
13
use std::collections::hash_map::Entry;
14
use std::collections::HashMap;
15

16
#[derive(Debug)]
×
17
pub struct Table {
18
    columns: Vec<TableColumn>,
19
    replica_identity: ReplicaIdentity,
20
}
21

22
#[derive(Debug)]
×
23
pub struct TableColumn {
×
24
    pub name: String,
×
25
    pub flags: i8,
26
    pub r#type: Type,
27
    pub column_index: usize,
×
28
}
29

30
#[derive(Debug, Clone)]
×
31
pub enum MappedReplicationMessage {
32
    Begin,
33
    Commit(OpIdentifier),
34
    Operation { table_index: usize, op: Operation },
35
}
×
36

37
#[derive(Debug, Default)]
×
38
pub struct XlogMapper {
39
    /// Relation id to table info from replication `Relation` message.
40
    relations_map: HashMap<u32, Table>,
41
    /// Relation id to (table index, column names).
42
    tables_columns: HashMap<u32, (usize, Vec<String>)>,
43
}
44

45
impl XlogMapper {
×
46
    pub fn new(tables_columns: HashMap<u32, (usize, Vec<String>)>) -> Self {
12✔
47
        XlogMapper {
12✔
48
            relations_map: HashMap::<u32, Table>::new(),
12✔
49
            tables_columns,
12✔
50
        }
12✔
51
    }
12✔
52

×
53
    pub fn handle_message(
×
54
        &mut self,
×
55
        message: XLogDataBody<LogicalReplicationMessage>,
×
56
    ) -> Result<Option<MappedReplicationMessage>, PostgresConnectorError> {
×
57
        match &message.data() {
×
58
            Relation(relation) => {
×
59
                self.ingest_schema(relation)?;
×
60
            }
61
            Commit(commit) => {
×
62
                return Ok(Some(MappedReplicationMessage::Commit(OpIdentifier::new(
×
63
                    commit.end_lsn(),
×
64
                    0,
×
65
                ))));
×
66
            }
67
            Begin(_begin) => {
×
68
                return Ok(Some(MappedReplicationMessage::Begin));
×
69
            }
70
            Insert(insert) => {
×
71
                let Some(table_columns) = self.tables_columns.get(&insert.rel_id()) else {
×
72
                    return Ok(None);
×
73
                };
74
                let table_index = table_columns.0;
×
75

×
76
                let table = self.relations_map.get(&insert.rel_id()).unwrap();
×
77
                let new_values = insert.tuple().tuple_data();
×
78

×
79
                let values = Self::convert_values_to_fields(table, new_values, false)?;
×
80

×
81
                let event = Operation::Insert {
×
82
                    new: Record::new(values),
×
83
                };
×
84

×
85
                return Ok(Some(MappedReplicationMessage::Operation {
×
86
                    table_index,
×
87
                    op: event,
×
88
                }));
×
89
            }
×
90
            Update(update) => {
×
91
                let Some(table_columns) = self.tables_columns.get(&update.rel_id()) else {
×
92
                    return Ok(None);
×
93
                };
×
94
                let table_index = table_columns.0;
×
95

×
96
                let table = self.relations_map.get(&update.rel_id()).unwrap();
×
97
                let new_values = update.new_tuple().tuple_data();
×
98

99
                let values = Self::convert_values_to_fields(table, new_values, false)?;
×
100
                let old_values = Self::convert_old_value_to_fields(table, update)?;
×
101

×
102
                let event = Operation::Update {
×
103
                    old: Record::new(old_values),
×
104
                    new: Record::new(values),
×
105
                };
×
106

×
107
                return Ok(Some(MappedReplicationMessage::Operation {
×
108
                    table_index,
×
109
                    op: event,
×
110
                }));
×
111
            }
112
            Delete(delete) => {
×
113
                let Some(table_columns) = self.tables_columns.get(&delete.rel_id()) else {
×
114
                    return Ok(None);
×
115
                };
×
116
                let table_index = table_columns.0;
×
117

×
118
                // TODO: Use only columns with .flags() = 0
×
119
                let table = self.relations_map.get(&delete.rel_id()).unwrap();
×
120
                let key_values = delete.key_tuple().unwrap().tuple_data();
×
121

×
122
                let values = Self::convert_values_to_fields(table, key_values, true)?;
×
123

×
124
                let event = Operation::Delete {
×
125
                    old: Record::new(values),
×
126
                };
×
127

×
128
                return Ok(Some(MappedReplicationMessage::Operation {
×
129
                    table_index,
×
130
                    op: event,
×
131
                }));
×
132
            }
133
            _ => {}
×
134
        }
×
135

×
136
        Ok(None)
×
137
    }
×
138

×
139
    fn ingest_schema(&mut self, relation: &RelationBody) -> Result<(), PostgresConnectorError> {
×
140
        let rel_id = relation.rel_id();
×
141
        let Some((table_index, wanted_columns)) = self.tables_columns.get(&rel_id) else {
×
142
            return Ok(());
×
143
        };
×
144

×
145
        let mut columns = vec![];
×
146
        for (column_index, column) in relation.columns().iter().enumerate() {
×
147
            let column_name =
×
148
                column
×
149
                    .name()
×
150
                    .map_err(|_| PostgresConnectorError::NonUtf8ColumnName {
×
151
                        table_index: *table_index,
×
152
                        column_index,
×
153
                    })?;
×
154

×
155
            if !wanted_columns.is_empty()
×
156
                && !wanted_columns
×
157
                    .iter()
×
158
                    .any(|column| column.as_str() == column_name)
×
159
            {
×
160
                continue;
×
161
            }
×
162

×
163
            columns.push(TableColumn {
×
164
                name: column_name.to_string(),
×
165
                flags: column.flags(),
×
166
                r#type: Type::from_oid(column.type_id() as u32)
×
167
                    .ok_or(PostgresSchemaError::InvalidColumnType)?,
×
168
                column_index,
×
169
            })
×
170
        }
×
171

×
172
        let replica_identity = match relation.replica_identity() {
×
173
            ReplicaIdentity::Default => ReplicaIdentity::Default,
×
174
            ReplicaIdentity::Nothing => ReplicaIdentity::Nothing,
×
175
            ReplicaIdentity::Full => ReplicaIdentity::Full,
×
176
            ReplicaIdentity::Index => ReplicaIdentity::Index,
×
177
        };
178

×
179
        let table = Table {
×
180
            columns,
×
181
            replica_identity,
×
182
        };
×
183

184
        for c in &table.columns {
×
185
            postgres_type_to_dozer_type(c.r#type.clone())?;
×
186
        }
×
187

×
188
        match self.relations_map.entry(rel_id) {
×
189
            Entry::Occupied(mut entry) => {
×
190
                // Check if type has changed.
×
191
                for (existing_column, column) in entry.get().columns.iter().zip(&table.columns) {
×
192
                    if existing_column.r#type != column.r#type {
×
193
                        return Err(PostgresConnectorError::ColumnTypeChanged {
×
194
                            table_index: *table_index,
×
195
                            column_name: existing_column.name.clone(),
×
196
                            old_type: existing_column.r#type.clone(),
×
197
                            new_type: column.r#type.clone(),
×
198
                        });
×
199
                    }
×
200
                }
×
201

×
202
                entry.insert(table);
×
203
            }
×
204
            Entry::Vacant(entry) => {
×
205
                entry.insert(table);
×
206
            }
×
207
        }
×
208

×
209
        Ok(())
×
210
    }
×
211

×
212
    fn convert_values_to_fields(
×
213
        table: &Table,
×
214
        new_values: &[TupleData],
×
215
        only_key: bool,
×
216
    ) -> Result<Vec<Field>, PostgresConnectorError> {
×
217
        let mut values: Vec<Field> = vec![];
×
218

×
219
        for column in &table.columns {
×
220
            if column.flags == 1 || !only_key {
×
221
                let value = new_values.get(column.column_index).unwrap();
×
222
                match value {
×
223
                    TupleData::Null => values.push(
×
224
                        helper::postgres_type_to_field(None, column)
×
225
                            .map_err(PostgresConnectorError::PostgresSchemaError)?,
×
226
                    ),
×
227
                    TupleData::UnchangedToast => {}
×
228
                    TupleData::Text(text) => values.push(
×
229
                        helper::postgres_type_to_field(Some(text), column)
×
230
                            .map_err(PostgresConnectorError::PostgresSchemaError)?,
×
231
                    ),
×
232
                }
×
233
            } else {
×
234
                values.push(Field::Null);
×
235
            }
×
236
        }
×
237

238
        Ok(values)
×
239
    }
×
240

×
241
    fn convert_old_value_to_fields(
×
242
        table: &Table,
×
243
        update: &UpdateBody,
×
244
    ) -> Result<Vec<Field>, PostgresConnectorError> {
×
245
        match table.replica_identity {
×
246
            ReplicaIdentity::Default | ReplicaIdentity::Full | ReplicaIdentity::Index => {
×
247
                update.key_tuple().map_or_else(
×
248
                    || Self::convert_values_to_fields(table, update.new_tuple().tuple_data(), true),
×
249
                    |key_tuple| Self::convert_values_to_fields(table, key_tuple.tuple_data(), true),
×
250
                )
×
251
            }
×
252
            ReplicaIdentity::Nothing => Ok(vec![]),
×
253
        }
×
254
    }
×
255
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc