• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 5709656380

pending completion
5709656380

push

github

web-flow
Version bump (#1808)

45512 of 59772 relevant lines covered (76.14%)

39312.43 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

78.93
/dozer-ingestion/src/connectors/postgres/helper.rs
1
use crate::connectors::postgres::xlog_mapper::TableColumn;
2
use crate::errors::PostgresSchemaError::{
3
    ColumnTypeNotSupported, CustomTypeNotSupported, PointParseError, StringParseError,
4
    ValueConversionError,
5
};
6
use crate::errors::{ConnectorError, PostgresSchemaError};
7
use dozer_types::bytes::Bytes;
8
use dozer_types::chrono::{DateTime, FixedOffset, NaiveDate, NaiveDateTime, Offset, Utc};
9
use dozer_types::errors::types::TypeError;
10
use dozer_types::geo::Point as GeoPoint;
11
use dozer_types::json_types::{serde_json_to_json_value, JsonValue};
12
use dozer_types::ordered_float::OrderedFloat;
13
use dozer_types::{rust_decimal, serde_json, types::*};
14
use postgres_types::{Type, WasNull};
15
use rust_decimal::prelude::FromPrimitive;
16
use rust_decimal::Decimal;
17
use std::error::Error;
18
use std::vec;
19
use tokio_postgres::{Column, Row};
20
use uuid::Uuid;
21

22
pub fn postgres_type_to_field(
18✔
23
    value: Option<&Bytes>,
18✔
24
    column: &TableColumn,
18✔
25
) -> Result<Field, PostgresSchemaError> {
18✔
26
    let column_type = column.r#type.clone();
18✔
27
    value.map_or(Ok(Field::Null), |v| match column_type {
18✔
28
        Type::INT2 | Type::INT4 | Type::INT8 => Ok(Field::Int(
1✔
29
            String::from_utf8(v.to_vec()).unwrap().parse().unwrap(),
1✔
30
        )),
1✔
31
        Type::FLOAT4 | Type::FLOAT8 => Ok(Field::Float(OrderedFloat(
1✔
32
            String::from_utf8(v.to_vec())
1✔
33
                .unwrap()
1✔
34
                .parse::<f64>()
1✔
35
                .unwrap(),
1✔
36
        ))),
1✔
37
        Type::TEXT | Type::VARCHAR | Type::CHAR | Type::BPCHAR | Type::ANYENUM => {
38
            Ok(Field::String(String::from_utf8(v.to_vec()).unwrap()))
2✔
39
        }
40
        Type::UUID => Ok(Field::String(String::from_utf8(v.to_vec()).unwrap())),
1✔
41
        Type::BYTEA => Ok(Field::Binary(v.to_vec())),
1✔
42
        Type::NUMERIC => Ok(Field::Decimal(
1✔
43
            Decimal::from_f64(
1✔
44
                String::from_utf8(v.to_vec())
1✔
45
                    .unwrap()
1✔
46
                    .parse::<f64>()
1✔
47
                    .unwrap(),
1✔
48
            )
1✔
49
            .unwrap(),
1✔
50
        )),
1✔
51
        Type::TIMESTAMP => {
52
            let date_string = String::from_utf8(v.to_vec())?;
2✔
53
            let format = if date_string.len() == 19 {
2✔
54
                "%Y-%m-%d %H:%M:%S"
1✔
55
            } else {
56
                "%Y-%m-%d %H:%M:%S%.f"
1✔
57
            };
58
            let date = NaiveDateTime::parse_from_str(date_string.as_str(), format)?;
2✔
59
            Ok(Field::Timestamp(DateTime::from_utc(date, Utc.fix())))
2✔
60
        }
61
        Type::TIMESTAMPTZ => {
62
            let date: DateTime<FixedOffset> = DateTime::parse_from_str(
1✔
63
                String::from_utf8(v.to_vec()).unwrap().as_str(),
1✔
64
                "%Y-%m-%d %H:%M:%S%.f%#z",
1✔
65
            )
1✔
66
            .unwrap();
1✔
67
            Ok(Field::Timestamp(date))
1✔
68
        }
69
        Type::DATE => {
70
            let date: NaiveDate = NaiveDate::parse_from_str(
×
71
                String::from_utf8(v.to_vec()).unwrap().as_str(),
×
72
                DATE_FORMAT,
×
73
            )
×
74
            .unwrap();
×
75
            Ok(Field::from(date))
×
76
        }
77
        Type::JSONB | Type::JSON => {
78
            let val: serde_json::Value = serde_json::from_slice(v).map_err(|_| {
2✔
79
                PostgresSchemaError::JSONBParseError(format!(
×
80
                    "Error converting to a single row for: {}",
×
81
                    column_type.name()
×
82
                ))
×
83
            })?;
2✔
84
            let json: JsonValue = serde_json_to_json_value(val)
2✔
85
                .map_err(|e| PostgresSchemaError::TypeError(TypeError::DeserializationError(e)))?;
2✔
86
            Ok(Field::Json(json))
2✔
87
        }
88
        Type::JSONB_ARRAY | Type::JSON_ARRAY => {
89
            let val: Vec<serde_json::Value> = serde_json::from_slice(v).map_err(|_| {
2✔
90
                PostgresSchemaError::JSONBParseError(format!(
×
91
                    "Error converting to a single row for: {}",
×
92
                    column_type.name()
×
93
                ))
×
94
            })?;
2✔
95
            let mut lst = vec![];
2✔
96
            for v in val {
4✔
97
                lst.push(serde_json_to_json_value(v).map_err(|e| {
2✔
98
                    PostgresSchemaError::TypeError(TypeError::DeserializationError(e))
×
99
                })?);
2✔
100
            }
101
            Ok(Field::Json(JsonValue::Array(lst)))
2✔
102
        }
103
        Type::BOOL => Ok(Field::Boolean(v.slice(0..1) == "t")),
2✔
104
        Type::POINT => Ok(Field::Point(
105
            String::from_utf8(v.to_vec())
1✔
106
                .map_err(StringParseError)?
1✔
107
                .parse::<DozerPoint>()
1✔
108
                .map_err(|_| PointParseError)?,
1✔
109
        )),
110
        _ => Err(ColumnTypeNotSupported(column_type.name().to_string())),
×
111
    })
18✔
112
}
18✔
113

114
pub fn postgres_type_to_dozer_type(column_type: Type) -> Result<FieldType, PostgresSchemaError> {
379✔
115
    match column_type {
379✔
116
        Type::BOOL => Ok(FieldType::Boolean),
17✔
117
        Type::INT2 | Type::INT4 | Type::INT8 => Ok(FieldType::Int),
71✔
118
        Type::CHAR | Type::TEXT | Type::VARCHAR | Type::BPCHAR | Type::UUID | Type::ANYENUM => {
119
            Ok(FieldType::String)
87✔
120
        }
121
        Type::FLOAT4 | Type::FLOAT8 => Ok(FieldType::Float),
35✔
122
        Type::BYTEA => Ok(FieldType::Binary),
17✔
123
        Type::TIMESTAMP | Type::TIMESTAMPTZ => Ok(FieldType::Timestamp),
34✔
124
        Type::NUMERIC => Ok(FieldType::Decimal),
17✔
125
        Type::JSONB
126
        | Type::JSON
127
        | Type::JSONB_ARRAY
128
        | Type::JSON_ARRAY
129
        | Type::TEXT_ARRAY
130
        | Type::CHAR_ARRAY
131
        | Type::VARCHAR_ARRAY
132
        | Type::BPCHAR_ARRAY => Ok(FieldType::Json),
68✔
133
        Type::DATE => Ok(FieldType::Date),
16✔
134
        Type::POINT => Ok(FieldType::Point),
17✔
135
        _ => Err(ColumnTypeNotSupported(column_type.name().to_string())),
×
136
    }
137
}
379✔
138

139
fn handle_error(e: tokio_postgres::error::Error) -> Result<Field, PostgresSchemaError> {
140
    if let Some(e) = e.source() {
84✔
141
        if let Some(_e) = e.downcast_ref::<WasNull>() {
84✔
142
            Ok(Field::Null)
84✔
143
        } else {
144
            Err(ValueConversionError(e.to_string()))
×
145
        }
146
    } else {
147
        Err(ValueConversionError(e.to_string()))
×
148
    }
149
}
84✔
150

151
// Reads column `$idx` of `$row` as `$target` and wraps it with `Field::from`.
// A failed read is delegated to `handle_error`, which decides between a null
// field and a conversion error.
macro_rules! convert_row_value_to_field {
    ($row:ident, $idx:ident, $target:ty) => {{
        let fetched: Result<$target, _> = $row.try_get($idx);
        match fetched {
            Ok(val) => Ok(Field::from(val)),
            Err(err) => handle_error(err),
        }
    }};
}
157

158
pub fn value_to_field(
352✔
159
    row: &Row,
352✔
160
    idx: usize,
352✔
161
    col_type: &Type,
352✔
162
) -> Result<Field, PostgresSchemaError> {
352✔
163
    match col_type {
352✔
164
        &Type::BOOL => convert_row_value_to_field!(row, idx, bool),
28✔
165
        &Type::INT2 => convert_row_value_to_field!(row, idx, i16),
28✔
166
        &Type::INT4 => convert_row_value_to_field!(row, idx, i32),
32✔
167
        &Type::INT8 => convert_row_value_to_field!(row, idx, i64),
44✔
168
        &Type::CHAR | &Type::TEXT | &Type::VARCHAR | &Type::BPCHAR | &Type::ANYENUM => {
169
            convert_row_value_to_field!(row, idx, String)
120✔
170
        }
171
        &Type::FLOAT4 => convert_row_value_to_field!(row, idx, f32),
28✔
172
        &Type::FLOAT8 => convert_row_value_to_field!(row, idx, f64),
32✔
173
        &Type::TIMESTAMP => convert_row_value_to_field!(row, idx, NaiveDateTime),
28✔
174
        &Type::TIMESTAMPTZ => convert_row_value_to_field!(row, idx, DateTime<FixedOffset>),
28✔
175
        &Type::NUMERIC => convert_row_value_to_field!(row, idx, Decimal),
28✔
176
        &Type::DATE => convert_row_value_to_field!(row, idx, NaiveDate),
28✔
177
        &Type::BYTEA => {
178
            let value: Result<Vec<u8>, _> = row.try_get(idx);
16✔
179
            value.map_or_else(handle_error, |v| Ok(Field::Binary(v)))
16✔
180
        }
181
        &Type::JSONB | &Type::JSON => {
182
            let value: Result<serde_json::Value, _> = row.try_get(idx);
32✔
183
            value.map_or_else(handle_error, |val| {
32✔
184
                Ok(Field::Json(serde_json_to_json_value(val).map_err(|e| {
24✔
185
                    PostgresSchemaError::TypeError(TypeError::DeserializationError(e))
×
186
                })?))
24✔
187
            })
32✔
188
        }
189
        &Type::JSONB_ARRAY | &Type::JSON_ARRAY => {
190
            let value: Result<Vec<serde_json::Value>, _> = row.try_get(idx);
32✔
191
            value.map_or_else(handle_error, |val| {
32✔
192
                let mut lst = vec![];
24✔
193
                for v in val {
32✔
194
                    lst.push(serde_json_to_json_value(v).map_err(|e| {
8✔
195
                        PostgresSchemaError::TypeError(TypeError::DeserializationError(e))
×
196
                    })?);
8✔
197
                }
198
                Ok(Field::Json(JsonValue::Array(lst)))
24✔
199
            })
32✔
200
        }
201
        &Type::CHAR_ARRAY | &Type::TEXT_ARRAY | &Type::VARCHAR_ARRAY | &Type::BPCHAR_ARRAY => {
202
            let value: Result<Vec<String>, _> = row.try_get(idx);
×
203
            value.map_or_else(handle_error, |val| {
×
204
                let mut lst = vec![];
×
205
                for v in val {
×
206
                    lst.push(JsonValue::String(v));
×
207
                }
×
208
                Ok(Field::Json(JsonValue::Array(lst)))
×
209
            })
×
210
        }
211
        &Type::POINT => convert_row_value_to_field!(row, idx, GeoPoint),
28✔
212
        // &Type::UUID => convert_row_value_to_field!(row, idx, Uuid),
213
        &Type::UUID => {
214
            let value: Result<Uuid, _> = row.try_get(idx);
16✔
215
            value.map_or_else(handle_error, |val| Ok(Field::from(val.to_string())))
16✔
216
        }
217
        _ => {
218
            if col_type.schema() == "pg_catalog" {
×
219
                Err(ColumnTypeNotSupported(col_type.name().to_string()))
×
220
            } else {
221
                Err(CustomTypeNotSupported(col_type.name().to_string()))
×
222
            }
223
        }
224
    }
225
}
352✔
226

227
pub fn get_values(row: &Row, columns: &[Column]) -> Result<Vec<Field>, PostgresSchemaError> {
18✔
228
    let mut values: Vec<Field> = vec![];
18✔
229
    for (idx, col) in columns.iter().enumerate() {
352✔
230
        let val = value_to_field(row, idx, col.type_());
352✔
231
        match val {
352✔
232
            Ok(val) => values.push(val),
352✔
233
            Err(e) => return Err(e),
×
234
        };
235
    }
236
    Ok(values)
18✔
237
}
18✔
238

239
pub fn map_row_to_operation_event(
18✔
240
    row: &Row,
18✔
241
    columns: &[Column],
18✔
242
) -> Result<Operation, PostgresSchemaError> {
18✔
243
    match get_values(row, columns) {
18✔
244
        Ok(values) => Ok(Operation::Insert {
18✔
245
            new: Record::new(values),
18✔
246
        }),
18✔
247
        Err(e) => Err(e),
×
248
    }
249
}
18✔
250

251
pub fn map_schema(columns: &[Column]) -> Result<Schema, ConnectorError> {
×
252
    let field_defs: Result<Vec<FieldDefinition>, _> =
×
253
        columns.iter().map(convert_column_to_field).collect();
×
254

×
255
    Ok(Schema {
×
256
        fields: field_defs.unwrap(),
×
257
        primary_index: vec![0],
×
258
    })
×
259
}
×
260

261
pub fn convert_column_to_field(column: &Column) -> Result<FieldDefinition, PostgresSchemaError> {
×
262
    postgres_type_to_dozer_type(column.type_().clone()).map(|typ| FieldDefinition {
×
263
        name: column.name().to_string(),
×
264
        typ,
×
265
        nullable: true,
×
266
        source: SourceDefinition::Dynamic,
×
267
    })
×
268
}
×
269

270
#[cfg(test)]
mod tests {
    use super::*;
    use dozer_types::chrono::NaiveDate;
    use std::collections::BTreeMap;

    // Asserts that the payload `$raw`, decoded as a column of type `$pg_type`,
    // yields exactly `$expected`.
    #[macro_export]
    macro_rules! test_conversion {
        ($raw:expr, $pg_type:expr, $expected:expr) => {
            let payload = Bytes::from($raw);
            let column = TableColumn {
                name: "column".to_string(),
                flags: 0,
                r#type: $pg_type,
                column_index: 0,
            };
            let value = postgres_type_to_field(Some(&payload), &column);
            assert_eq!(value.unwrap(), $expected);
        };
    }

    // Asserts that the Postgres type `$pg_type` maps to the dozer type `$expected`.
    #[macro_export]
    macro_rules! test_type_mapping {
        ($pg_type:expr, $expected:expr) => {
            assert_eq!(postgres_type_to_dozer_type($pg_type).unwrap(), $expected);
        };
    }

    /// Interprets a naive date-time as UTC, expressed with a fixed offset.
    fn as_utc(naive: NaiveDateTime) -> DateTime<FixedOffset> {
        DateTime::from_utc(naive, Utc.fix())
    }

    /// The date shared by all timestamp fixtures below.
    fn fixture_day() -> NaiveDate {
        NaiveDate::from_ymd_opt(2022, 9, 16).unwrap()
    }

    #[test]
    fn it_converts_postgres_type_to_field() {
        // Numbers.
        test_conversion!("12", Type::INT8, Field::Int(12));
        test_conversion!("4.7809", Type::FLOAT8, Field::Float(OrderedFloat(4.7809)));

        // Text-like types.
        let expected_text = String::from("Test text");
        test_conversion!("Test text", Type::TEXT, Field::String(expected_text.clone()));
        test_conversion!("Test text", Type::ANYENUM, Field::String(expected_text));

        let expected_uuid = String::from("a0eebc99-9c0b-4ef8-bb6d-6bb9bd380a11");
        test_conversion!(
            "a0eebc99-9c0b-4ef8-bb6d-6bb9bd380a11",
            Type::UUID,
            Field::String(expected_uuid)
        );

        // UTF-8 bytes of the string "bytea" (https://www.charset.org/utf-8)
        let expected_bytes: Vec<u8> = vec![98, 121, 116, 101, 97];
        test_conversion!("bytea", Type::BYTEA, Field::Binary(expected_bytes));

        let expected_decimal = rust_decimal::Decimal::from_f64(8.28).unwrap();
        test_conversion!("8.28", Type::NUMERIC, Field::Decimal(expected_decimal));

        // Timestamp without fractional seconds.
        let whole_seconds = as_utc(fixture_day().and_hms_opt(5, 56, 29).unwrap());
        test_conversion!(
            "2022-09-16 05:56:29",
            Type::TIMESTAMP,
            Field::Timestamp(whole_seconds)
        );

        // Timestamp with milliseconds.
        let with_millis = as_utc(fixture_day().and_hms_milli_opt(7, 59, 29, 321).unwrap());
        test_conversion!(
            "2022-09-16 07:59:29.321",
            Type::TIMESTAMP,
            Field::Timestamp(with_millis)
        );

        // Timestamp with a +07 offset: 10:56:30 local time is 03:56:30 UTC.
        let with_offset = as_utc(
            fixture_day()
                .and_hms_micro_opt(3, 56, 30, 959787)
                .unwrap(),
        );
        test_conversion!(
            "2022-09-16 10:56:30.959787+07",
            Type::TIMESTAMPTZ,
            Field::Timestamp(with_offset)
        );

        // JSON object.
        let expected_object = JsonValue::Object(BTreeMap::from([(
            String::from("abc"),
            JsonValue::String(String::from("foo")),
        )]));
        test_conversion!(
            "{\"abc\":\"foo\"}",
            Type::JSONB,
            Field::Json(expected_object.clone())
        );
        test_conversion!(
            "{\"abc\":\"foo\"}",
            Type::JSON,
            Field::Json(expected_object.clone())
        );

        // JSON array wrapping the same object.
        let expected_array = JsonValue::Array(vec![expected_object]);
        test_conversion!(
            "[{\"abc\":\"foo\"}]",
            Type::JSON_ARRAY,
            Field::Json(expected_array.clone())
        );
        test_conversion!(
            "[{\"abc\":\"foo\"}]",
            Type::JSONB_ARRAY,
            Field::Json(expected_array)
        );

        // Booleans.
        test_conversion!("t", Type::BOOL, Field::Boolean(true));
        test_conversion!("f", Type::BOOL, Field::Boolean(false));

        // Points.
        test_conversion!(
            "(1.234,2.456)",
            Type::POINT,
            Field::Point(DozerPoint::from((1.234, 2.456)))
        );
    }

    #[test]
    fn it_maps_postgres_type_to_dozer_type() {
        let expectations = vec![
            (Type::INT8, FieldType::Int),
            (Type::FLOAT8, FieldType::Float),
            (Type::VARCHAR, FieldType::String),
            (Type::ANYENUM, FieldType::String),
            (Type::UUID, FieldType::String),
            (Type::BYTEA, FieldType::Binary),
            (Type::NUMERIC, FieldType::Decimal),
            (Type::TIMESTAMP, FieldType::Timestamp),
            (Type::TIMESTAMPTZ, FieldType::Timestamp),
            (Type::JSONB, FieldType::Json),
            (Type::JSON, FieldType::Json),
            (Type::JSONB_ARRAY, FieldType::Json),
            (Type::JSON_ARRAY, FieldType::Json),
            (Type::BOOL, FieldType::Boolean),
            (Type::POINT, FieldType::Point),
        ];

        for (pg_type, expected) in expectations {
            test_type_mapping!(pg_type, expected);
        }
    }

    #[test]
    fn test_none_value() {
        let column = TableColumn {
            name: "column".to_string(),
            flags: 0,
            r#type: Type::VARCHAR,
            column_index: 0,
        };

        // A missing value maps to a null field regardless of the column type.
        let value = postgres_type_to_field(None, &column);
        assert_eq!(value.unwrap(), Field::Null);
    }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc