• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 4382731818

pending completion
4382731818

push

github

GitHub
fix: Fix publication slot creation (#1202)

60 of 60 new or added lines in 24 files covered. (100.0%)

27881 of 39907 relevant lines covered (69.86%)

58951.75 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

61.01
/dozer-ingestion/src/connectors/postgres/xlog_mapper.rs
1
use crate::connectors::postgres::helper;
2
use crate::connectors::ColumnInfo;
3
use crate::errors::{PostgresConnectorError, PostgresSchemaError};
4
use dozer_types::node::OpIdentifier;
5
use dozer_types::types::{Field, FieldDefinition, Operation, Record, SourceDefinition};
6
use helper::postgres_type_to_dozer_type;
7
use postgres_protocol::message::backend::LogicalReplicationMessage::{
8
    Begin, Commit, Delete, Insert, Relation, Update,
9
};
10
use postgres_protocol::message::backend::{
11
    LogicalReplicationMessage, RelationBody, ReplicaIdentity, TupleData, UpdateBody, XLogDataBody,
12
};
13
use postgres_types::Type;
14
use std::collections::hash_map::DefaultHasher;
15
use std::collections::HashMap;
16
use std::hash::{Hash, Hasher};
17

18
struct MessageBody<'a> {
19
    message: &'a RelationBody,
20
}
21

22
impl<'a> MessageBody<'a> {
23
    pub fn new(message: &'a RelationBody) -> Self {
2✔
24
        Self { message }
2✔
25
    }
2✔
26
}
27

28
#[derive(Debug)]
×
29
pub struct Table {
30
    columns: Vec<TableColumn>,
31
    hash: u64,
32
    rel_id: u32,
33
    replica_identity: ReplicaIdentity,
34
}
35

36
#[derive(Debug)]
×
37
pub struct TableColumn {
38
    pub name: String,
39
    pub type_id: i32,
40
    pub flags: i8,
41
    pub r#type: Option<Type>,
42
    pub idx: usize,
43
}
44

45
impl Hash for MessageBody<'_> {
46
    fn hash<H: Hasher>(&self, state: &mut H) {
2✔
47
        let columns_vec: Vec<(i32, &str, i8)> = self
2✔
48
            .message
2✔
49
            .columns()
2✔
50
            .iter()
2✔
51
            .map(|column| (column.type_id(), column.name().unwrap(), column.flags()))
8✔
52
            .collect();
2✔
53

2✔
54
        columns_vec.hash(state);
2✔
55
    }
2✔
56
}
57

58
#[derive(Debug, Clone)]
×
59
pub enum MappedReplicationMessage {
60
    Begin,
61
    Commit(OpIdentifier),
62
    Operation(Operation),
63
}
64

65
pub struct XlogMapper {
66
    relations_map: HashMap<u32, Table>,
67
    tables_columns: HashMap<u32, Vec<ColumnInfo>>,
68
}
69

70
impl Default for XlogMapper {
71
    fn default() -> Self {
×
72
        Self::new(HashMap::new())
×
73
    }
×
74
}
75

76
impl XlogMapper {
77
    pub fn new(tables_columns: HashMap<u32, Vec<ColumnInfo>>) -> Self {
1✔
78
        XlogMapper {
1✔
79
            relations_map: HashMap::<u32, Table>::new(),
1✔
80
            tables_columns,
1✔
81
        }
1✔
82
    }
1✔
83

84
    pub fn handle_message(
17✔
85
        &mut self,
17✔
86
        message: XLogDataBody<LogicalReplicationMessage>,
17✔
87
    ) -> Result<Option<MappedReplicationMessage>, PostgresConnectorError> {
17✔
88
        match &message.data() {
17✔
89
            Relation(relation) => {
2✔
90
                let body = MessageBody::new(relation);
2✔
91
                let mut s = DefaultHasher::new();
2✔
92
                body.hash(&mut s);
2✔
93
                let hash = s.finish();
2✔
94

2✔
95
                let table_option = self.relations_map.get(&relation.rel_id());
2✔
96
                match table_option {
2✔
97
                    None => {
98
                        self.ingest_schema(relation, hash)?;
1✔
99
                    }
100
                    Some(table) => {
1✔
101
                        if table.hash != hash {
1✔
102
                            self.ingest_schema(relation, hash)?;
×
103
                        }
1✔
104
                    }
105
                }
106
            }
107
            Commit(commit) => {
4✔
108
                return Ok(Some(MappedReplicationMessage::Commit(OpIdentifier::new(
4✔
109
                    commit.end_lsn(),
4✔
110
                    0,
4✔
111
                ))));
4✔
112
            }
113
            Begin(_begin) => {
4✔
114
                return Ok(Some(MappedReplicationMessage::Begin));
4✔
115
            }
116
            Insert(insert) => {
7✔
117
                let table = self.relations_map.get(&insert.rel_id()).unwrap();
7✔
118
                let new_values = insert.tuple().tuple_data();
7✔
119

120
                let values = Self::convert_values_to_fields(table, new_values, false)?;
7✔
121

122
                let event = Operation::Insert {
7✔
123
                    new: Record::new(
7✔
124
                        Some(dozer_types::types::SchemaIdentifier {
7✔
125
                            id: table.rel_id,
7✔
126
                            version: 0,
7✔
127
                        }),
7✔
128
                        values,
7✔
129
                        None,
7✔
130
                    ),
7✔
131
                };
7✔
132

7✔
133
                return Ok(Some(MappedReplicationMessage::Operation(event)));
7✔
134
            }
135
            Update(update) => {
×
136
                let table = self.relations_map.get(&update.rel_id()).unwrap();
×
137
                let new_values = update.new_tuple().tuple_data();
×
138

139
                let values = Self::convert_values_to_fields(table, new_values, false)?;
×
140
                let old_values = Self::convert_old_value_to_fields(table, update)?;
×
141

142
                let event = Operation::Update {
×
143
                    old: Record::new(
×
144
                        Some(dozer_types::types::SchemaIdentifier {
×
145
                            id: table.rel_id,
×
146
                            version: 0,
×
147
                        }),
×
148
                        old_values,
×
149
                        None,
×
150
                    ),
×
151
                    new: Record::new(
×
152
                        Some(dozer_types::types::SchemaIdentifier {
×
153
                            id: table.rel_id,
×
154
                            version: 0,
×
155
                        }),
×
156
                        values,
×
157
                        None,
×
158
                    ),
×
159
                };
×
160

×
161
                return Ok(Some(MappedReplicationMessage::Operation(event)));
×
162
            }
163
            Delete(delete) => {
×
164
                // TODO: Use only columns with .flags() = 0
×
165
                let table = self.relations_map.get(&delete.rel_id()).unwrap();
×
166
                let key_values = delete.key_tuple().unwrap().tuple_data();
×
167

168
                let values = Self::convert_values_to_fields(table, key_values, true)?;
×
169

170
                let event = Operation::Delete {
×
171
                    old: Record::new(
×
172
                        Some(dozer_types::types::SchemaIdentifier {
×
173
                            id: table.rel_id,
×
174
                            version: 0,
×
175
                        }),
×
176
                        values,
×
177
                        None,
×
178
                    ),
×
179
                };
×
180

×
181
                return Ok(Some(MappedReplicationMessage::Operation(event)));
×
182
            }
183
            _ => {}
×
184
        }
185

186
        Ok(None)
2✔
187
    }
17✔
188

189
    fn ingest_schema(
1✔
190
        &mut self,
1✔
191
        relation: &RelationBody,
1✔
192
        hash: u64,
1✔
193
    ) -> Result<(), PostgresConnectorError> {
1✔
194
        let rel_id = relation.rel_id();
1✔
195
        let existing_columns = self
1✔
196
            .tables_columns
1✔
197
            .get(&rel_id)
1✔
198
            .map_or(vec![], |t| t.clone());
1✔
199

1✔
200
        let columns: Vec<TableColumn> = relation
1✔
201
            .columns()
1✔
202
            .iter()
1✔
203
            .enumerate()
1✔
204
            .filter(|(_, column)| {
1✔
205
                existing_columns.is_empty()
4✔
206
                    || existing_columns
4✔
207
                        .iter()
4✔
208
                        .any(|c| c.name == *column.name().unwrap())
10✔
209
            })
4✔
210
            .map(|(idx, column)| TableColumn {
4✔
211
                name: String::from(column.name().unwrap()),
4✔
212
                type_id: column.type_id(),
4✔
213
                flags: column.flags(),
4✔
214
                r#type: Type::from_oid(column.type_id() as u32),
4✔
215
                idx,
4✔
216
            })
4✔
217
            .collect();
1✔
218

219
        let replica_identity = match relation.replica_identity() {
1✔
220
            ReplicaIdentity::Default => ReplicaIdentity::Default,
1✔
221
            ReplicaIdentity::Nothing => ReplicaIdentity::Nothing,
×
222
            ReplicaIdentity::Full => ReplicaIdentity::Full,
×
223
            ReplicaIdentity::Index => ReplicaIdentity::Index,
×
224
        };
225

226
        let table = Table {
1✔
227
            columns,
1✔
228
            hash,
1✔
229
            rel_id,
1✔
230
            replica_identity,
1✔
231
        };
1✔
232

1✔
233
        let mut fields = vec![];
1✔
234
        for c in &table.columns {
5✔
235
            let typ = c.r#type.clone();
4✔
236
            let typ = typ
4✔
237
                .map_or(
4✔
238
                    Err(PostgresSchemaError::InvalidColumnType),
4✔
239
                    postgres_type_to_dozer_type,
4✔
240
                )
4✔
241
                .map_err(PostgresConnectorError::PostgresSchemaError)?;
4✔
242

243
            fields.push(FieldDefinition {
4✔
244
                name: c.name.clone(),
4✔
245
                typ,
4✔
246
                nullable: true,
4✔
247
                source: SourceDefinition::Dynamic,
4✔
248
            });
4✔
249
        }
250

251
        self.relations_map.insert(rel_id, table);
1✔
252

1✔
253
        Ok(())
1✔
254
    }
1✔
255

×
256
    fn convert_values_to_fields(
7✔
257
        table: &Table,
7✔
258
        new_values: &[TupleData],
7✔
259
        only_key: bool,
7✔
260
    ) -> Result<Vec<Field>, PostgresConnectorError> {
7✔
261
        let mut values: Vec<Field> = vec![];
7✔
262

×
263
        for column in &table.columns {
35✔
264
            if column.flags == 1 || !only_key {
28✔
265
                let value = new_values.get(column.idx).unwrap();
28✔
266
                match value {
28✔
267
                    TupleData::Null => values.push(
×
268
                        helper::postgres_type_to_field(None, column)
×
269
                            .map_err(PostgresConnectorError::PostgresSchemaError)?,
×
270
                    ),
×
271
                    TupleData::UnchangedToast => {}
×
272
                    TupleData::Text(text) => values.push(
28✔
273
                        helper::postgres_type_to_field(Some(text), column)
28✔
274
                            .map_err(PostgresConnectorError::PostgresSchemaError)?,
28✔
275
                    ),
×
276
                }
277
            } else {
×
278
                values.push(Field::Null);
×
279
            }
×
280
        }
×
281

×
282
        Ok(values)
7✔
283
    }
7✔
284

285
    fn convert_old_value_to_fields(
×
286
        table: &Table,
×
287
        update: &UpdateBody,
×
288
    ) -> Result<Vec<Field>, PostgresConnectorError> {
×
289
        match table.replica_identity {
×
290
            ReplicaIdentity::Default | ReplicaIdentity::Full | ReplicaIdentity::Index => {
291
                update.key_tuple().map_or_else(
×
292
                    || Self::convert_values_to_fields(table, update.new_tuple().tuple_data(), true),
×
293
                    |key_tuple| Self::convert_values_to_fields(table, key_tuple.tuple_data(), true),
×
294
                )
×
295
            }
×
296
            ReplicaIdentity::Nothing => Ok(vec![]),
×
297
        }
×
298
    }
×
299
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc