• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 4112409903

pending completion
4112409903

Pull #818

github

GitHub
Merge 0e6d61bff into c160ec41f
Pull Request #818: chore: fix dag issues

212 of 212 new or added lines in 23 files covered. (100.0%)

23352 of 37718 relevant lines covered (61.91%)

31647.45 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/dozer-orchestrator/src/simple/tests.rs
1
use std::{
2
    fs,
3
    sync::{
4
        atomic::{AtomicBool, Ordering},
5
        Arc,
6
    },
7
    thread,
8
    time::Duration,
9
};
10

11
use dozer_api::CacheEndpoint;
12
use dozer_cache::cache::{expression::QueryExpression, test_utils, Cache, CacheOptions, LmdbCache};
13
use dozer_types::{
14
    ingestion_types::IngestionMessage,
15
    log::warn,
16
    models::{
17
        self,
18
        api_endpoint::{ApiEndpoint, ApiIndex},
19
        app_config::Config,
20
        connection::EventsAuthentication,
21
        flags::Flags,
22
    },
23
    types::{Field, OperationEvent, Record, Schema},
24
};
25
use serde_json::{json, Value};
26
use tempdir::TempDir;
27

28
use crate::pipeline::CacheSinkSettings;
29

30
use super::executor::Executor;
31

32
/// End-to-end smoke test body: wires a single `events` source through the DAG
/// executor into one LMDB-backed cache endpoint, pushes seven insert events,
/// then asserts all seven records are queryable from the cache.
///
/// `schema` supplies the record identifier used when constructing records;
/// the test is run both with and without a primary index (see the caller).
fn single_source_sink_impl(schema: Schema) {
    // A single source backed by the in-process "events" connector.
    let source = models::source::Source {
        id: Some("1".to_string()),
        name: "events".to_string(),
        table_name: "events".to_string(),
        columns: vec![],
        connection: Some(models::connection::Connection {
            authentication: Some(models::connection::Authentication::Events(
                EventsAuthentication::default(),
            )),
            id: Some("1".to_string()),
            db_type: models::connection::DBType::Events as i32,
            name: "events".to_string(),
            ..Default::default()
        }),
        refresh_config: Some(models::source::RefreshConfig::default()),
        ..Default::default()
    };

    // Sink side: an LMDB cache exposed via a single API endpoint. The
    // endpoint's index declares "a" as primary key.
    let table_name = "events";
    let cache = Arc::new(LmdbCache::new(CacheOptions::default()).unwrap());
    let cache_endpoint = CacheEndpoint {
        cache: cache.clone(),
        endpoint: ApiEndpoint {
            id: Some("1".to_string()),
            name: table_name.to_string(),
            path: "/events".to_string(),
            // sql: Some("select a, b from events group by a,b;".to_string()),
            index: Some(ApiIndex {
                primary_key: vec!["a".to_string()],
            }),
            ..Default::default()
        },
    };

    // Shared shutdown flag: `executor_running` is handed to the executor,
    // `r` is kept here so this test can signal shutdown at the end.
    let running = Arc::new(AtomicBool::new(true));
    let r = running.clone();
    let executor_running = running;

    // Seven (a, b, c) rows to ingest; the final query expects all seven back.
    let items: Vec<(i64, String, i64)> = vec![
        (1, "yuri".to_string(), 521),
        (2, "mega".to_string(), 521),
        (3, "james".to_string(), 523),
        (4, "james".to_string(), 524),
        (5, "steff".to_string(), 526),
        (6, "mega".to_string(), 527),
        (7, "james".to_string(), 528),
    ];

    // Fresh working directory for the executor. NOTE(review): TempDir::new
    // already creates the directory, so the remove/create dance below appears
    // redundant — kept as-is since this is a byte-identical doc pass.
    let tmp_dir = TempDir::new("example").unwrap_or_else(|_e| panic!("Unable to create temp dir"));
    if tmp_dir.path().exists() {
        fs::remove_dir_all(tmp_dir.path()).unwrap_or_else(|_e| panic!("Unable to remove old dir"));
    }
    fs::create_dir(tmp_dir.path()).unwrap_or_else(|_e| panic!("Unable to create temp dir"));

    // Build the executor over the single source and single cache endpoint,
    // then materialize the DAG plus the ingestion handles (no SQL transform).
    let tmp_path = tmp_dir.path().to_owned();
    let executor = Executor::new(
        Config {
            sources: vec![source],
            ..Config::default()
        },
        vec![cache_endpoint],
        executor_running,
        tmp_path,
    );
    let flags = Flags::default();
    let (dag_executor, ingestors) = executor
        .create_dag_executor(None, CacheSinkSettings::new(Some(flags), None))
        .unwrap();

    // Run the DAG on a background thread; errors are logged, not propagated,
    // because the thread is detached (handle intentionally unused).
    let _thread = thread::spawn(move || match Executor::run_dag_executor(dag_executor) {
        Ok(_) => {}
        Err(e) => warn!("Exiting: {:?}", e),
    });

    // Insert each record and query cache
    let ingestor = ingestors.into_iter().next().unwrap();
    for (a, b, c) in items {
        let record = Record::new(
            schema.identifier,
            vec![Field::Int(a), Field::String(b), Field::Int(c)],
            None,
        );
        // (1, 0) is presumably a (txn id, seq-in-txn) pair expected by the
        // ingestor — TODO confirm against the Ingestor API. `seq_no` reuses
        // the row's `a` value as a monotonically increasing sequence number.
        ingestor
            .write()
            .handle_message((
                (1, 0),
                IngestionMessage::OperationEvent(OperationEvent {
                    seq_no: a as u64,
                    operation: dozer_types::types::Operation::Insert { new: record },
                }),
            ))
            .unwrap();
    }

    // Allow for the thread to process the records
    // NOTE(review): fixed 3 s sleep makes this test timing-sensitive; a
    // readiness signal from the sink would be more robust.
    thread::sleep(Duration::from_millis(3000));
    // Shutdown the thread
    r.store(false, Ordering::SeqCst);

    // Empty query ({} => no filter) must return every ingested record.
    test_query("events".to_string(), json!({}), 7, &cache);
}
×
134

×
135
/// Runs the single-source/single-sink pipeline check twice: first with the
/// test schema's primary index intact, then with the primary index cleared.
///
/// Marked `#[ignore]` — run explicitly via `cargo test -- --ignored`.
#[test]
#[ignore]
fn single_source_sink() {
    // Pass 1: schema as produced by the test utilities (primary index set).
    let schema_with_pk = test_utils::schema_1().0;
    single_source_sink_impl(schema_with_pk.clone());

    // Pass 2: identical schema but with no primary index at all.
    let mut schema_without_pk = schema_with_pk;
    schema_without_pk.primary_index.clear();
    single_source_sink_impl(schema_without_pk);
}
×
143

×
144
fn test_query(schema_name: String, query: Value, count: usize, cache: &LmdbCache) {
×
145
    let query = serde_json::from_value::<QueryExpression>(query).unwrap();
×
146
    let records = cache.query(&schema_name, &query).unwrap();
×
147

×
148
    assert_eq!(records.len(), count, "Count must be equal : {query:?}");
×
149
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc