• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 3965884680

pending completion
3965884680

push

github

GitHub
feat: support bpchar column type (#687)

1 of 1 new or added line in 1 file covered. (100.0%)

22001 of 32789 relevant lines covered (67.1%)

36262.74 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

49.21
/dozer-ingestion/src/connectors/postgres/helper.rs
1
use crate::connectors::postgres::xlog_mapper::TableColumn;
2
use crate::errors::PostgresSchemaError::{
3
    ColumnTypeNotFound, ColumnTypeNotSupported, CustomTypeNotSupported, ValueConversionError,
4
};
5
use crate::errors::{ConnectorError, PostgresSchemaError};
6
use dozer_types::bytes::Bytes;
7
use dozer_types::chrono::{DateTime, FixedOffset, NaiveDate, NaiveDateTime, Offset, Utc};
8
use dozer_types::ordered_float::OrderedFloat;
9
use dozer_types::{rust_decimal, types::*};
10
use postgres::{Column, Row};
11
use postgres_types::{Type, WasNull};
12
use rust_decimal::prelude::FromPrimitive;
13
use rust_decimal::Decimal;
14
use std::error::Error;
15
use std::vec;
16

17
pub fn postgres_type_to_field(
11✔
18
    value: Option<&Bytes>,
11✔
19
    column: &TableColumn,
11✔
20
) -> Result<Field, PostgresSchemaError> {
11✔
21
    value.map_or(Ok(Field::Null), |v| {
11✔
22
        column
10✔
23
            .r#type
10✔
24
            .clone()
10✔
25
            .map_or(Err(ColumnTypeNotFound), |column_type| match column_type {
10✔
26
                Type::INT2 | Type::INT4 | Type::INT8 => Ok(Field::Int(
1✔
27
                    String::from_utf8(v.to_vec()).unwrap().parse().unwrap(),
1✔
28
                )),
1✔
29
                Type::FLOAT4 | Type::FLOAT8 => Ok(Field::Float(OrderedFloat(
1✔
30
                    String::from_utf8(v.to_vec())
1✔
31
                        .unwrap()
1✔
32
                        .parse::<f64>()
1✔
33
                        .unwrap(),
1✔
34
                ))),
1✔
35
                Type::TEXT | Type::VARCHAR | Type::CHAR | Type::BPCHAR => {
36
                    Ok(Field::String(String::from_utf8(v.to_vec()).unwrap()))
1✔
37
                }
38
                Type::BYTEA => Ok(Field::Binary(v.to_vec())),
1✔
39
                Type::NUMERIC => Ok(Field::Decimal(
1✔
40
                    Decimal::from_f64(
1✔
41
                        String::from_utf8(v.to_vec())
1✔
42
                            .unwrap()
1✔
43
                            .parse::<f64>()
1✔
44
                            .unwrap(),
1✔
45
                    )
1✔
46
                    .unwrap(),
1✔
47
                )),
1✔
48
                Type::TIMESTAMP => {
49
                    let date = NaiveDateTime::parse_from_str(
1✔
50
                        String::from_utf8(v.to_vec()).unwrap().as_str(),
1✔
51
                        "%Y-%m-%d %H:%M:%S",
1✔
52
                    )
1✔
53
                    .unwrap();
1✔
54
                    Ok(Field::Timestamp(DateTime::from_utc(date, Utc.fix())))
1✔
55
                }
56
                Type::TIMESTAMPTZ => {
57
                    let date: DateTime<FixedOffset> = DateTime::parse_from_str(
1✔
58
                        String::from_utf8(v.to_vec()).unwrap().as_str(),
1✔
59
                        "%Y-%m-%d %H:%M:%S%.f%#z",
1✔
60
                    )
1✔
61
                    .unwrap();
1✔
62
                    Ok(Field::Timestamp(date))
1✔
63
                }
64
                Type::DATE => {
65
                    let date: NaiveDate = NaiveDate::parse_from_str(
×
66
                        String::from_utf8(v.to_vec()).unwrap().as_str(),
×
67
                        DATE_FORMAT,
×
68
                    )
×
69
                    .unwrap();
×
70
                    Ok(Field::from(date))
×
71
                }
72
                Type::JSONB | Type::JSON => Ok(Field::Bson(v.to_vec())),
1✔
73
                Type::BOOL => Ok(Field::Boolean(v.slice(0..1) == "t")),
2✔
74
                _ => Err(ColumnTypeNotSupported(column_type.name().to_string())),
×
75
            })
10✔
76
    })
11✔
77
}
11✔
78

79
pub fn postgres_type_to_dozer_type(column_type: Type) -> Result<FieldType, PostgresSchemaError> {
×
80
    match column_type {
×
81
        Type::BOOL => Ok(FieldType::Boolean),
×
82
        Type::INT2 | Type::INT4 | Type::INT8 => Ok(FieldType::Int),
×
83
        Type::CHAR | Type::TEXT | Type::VARCHAR | Type::BPCHAR => Ok(FieldType::String),
×
84
        Type::FLOAT4 | Type::FLOAT8 => Ok(FieldType::Float),
×
85
        Type::BIT => Ok(FieldType::Binary),
×
86
        Type::TIMESTAMP | Type::TIMESTAMPTZ => Ok(FieldType::Timestamp),
×
87
        Type::NUMERIC => Ok(FieldType::Decimal),
×
88
        Type::JSONB => Ok(FieldType::Bson),
×
89
        Type::DATE => Ok(FieldType::Date),
×
90
        _ => Err(ColumnTypeNotSupported(column_type.name().to_string())),
×
91
    }
92
}
×
93

94
fn handle_error(e: postgres::error::Error) -> Result<Field, PostgresSchemaError> {
95
    if let Some(e) = e.source() {
×
96
        if let Some(_e) = e.downcast_ref::<WasNull>() {
×
97
            Ok(Field::Null)
×
98
        } else {
99
            Err(ValueConversionError(e.to_string()))
×
100
        }
101
    } else {
102
        Err(ValueConversionError(e.to_string()))
×
103
    }
104
}
×
105

106
// Reads column `$idx` of `$row` as `$target` and wraps it with `Field::from`;
// read failures are routed through `handle_error`.
macro_rules! convert_row_value_to_field {
    ($row:ident, $idx:ident, $target:ty) => {{
        let fetched: Result<$target, _> = $row.try_get($idx);
        match fetched {
            Ok(inner) => Ok(Field::from(inner)),
            Err(failure) => handle_error(failure),
        }
    }};
}
112

113
pub fn value_to_field(
×
114
    row: &Row,
×
115
    idx: usize,
×
116
    col_type: &Type,
×
117
) -> Result<Field, PostgresSchemaError> {
×
118
    match col_type {
×
119
        &Type::BOOL => convert_row_value_to_field!(row, idx, bool),
×
120
        &Type::INT2 => convert_row_value_to_field!(row, idx, i16),
×
121
        &Type::INT4 => convert_row_value_to_field!(row, idx, i32),
×
122
        &Type::INT8 => convert_row_value_to_field!(row, idx, i64),
×
123
        &Type::CHAR | &Type::TEXT | &Type::VARCHAR | &Type::BPCHAR => {
124
            convert_row_value_to_field!(row, idx, String)
×
125
        }
126
        &Type::FLOAT4 => convert_row_value_to_field!(row, idx, f32),
×
127
        &Type::FLOAT8 => convert_row_value_to_field!(row, idx, f64),
×
128
        &Type::TIMESTAMP => convert_row_value_to_field!(row, idx, NaiveDateTime),
×
129
        &Type::TIMESTAMPTZ => convert_row_value_to_field!(row, idx, DateTime<FixedOffset>),
×
130
        &Type::NUMERIC => convert_row_value_to_field!(row, idx, Decimal),
×
131
        &Type::DATE => convert_row_value_to_field!(row, idx, NaiveDate),
×
132
        &Type::BYTEA => {
133
            let value: Result<Vec<u8>, _> = row.try_get(idx);
×
134
            value.map_or_else(handle_error, |v| Ok(Field::Binary(v)))
×
135
        }
136
        &Type::JSONB => {
137
            let value: Result<Vec<u8>, _> = row.try_get(idx);
×
138
            value.map_or_else(handle_error, |v| Ok(Field::Bson(v)))
×
139
        }
140
        _ => {
141
            if col_type.schema() == "pg_catalog" {
×
142
                Err(ColumnTypeNotSupported(col_type.name().to_string()))
×
143
            } else {
144
                Err(CustomTypeNotSupported)
×
145
            }
146
        }
147
    }
148
}
×
149

150
pub fn get_values(row: &Row, columns: &[Column]) -> Result<Vec<Field>, PostgresSchemaError> {
×
151
    let mut values: Vec<Field> = vec![];
×
152
    for (idx, col) in columns.iter().enumerate() {
×
153
        let val = value_to_field(row, idx, col.type_());
×
154
        match val {
×
155
            Ok(val) => values.push(val),
×
156
            Err(e) => return Err(e),
×
157
        };
158
    }
159
    Ok(values)
×
160
}
×
161

162
pub fn map_row_to_operation_event(
×
163
    _table_name: String,
×
164
    identifier: SchemaIdentifier,
×
165
    row: &Row,
×
166
    columns: &[Column],
×
167
    seq_no: u64,
×
168
) -> Result<OperationEvent, PostgresSchemaError> {
×
169
    match get_values(row, columns) {
×
170
        Ok(values) => Ok(OperationEvent {
×
171
            operation: Operation::Insert {
×
172
                new: Record::new(Some(identifier), values, None),
×
173
            },
×
174
            seq_no,
×
175
        }),
×
176
        Err(e) => Err(e),
×
177
    }
178
}
×
179

180
pub fn map_schema(rel_id: &u32, columns: &[Column]) -> Result<Schema, ConnectorError> {
×
181
    let field_defs: Result<Vec<FieldDefinition>, _> =
×
182
        columns.iter().map(convert_column_to_field).collect();
×
183

×
184
    Ok(Schema {
×
185
        identifier: Some(SchemaIdentifier {
×
186
            id: *rel_id,
×
187
            version: 1,
×
188
        }),
×
189
        fields: field_defs.unwrap(),
×
190
        primary_index: vec![0],
×
191
    })
×
192
}
×
193

194
pub fn convert_column_to_field(column: &Column) -> Result<FieldDefinition, PostgresSchemaError> {
×
195
    postgres_type_to_dozer_type(column.type_().clone()).map(|typ| FieldDefinition {
×
196
        name: column.name().to_string(),
×
197
        typ,
×
198
        nullable: true,
×
199
    })
×
200
}
×
201

202
#[cfg(test)]
mod tests {
    use super::*;
    use dozer_types::chrono::NaiveDate;

    // Converts the text `$a` as a column of Postgres type `$b` and asserts that
    // the resulting field equals `$c`.
    #[macro_export]
    macro_rules! test_conversion {
        ($a:expr,$b:expr,$c:expr) => {
            let value = postgres_type_to_field(
                Some(&Bytes::from($a)),
                &TableColumn {
                    name: "column".to_string(),
                    type_id: $b.oid() as i32,
                    flags: 0,
                    r#type: Some($b),
                    idx: 0,
                },
            );
            assert_eq!(value.unwrap(), $c);
        };
    }

    #[test]
    fn it_converts_postgres_type_to_field() {
        test_conversion!("12", Type::INT8, Field::Int(12));
        test_conversion!("4.7809", Type::FLOAT8, Field::Float(OrderedFloat(4.7809)));
        let value = String::from("Test text");
        test_conversion!("Test text", Type::TEXT, Field::String(value));

        // UTF-8 bytes of the string "bytea" (https://www.charset.org/utf-8)
        let value: Vec<u8> = vec![98, 121, 116, 101, 97];
        test_conversion!("bytea", Type::BYTEA, Field::Binary(value));

        let value = Decimal::from_f64(8.28).unwrap();
        test_conversion!("8.28", Type::NUMERIC, Field::Decimal(value));

        let value = DateTime::from_utc(
            NaiveDate::from_ymd(2022, 9, 16).and_hms(5, 56, 29),
            Utc.fix(),
        );
        test_conversion!(
            "2022-09-16 05:56:29",
            Type::TIMESTAMP,
            Field::Timestamp(value)
        );

        let value = DateTime::from_utc(
            NaiveDate::from_ymd(2022, 9, 16).and_hms_micro(3, 56, 30, 959787),
            Utc.fix(),
        );
        test_conversion!(
            "2022-09-16 10:56:30.959787+07",
            Type::TIMESTAMPTZ,
            Field::Timestamp(value)
        );

        // UTF-8 bytes representation of json (https://www.charset.org/utf-8)
        let value = vec![123, 34, 97, 98, 99, 34, 58, 34, 102, 111, 111, 34, 125];
        test_conversion!("{\"abc\":\"foo\"}", Type::JSONB, Field::Bson(value));

        test_conversion!("t", Type::BOOL, Field::Boolean(true));
        test_conversion!("f", Type::BOOL, Field::Boolean(false));
    }

    #[test]
    fn it_converts_character_types_to_string() {
        // bpchar values arrive blank-padded; the padding is kept as-is.
        let padded = String::from("abc  ");
        test_conversion!("abc  ", Type::BPCHAR, Field::String(padded));

        let value = String::from("Test text");
        test_conversion!("Test text", Type::VARCHAR, Field::String(value));
    }

    #[test]
    fn test_none_value() {
        let value = postgres_type_to_field(
            None,
            &TableColumn {
                name: "column".to_string(),
                type_id: Type::VARCHAR.oid() as i32,
                flags: 0,
                r#type: Some(Type::VARCHAR),
                idx: 0,
            },
        );
        assert_eq!(value.unwrap(), Field::Null);
    }

    #[test]
    fn test_missing_column_type() {
        let value = postgres_type_to_field(
            Some(&Bytes::from("12")),
            &TableColumn {
                name: "column".to_string(),
                type_id: 0,
                flags: 0,
                r#type: None,
                idx: 0,
            },
        );
        assert!(matches!(value, Err(ColumnTypeNotFound)));
    }

    #[test]
    fn test_unsupported_column_type() {
        let value = postgres_type_to_field(
            Some(&Bytes::from("(1,2)")),
            &TableColumn {
                name: "column".to_string(),
                type_id: Type::POINT.oid() as i32,
                flags: 0,
                r#type: Some(Type::POINT),
                idx: 0,
            },
        );
        assert!(matches!(value, Err(ColumnTypeNotSupported(_))));
    }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc