• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 4007818786

pending completion
4007818786

Pull #733

github

GitHub
Merge baf5c38aa into 6c0ac2b2c
Pull Request #733: Bump diesel from 2.0.2 to 2.0.3

23389 of 34432 relevant lines covered (67.93%)

40326.78 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/dozer-ingestion/src/connectors/postgres/xlog_mapper.rs
1
use crate::connectors::postgres::helper;
2
use crate::errors::{PostgresConnectorError, PostgresSchemaError};
3
use dozer_types::ingestion_types::IngestionMessage;
4
use dozer_types::types::{
5
    Field, FieldDefinition, Operation, OperationEvent, Record, Schema, SourceDefinition,
6
};
7
use helper::postgres_type_to_dozer_type;
8
use postgres_protocol::message::backend::LogicalReplicationMessage::{
9
    Begin, Commit, Delete, Insert, Relation, Update,
10
};
11
use postgres_protocol::message::backend::{
12
    LogicalReplicationMessage, RelationBody, ReplicaIdentity, TupleData, UpdateBody, XLogDataBody,
13
};
14
use postgres_types::Type;
15
use std::collections::hash_map::DefaultHasher;
16
use std::collections::HashMap;
17
use std::hash::{Hash, Hasher};
18

19
struct MessageBody<'a> {
20
    message: &'a RelationBody,
21
}
22

×
23
impl<'a> MessageBody<'a> {
×
24
    pub fn new(message: &'a RelationBody) -> Self {
×
25
        Self { message }
×
26
    }
×
27
}
×
28

29
#[derive(Debug)]
×
30
pub struct Table {
31
    columns: Vec<TableColumn>,
32
    hash: u64,
33
    rel_id: u32,
34
    replica_identity: ReplicaIdentity,
35
}
×
36

37
#[derive(Debug)]
×
38
pub struct TableColumn {
39
    pub name: String,
40
    pub type_id: i32,
41
    pub flags: i8,
42
    pub r#type: Option<Type>,
43
    pub idx: usize,
44
}
45

×
46
impl Hash for MessageBody<'_> {
×
47
    fn hash<H: Hasher>(&self, state: &mut H) {
×
48
        let columns_vec: Vec<(i32, &str, i8)> = self
×
49
            .message
×
50
            .columns()
×
51
            .iter()
×
52
            .map(|column| (column.type_id(), column.name().unwrap(), column.flags()))
×
53
            .collect();
×
54

×
55
        columns_vec.hash(state);
×
56
    }
×
57
}
58

59
pub struct XlogMapper {
60
    relations_map: HashMap<u32, Table>,
61
    tables_columns: HashMap<u32, Vec<String>>,
62
}
63

×
64
impl Default for XlogMapper {
×
65
    fn default() -> Self {
×
66
        Self::new(HashMap::new())
×
67
    }
×
68
}
69

×
70
impl XlogMapper {
×
71
    pub fn new(tables_columns: HashMap<u32, Vec<String>>) -> Self {
×
72
        XlogMapper {
×
73
            relations_map: HashMap::<u32, Table>::new(),
×
74
            tables_columns,
×
75
        }
×
76
    }
×
77

×
78
    pub fn handle_message(
×
79
        &mut self,
×
80
        message: XLogDataBody<LogicalReplicationMessage>,
×
81
    ) -> Result<Option<IngestionMessage>, PostgresConnectorError> {
×
82
        match &message.data() {
×
83
            Relation(relation) => {
×
84
                let body = MessageBody::new(relation);
×
85
                let mut s = DefaultHasher::new();
×
86
                body.hash(&mut s);
×
87
                let hash = s.finish();
×
88

×
89
                let table_option = self.relations_map.get(&relation.rel_id());
×
90
                match table_option {
×
91
                    None => {
92
                        self.ingest_schema(relation, hash)?;
×
93
                    }
×
94
                    Some(table) => {
×
95
                        if table.hash != hash {
×
96
                            self.ingest_schema(relation, hash)?;
×
97
                        }
×
98
                    }
99
                }
×
100
            }
×
101
            Commit(commit) => {
×
102
                return Ok(Some(IngestionMessage::Commit(dozer_types::types::Commit {
×
103
                    seq_no: 0,
×
104
                    lsn: commit.end_lsn(),
×
105
                })));
×
106
            }
×
107
            Begin(_begin) => {
×
108
                return Ok(Some(IngestionMessage::Begin()));
×
109
            }
×
110
            Insert(insert) => {
×
111
                let table = self.relations_map.get(&insert.rel_id()).unwrap();
×
112
                let new_values = insert.tuple().tuple_data();
×
113

114
                let values = Self::convert_values_to_fields(table, new_values, false)?;
×
115

×
116
                let event = OperationEvent {
×
117
                    operation: Operation::Insert {
×
118
                        new: Record::new(
×
119
                            Some(dozer_types::types::SchemaIdentifier {
×
120
                                id: table.rel_id,
×
121
                                version: table.rel_id as u16,
×
122
                            }),
×
123
                            values,
×
124
                            None,
×
125
                        ),
×
126
                    },
×
127
                    seq_no: 0,
×
128
                };
×
129

×
130
                return Ok(Some(IngestionMessage::OperationEvent(event)));
×
131
            }
×
132
            Update(update) => {
×
133
                let table = self.relations_map.get(&update.rel_id()).unwrap();
×
134
                let new_values = update.new_tuple().tuple_data();
×
135

×
136
                let values = Self::convert_values_to_fields(table, new_values, false)?;
×
137
                let old_values = Self::convert_old_value_to_fields(table, update)?;
×
138

×
139
                let event = OperationEvent {
×
140
                    operation: Operation::Update {
×
141
                        old: Record::new(
×
142
                            Some(dozer_types::types::SchemaIdentifier {
×
143
                                id: table.rel_id,
×
144
                                version: table.rel_id as u16,
×
145
                            }),
×
146
                            old_values,
×
147
                            None,
×
148
                        ),
×
149
                        new: Record::new(
×
150
                            Some(dozer_types::types::SchemaIdentifier {
×
151
                                id: table.rel_id,
×
152
                                version: table.rel_id as u16,
×
153
                            }),
×
154
                            values,
×
155
                            None,
×
156
                        ),
×
157
                    },
×
158
                    seq_no: 0,
×
159
                };
×
160

×
161
                return Ok(Some(IngestionMessage::OperationEvent(event)));
×
162
            }
×
163
            Delete(delete) => {
×
164
                // TODO: Use only columns with .flags() = 0
×
165
                let table = self.relations_map.get(&delete.rel_id()).unwrap();
×
166
                let key_values = delete.key_tuple().unwrap().tuple_data();
×
167

168
                let values = Self::convert_values_to_fields(table, key_values, true)?;
×
169

×
170
                let event = OperationEvent {
×
171
                    operation: Operation::Delete {
×
172
                        old: Record::new(
×
173
                            Some(dozer_types::types::SchemaIdentifier {
×
174
                                id: table.rel_id,
×
175
                                version: table.rel_id as u16,
×
176
                            }),
×
177
                            values,
×
178
                            None,
×
179
                        ),
×
180
                    },
×
181
                    seq_no: 0,
×
182
                };
×
183

×
184
                return Ok(Some(IngestionMessage::OperationEvent(event)));
×
185
            }
186
            _ => {}
×
187
        }
×
188

×
189
        Ok(None)
×
190
    }
×
191

×
192
    fn ingest_schema(
×
193
        &mut self,
×
194
        relation: &RelationBody,
×
195
        hash: u64,
×
196
    ) -> Result<(), PostgresConnectorError> {
×
197
        let rel_id = relation.rel_id();
×
198
        let existing_columns = self
×
199
            .tables_columns
×
200
            .get(&rel_id)
×
201
            .map_or(vec![], |t| t.clone());
×
202

×
203
        let columns: Vec<TableColumn> = relation
×
204
            .columns()
×
205
            .iter()
×
206
            .enumerate()
×
207
            .filter(|(_, column)| {
×
208
                existing_columns.is_empty()
×
209
                    || existing_columns.contains(&column.name().unwrap().to_string())
×
210
            })
×
211
            .map(|(idx, column)| TableColumn {
×
212
                name: String::from(column.name().unwrap()),
×
213
                type_id: column.type_id(),
×
214
                flags: column.flags(),
×
215
                r#type: Type::from_oid(column.type_id() as u32),
×
216
                idx,
×
217
            })
×
218
            .collect();
×
219

×
220
        let _table_name = relation
×
221
            .name()
×
222
            .map_err(PostgresConnectorError::RelationNotFound)?
×
223
            .to_string();
×
224

×
225
        let replica_identity = match relation.replica_identity() {
×
226
            ReplicaIdentity::Default => ReplicaIdentity::Default,
×
227
            ReplicaIdentity::Nothing => ReplicaIdentity::Nothing,
×
228
            ReplicaIdentity::Full => ReplicaIdentity::Full,
×
229
            ReplicaIdentity::Index => ReplicaIdentity::Index,
×
230
        };
×
231

×
232
        let table = Table {
×
233
            columns,
×
234
            hash,
×
235
            rel_id,
×
236
            replica_identity,
×
237
        };
×
238

×
239
        let mut fields = vec![];
×
240
        for c in &table.columns {
×
241
            let typ = c.r#type.clone();
×
242
            let typ = typ
×
243
                .map_or(
×
244
                    Err(PostgresSchemaError::InvalidColumnType),
×
245
                    postgres_type_to_dozer_type,
×
246
                )
×
247
                .map_err(PostgresConnectorError::PostgresSchemaError)?;
×
248

×
249
            fields.push(FieldDefinition {
×
250
                name: c.name.clone(),
×
251
                typ,
×
252
                nullable: true,
×
253
                source: SourceDefinition::Dynamic,
×
254
            });
×
255
        }
×
256

×
257
        let _schema = Schema {
×
258
            identifier: Some(dozer_types::types::SchemaIdentifier {
×
259
                id: table.rel_id,
×
260
                version: table.rel_id as u16,
×
261
            }),
×
262
            fields,
×
263
            primary_index: vec![0],
×
264
        };
×
265

×
266
        self.relations_map.insert(rel_id, table);
×
267

×
268
        Ok(())
×
269
    }
×
270

×
271
    fn convert_values_to_fields(
×
272
        table: &Table,
×
273
        new_values: &[TupleData],
×
274
        only_key: bool,
×
275
    ) -> Result<Vec<Field>, PostgresConnectorError> {
×
276
        let mut values: Vec<Field> = vec![];
×
277

×
278
        for column in &table.columns {
×
279
            if column.flags == 1 || !only_key {
×
280
                let value = new_values.get(column.idx).unwrap();
×
281
                match value {
×
282
                    TupleData::Null => values.push(
283
                        helper::postgres_type_to_field(None, column)
×
284
                            .map_err(PostgresConnectorError::PostgresSchemaError)?,
×
285
                    ),
×
286
                    TupleData::UnchangedToast => {}
×
287
                    TupleData::Text(text) => values.push(
×
288
                        helper::postgres_type_to_field(Some(text), column)
×
289
                            .map_err(PostgresConnectorError::PostgresSchemaError)?,
×
290
                    ),
×
291
                }
×
292
            } else {
×
293
                values.push(Field::Null);
×
294
            }
×
295
        }
×
296

297
        Ok(values)
×
298
    }
×
299

×
300
    fn convert_old_value_to_fields(
×
301
        table: &Table,
×
302
        update: &UpdateBody,
×
303
    ) -> Result<Vec<Field>, PostgresConnectorError> {
×
304
        match table.replica_identity {
×
305
            ReplicaIdentity::Default | ReplicaIdentity::Full | ReplicaIdentity::Index => {
×
306
                update.key_tuple().map_or_else(
×
307
                    || Self::convert_values_to_fields(table, update.new_tuple().tuple_data(), true),
×
308
                    |key_tuple| Self::convert_values_to_fields(table, key_tuple.tuple_data(), true),
×
309
                )
×
310
            }
×
311
            ReplicaIdentity::Nothing => Ok(vec![]),
×
312
        }
313
    }
×
314
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc