• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 6009657516

29 Aug 2023 08:13AM UTC coverage: 76.652% (-1.4%) from 78.07%
6009657516

push

github

web-flow
chore: Create unit tests workflow (#1910)

* chore: Update for Rust 1.72.0

Rust 1.72.0 has introduced a bunch of new lints. Here we fix them all.

`let ... else` finally gets formatted.

* chire: Create unit tests workflow

* Rename and remove useless steps

* remove env vars

* Add concurrency group

* Test unit workflow on 4 cores

* Add mysql service to unit tests

---------

Co-authored-by: chubei <914745487@qq.com>

48982 of 63902 relevant lines covered (76.65%)

48394.25 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

51.43
/dozer-core/src/tests/sinks.rs
1
use crate::epoch::Epoch;
2
use crate::executor_operation::ProcessorOperation;
3
use crate::node::{PortHandle, Sink, SinkFactory};
4
use crate::processor_record::ProcessorRecordStore;
5
use crate::DEFAULT_PORT_HANDLE;
6
use dozer_log::storage::Queue;
7
use dozer_types::errors::internal::BoxedError;
8
use dozer_types::types::Schema;
9

10
use dozer_types::log::debug;
11
use std::collections::HashMap;
12

13
use std::sync::atomic::{AtomicBool, Ordering};
14
use std::sync::Arc;
15

16
pub(crate) const COUNTING_SINK_INPUT_PORT: PortHandle = 90;
17

18
#[derive(Debug)]
×
19
pub(crate) struct CountingSinkFactory {
×
20
    expected: u64,
21
    running: Arc<AtomicBool>,
22
}
23

24
impl CountingSinkFactory {
25
    pub fn new(expected: u64, barrier: Arc<AtomicBool>) -> Self {
19✔
26
        Self {
19✔
27
            expected,
19✔
28
            running: barrier,
19✔
29
        }
19✔
30
    }
19✔
31
}
×
32

33
impl SinkFactory for CountingSinkFactory {
34
    fn get_input_ports(&self) -> Vec<PortHandle> {
80✔
35
        vec![COUNTING_SINK_INPUT_PORT]
80✔
36
    }
80✔
37

×
38
    fn prepare(&self, _input_schemas: HashMap<PortHandle, Schema>) -> Result<(), BoxedError> {
19✔
39
        Ok(())
19✔
40
    }
19✔
41

×
42
    fn build(
15✔
43
        &self,
15✔
44
        _input_schemas: HashMap<PortHandle, Schema>,
15✔
45
    ) -> Result<Box<dyn Sink>, BoxedError> {
15✔
46
        Ok(Box::new(CountingSink {
15✔
47
            expected: self.expected,
15✔
48
            current: 0,
15✔
49
            running: self.running.clone(),
15✔
50
        }))
15✔
51
    }
15✔
52
}
×
53

×
54
#[derive(Debug)]
×
55
pub(crate) struct CountingSink {
×
56
    expected: u64,
57
    current: u64,
58
    running: Arc<AtomicBool>,
×
59
}
60
impl Sink for CountingSink {
61
    fn commit(&mut self, _epoch_details: &Epoch) -> Result<(), BoxedError> {
480✔
62
        // if self.current == self.expected {
480✔
63
        //     info!(
480✔
64
        //         "Received {} messages. Notifying sender to exit!",
480✔
65
        //         self.current
480✔
66
        //     );
480✔
67
        //     self.running.store(false, Ordering::Relaxed);
480✔
68
        // }
480✔
69
        Ok(())
480✔
70
    }
480✔
71

×
72
    fn process(
3,233,835✔
73
        &mut self,
3,233,835✔
74
        _from_port: PortHandle,
3,233,835✔
75
        _record_store: &ProcessorRecordStore,
3,233,835✔
76
        _op: ProcessorOperation,
3,233,835✔
77
    ) -> Result<(), BoxedError> {
3,233,835✔
78
        self.current += 1;
3,233,835✔
79
        if self.current == self.expected {
3,233,835✔
80
            debug!(
10✔
81
                "Received {} messages. Notifying sender to exit!",
×
82
                self.current
×
83
            );
×
84
            self.running.store(false, Ordering::Relaxed);
10✔
85
        }
3,233,825✔
86
        Ok(())
3,233,835✔
87
    }
3,233,835✔
88

×
89
    fn persist(&mut self, _queue: &Queue) -> Result<(), BoxedError> {
29✔
90
        Ok(())
29✔
91
    }
29✔
92

93
    fn on_source_snapshotting_done(&mut self, _connection_name: String) -> Result<(), BoxedError> {
×
94
        Ok(())
×
95
    }
×
96
}
97

×
98
#[derive(Debug)]
×
99
pub struct ConnectivityTestSinkFactory;
×
100

101
impl SinkFactory for ConnectivityTestSinkFactory {
102
    fn get_input_ports(&self) -> Vec<PortHandle> {
11✔
103
        vec![DEFAULT_PORT_HANDLE]
11✔
104
    }
11✔
105

106
    fn prepare(&self, _input_schemas: HashMap<PortHandle, Schema>) -> Result<(), BoxedError> {
×
107
        unimplemented!("This struct is for connectivity test, only input ports are defined")
×
108
    }
×
109

110
    fn build(
×
111
        &self,
×
112
        _input_schemas: HashMap<PortHandle, Schema>,
×
113
    ) -> Result<Box<dyn Sink>, BoxedError> {
×
114
        unimplemented!("This struct is for connectivity test, only input ports are defined")
×
115
    }
116
}
117

×
118
#[derive(Debug)]
×
119
pub struct NoInputPortSinkFactory;
×
120

×
121
impl SinkFactory for NoInputPortSinkFactory {
×
122
    fn get_input_ports(&self) -> Vec<PortHandle> {
1✔
123
        vec![]
1✔
124
    }
1✔
125

×
126
    fn prepare(&self, _input_schemas: HashMap<PortHandle, Schema>) -> Result<(), BoxedError> {
×
127
        unimplemented!("This struct is for connectivity test, only input ports are defined")
×
128
    }
129

×
130
    fn build(
×
131
        &self,
×
132
        _input_schemas: HashMap<PortHandle, Schema>,
×
133
    ) -> Result<Box<dyn Sink>, BoxedError> {
×
134
        unimplemented!("This struct is for connectivity test, only input ports are defined")
×
135
    }
×
136
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc