• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 4295401807

pending completion
4295401807

push

github

GitHub
Bump version (#1099)

28685 of 39545 relevant lines covered (72.54%)

52105.29 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

51.15
/dozer-types/src/types/mod.rs
1
use ahash::AHasher;
2
use geo::{point, GeodesicDistance, Point};
3
use ordered_float::OrderedFloat;
4
use std::array::TryFromSliceError;
5
use std::cmp::Ordering;
6
use std::fmt::{Display, Formatter};
7
use std::hash::{Hash, Hasher};
8
use std::str::FromStr;
9

10
use crate::errors::types::TypeError;
11
use prettytable::{Cell, Row, Table};
12
use serde::{self, Deserialize, Serialize};
13

14
mod field;
15

16
use crate::errors::types::TypeError::InvalidFieldValue;
17
pub use field::{field_test_cases, Field, FieldBorrow, FieldType, DATE_FORMAT};
18

19
#[derive(Clone, Serialize, Deserialize, Debug, PartialEq, Eq, PartialOrd, Ord)]
1,282,258✔
20
pub enum SourceDefinition {
21
    Table { connection: String, name: String },
22
    Alias { name: String },
23
    Dynamic,
24
}
25
impl Default for SourceDefinition {
26
    fn default() -> Self {
180✔
27
        SourceDefinition::Dynamic
180✔
28
    }
180✔
29
}
30

31
#[derive(Clone, Serialize, Deserialize, Debug, PartialEq, Eq, PartialOrd, Ord)]
1,278,571✔
32
pub struct FieldDefinition {
33
    pub name: String,
34
    pub typ: FieldType,
35
    pub nullable: bool,
36
    #[serde(default)]
37
    pub source: SourceDefinition,
38
}
39

40
impl FieldDefinition {
41
    pub fn new(name: String, typ: FieldType, nullable: bool, source: SourceDefinition) -> Self {
34,720✔
42
        Self {
34,720✔
43
            name,
34,720✔
44
            typ,
34,720✔
45
            nullable,
34,720✔
46
            source,
34,720✔
47
        }
34,720✔
48
    }
34,720✔
49
}
50

51
#[derive(Clone, Copy, Serialize, Deserialize, Debug, PartialEq, Eq, Hash)]
1,451,030✔
52
pub struct SchemaIdentifier {
53
    pub id: u32,
54
    pub version: u16,
55
}
56

57
#[derive(Clone, Serialize, Deserialize, Debug, PartialEq, Eq)]
74,734✔
58
pub struct Schema {
59
    /// Unique identifier and version for this schema. This value is required only if the schema
60
    /// is represented by a valid entry in the schema registry. For nested schemas, this field
61
    /// is not applicable
62
    pub identifier: Option<SchemaIdentifier>,
63

64
    /// fields contains a list of FieldDefinition for all the fields that appear in a record.
65
    /// Not necessarily all these fields will end up in the final object structure stored in
66
    /// the cache. Some fields might only be used for indexing purposes only.
67
    pub fields: Vec<FieldDefinition>,
68

69
    /// Indexes of the fields forming the primary key for this schema. If the value is empty
70
    /// only Insert Operation are supported. Updates and Deletes are not supported without a
71
    /// primary key definition
72
    #[serde(default)]
73
    pub primary_index: Vec<usize>,
74
}
75

76
#[derive(Clone, Serialize, Deserialize, Debug)]
65✔
77
pub enum ReplicationChangesTrackingType {
78
    FullChanges,
79
    OnlyPK,
80
    Nothing,
81
}
82

83
impl Default for ReplicationChangesTrackingType {
84
    fn default() -> Self {
90✔
85
        ReplicationChangesTrackingType::Nothing
90✔
86
    }
90✔
87
}
88

89
#[derive(Clone, Serialize, Deserialize, Debug)]
225✔
90
pub struct SourceSchema {
91
    pub name: String,
92
    pub schema: Schema,
93
    #[serde(default)]
94
    pub replication_type: ReplicationChangesTrackingType,
95
}
96

97
impl SourceSchema {
98
    pub fn new(
20✔
99
        name: String,
20✔
100
        schema: Schema,
20✔
101
        replication_type: ReplicationChangesTrackingType,
20✔
102
    ) -> Self {
20✔
103
        Self {
20✔
104
            name,
20✔
105
            schema,
20✔
106
            replication_type,
20✔
107
        }
20✔
108
    }
20✔
109
}
110

111
impl Schema {
112
    pub fn empty() -> Schema {
10,000✔
113
        Self {
10,000✔
114
            identifier: None,
10,000✔
115
            fields: Vec::new(),
10,000✔
116
            primary_index: Vec::new(),
10,000✔
117
        }
10,000✔
118
    }
10,000✔
119

120
    pub fn field(&mut self, f: FieldDefinition, pk: bool) -> &mut Self {
26,990✔
121
        self.fields.push(f);
26,990✔
122
        if pk {
26,990✔
123
            self.primary_index.push(&self.fields.len() - 1)
2,380✔
124
        }
24,610✔
125
        self
26,990✔
126
    }
26,990✔
127

128
    pub fn get_field_index(&self, name: &str) -> Result<(usize, &FieldDefinition), TypeError> {
×
129
        let r = self
×
130
            .fields
×
131
            .iter()
×
132
            .enumerate()
×
133
            .find(|f| f.1.name.as_str() == name);
×
134
        match r {
×
135
            Some(v) => Ok(v),
×
136
            _ => Err(TypeError::InvalidFieldName(name.to_string())),
×
137
        }
138
    }
×
139

140
    pub fn print(&self) -> Table {
80✔
141
        let mut table = Table::new();
80✔
142
        table.add_row(row!["Field", "Type", "Nullable"]);
80✔
143
        for f in &self.fields {
280✔
144
            table.add_row(row![f.name, format!("{:?}", f.typ), f.nullable]);
200✔
145
        }
200✔
146
        table
80✔
147
    }
80✔
148

149
    pub fn set_identifier(
×
150
        &mut self,
×
151
        identifier: Option<SchemaIdentifier>,
×
152
    ) -> Result<(), TypeError> {
×
153
        self.identifier = identifier;
×
154
        Ok(())
×
155
    }
×
156
}
157

158
impl Display for Schema {
159
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
×
160
        let table = self.print();
×
161
        table.fmt(f)
×
162
    }
×
163
}
164

165
#[derive(Clone, Serialize, Deserialize, Debug, PartialEq, Eq)]
1,376✔
166
pub enum IndexDefinition {
167
    /// The sorted inverted index, supporting `Eq` filter on multiple fields and `LT`, `LTE`, `GT`, `GTE` filter on at most one field.
168
    SortedInverted(Vec<usize>),
169
    /// Full text index, supporting `Contains`, `MatchesAny` and `MatchesAll` filter on exactly one field.
170
    FullText(usize),
171
}
172

173
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
2,353,072✔
174
pub struct Record {
175
    /// Schema implemented by this Record
176
    pub schema_id: Option<SchemaIdentifier>,
177
    /// List of values, following the definitions of `fields` of the associated schema
178
    pub values: Vec<Field>,
179
    /// Records with same primary key will have increasing version.
180
    pub version: Option<u32>,
181
}
182

183
impl Record {
184
    pub fn new(
43,375,910✔
185
        schema_id: Option<SchemaIdentifier>,
43,375,910✔
186
        values: Vec<Field>,
43,375,910✔
187
        version: Option<u32>,
43,375,910✔
188
    ) -> Record {
43,375,910✔
189
        Record {
43,375,910✔
190
            schema_id,
43,375,910✔
191
            values,
43,375,910✔
192
            version,
43,375,910✔
193
        }
43,375,910✔
194
    }
43,375,910✔
195

196
    pub fn from_schema(schema: &Schema) -> Record {
56,470✔
197
        Record {
56,470✔
198
            schema_id: schema.identifier,
56,470✔
199
            values: vec![Field::Null; schema.fields.len()],
56,470✔
200
            version: None,
56,470✔
201
        }
56,470✔
202
    }
56,470✔
203

204
    pub fn nulls(schema_id: Option<SchemaIdentifier>, size: usize, version: Option<u32>) -> Record {
×
205
        Record {
×
206
            schema_id,
×
207
            values: vec![Field::Null; size],
×
208
            version,
×
209
        }
×
210
    }
×
211

212
    pub fn iter(&self) -> core::slice::Iter<'_, Field> {
×
213
        self.values.iter()
×
214
    }
×
215

216
    pub fn set_value(&mut self, idx: usize, value: Field) {
×
217
        self.values[idx] = value;
×
218
    }
×
219

220
    pub fn push_value(&mut self, value: Field) {
×
221
        self.values.push(value);
×
222
    }
×
223

224
    pub fn get_value(&self, idx: usize) -> Result<&Field, TypeError> {
2,179,470✔
225
        match self.values.get(idx) {
2,179,470✔
226
            Some(f) => Ok(f),
2,179,470✔
227
            _ => Err(TypeError::InvalidFieldIndex(idx)),
×
228
        }
229
    }
2,179,470✔
230

231
    pub fn get_key(&self, indexes: &Vec<usize>) -> Vec<u8> {
5,608,480✔
232
        debug_assert!(!indexes.is_empty(), "Primary key indexes cannot be empty");
5,610,910✔
233

234
        let mut tot_size = 0_usize;
5,608,430✔
235
        let mut buffers = Vec::<Vec<u8>>::with_capacity(indexes.len());
5,608,430✔
236
        for i in indexes {
11,219,430✔
237
            let bytes = self.values[*i].encode();
5,611,000✔
238
            tot_size += bytes.len();
5,611,000✔
239
            buffers.push(bytes);
5,611,000✔
240
        }
5,611,000✔
241

242
        let mut res_buffer = Vec::<u8>::with_capacity(tot_size);
5,608,430✔
243
        for i in buffers {
11,219,280✔
244
            res_buffer.extend(i);
5,610,850✔
245
        }
5,610,850✔
246
        res_buffer
5,608,430✔
247
    }
5,608,430✔
248

249
    pub fn get_values_hash(&self) -> u64 {
56,730✔
250
        let mut hasher = AHasher::default();
56,730✔
251

252
        for (index, field) in self.values.iter().enumerate() {
56,730✔
253
            hasher.write_i32(index as i32);
56,730✔
254
            match field {
56,730✔
255
                Field::UInt(i) => {
×
256
                    hasher.write_u8(1);
×
257
                    hasher.write_u64(*i);
×
258
                }
×
259
                Field::Int(i) => {
56,730✔
260
                    hasher.write_u8(2);
56,730✔
261
                    hasher.write_i64(*i);
56,730✔
262
                }
56,730✔
263
                Field::Float(f) => {
×
264
                    hasher.write_u8(3);
×
265
                    hasher.write(&((*f).to_ne_bytes()));
×
266
                }
×
267
                Field::Boolean(b) => {
×
268
                    hasher.write_u8(4);
×
269
                    hasher.write_u8(if *b { 1_u8 } else { 0_u8 });
×
270
                }
271
                Field::String(s) => {
×
272
                    hasher.write_u8(5);
×
273
                    hasher.write(s.as_str().as_bytes());
×
274
                }
×
275
                Field::Text(t) => {
×
276
                    hasher.write_u8(6);
×
277
                    hasher.write(t.as_str().as_bytes());
×
278
                }
×
279
                Field::Binary(b) => {
×
280
                    hasher.write_u8(7);
×
281
                    hasher.write(b.as_ref());
×
282
                }
×
283
                Field::Decimal(d) => {
×
284
                    hasher.write_u8(8);
×
285
                    hasher.write(&d.serialize());
×
286
                }
×
287
                Field::Timestamp(t) => {
×
288
                    hasher.write_u8(9);
×
289
                    hasher.write_i64(t.timestamp())
×
290
                }
291
                Field::Date(d) => {
×
292
                    hasher.write_u8(10);
×
293
                    hasher.write(d.to_string().as_bytes());
×
294
                }
×
295
                Field::Bson(b) => {
×
296
                    hasher.write_u8(11);
×
297
                    hasher.write(b.as_ref());
×
298
                }
×
299
                Field::Point(p) => {
×
300
                    hasher.write_u8(12);
×
301
                    hasher.write(p.to_bytes().as_slice());
×
302
                }
×
303
                Field::Null => {
×
304
                    hasher.write_u8(0);
×
305
                }
×
306
            }
307
        }
308
        hasher.finish()
56,730✔
309
    }
56,730✔
310
}
311

312
impl Display for Record {
313
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
×
314
        let v = self
×
315
            .values
×
316
            .iter()
×
317
            .map(|f| Cell::new(&f.to_string().unwrap_or("".to_string())))
×
318
            .collect::<Vec<Cell>>();
×
319

×
320
        let mut table = Table::new();
×
321
        table.add_row(Row::new(v));
×
322
        table.fmt(f)
×
323
    }
×
324
}
325

326
#[derive(Clone, Serialize, Deserialize, Debug, Copy)]
×
327
pub struct Commit {
328
    pub seq_no: u64,
329
    pub lsn: u64,
330
}
331

332
impl Commit {
333
    pub fn new(seq_no: u64, lsn: u64) -> Self {
×
334
        Self { seq_no, lsn }
×
335
    }
×
336
}
337

338
#[derive(Clone, Debug, PartialEq, Eq)]
130,005✔
339
pub enum Operation {
340
    Delete { old: Record },
341
    Insert { new: Record },
342
    Update { old: Record, new: Record },
343
    SnapshottingDone {},
344
}
345

346
#[derive(Clone, Copy, Debug, Serialize, Deserialize, Eq, PartialEq)]
63✔
347
pub struct DozerPoint(pub Point<OrderedFloat<f64>>);
348

349
impl GeodesicDistance<OrderedFloat<f64>> for DozerPoint {
350
    fn geodesic_distance(&self, rhs: &Self) -> OrderedFloat<f64> {
20✔
351
        let f = point! { x: self.0.x().0, y: self.0.y().0 };
20✔
352
        let t = point! { x: rhs.0.x().0, y: rhs.0.y().0 };
20✔
353
        OrderedFloat(f.geodesic_distance(&t))
20✔
354
    }
20✔
355
}
356

357
impl Ord for DozerPoint {
358
    fn cmp(&self, other: &Self) -> Ordering {
×
359
        if self.0.x() == other.0.x() && self.0.y() == other.0.y() {
×
360
            Ordering::Equal
×
361
        } else if self.0.x() > other.0.x()
×
362
            || (self.0.x() == other.0.x() && self.0.y() > other.0.y())
×
363
        {
364
            Ordering::Greater
×
365
        } else {
366
            Ordering::Less
×
367
        }
368
    }
×
369
}
370

371
impl PartialOrd for DozerPoint {
372
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
×
373
        Some(self.cmp(other))
×
374
    }
×
375
}
376

377
impl FromStr for DozerPoint {
378
    type Err = TypeError;
379

380
    fn from_str(str: &str) -> Result<Self, Self::Err> {
13✔
381
        let error = || InvalidFieldValue {
13✔
382
            field_type: FieldType::Point,
2✔
383
            nullable: false,
2✔
384
            value: str.to_string(),
2✔
385
        };
2✔
386

×
387
        let s = str.replace('(', "");
13✔
388
        let s = s.replace(')', "");
13✔
389
        let mut cs = s.split(',');
13✔
390
        let x = cs
13✔
391
            .next()
13✔
392
            .ok_or_else(error)?
13✔
393
            .parse::<f64>()
13✔
394
            .map_err(|_| error())?;
13✔
395
        let y = cs
11✔
396
            .next()
11✔
397
            .ok_or_else(error)?
11✔
398
            .parse::<f64>()
11✔
399
            .map_err(|_| error())?;
11✔
400
        Ok(Self(Point::from((OrderedFloat(x), OrderedFloat(y)))))
11✔
401
    }
13✔
402
}
403

404
impl From<(f64, f64)> for DozerPoint {
405
    fn from((x, y): (f64, f64)) -> Self {
70✔
406
        Self(point! {x: OrderedFloat(x), y: OrderedFloat(y)})
70✔
407
    }
70✔
408
}
409

410
impl Display for DozerPoint {
411
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
×
412
        f.write_str(&format!("{:?}", self.0.x_y()))
×
413
    }
×
414
}
×
415

×
416
impl DozerPoint {
×
417
    pub fn to_bytes(&self) -> [u8; 16] {
×
418
        let mut result = [0_u8; 16];
×
419
        result[0..8].copy_from_slice(&self.0.x().to_be_bytes());
×
420
        result[8..16].copy_from_slice(&self.0.y().to_be_bytes());
×
421
        result
×
422
    }
×
423

×
424
    pub fn from_bytes(bytes: &[u8]) -> Result<Self, TryFromSliceError> {
×
425
        let x = f64::from_be_bytes(bytes[0..8].try_into()?);
×
426
        let y = f64::from_be_bytes(bytes[8..16].try_into()?);
×
427

428
        Ok(DozerPoint::from((x, y)))
×
429
    }
×
430
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc