• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 4060960250

pending completion
4060960250

Pull #729

github

GitHub
Merge 25e0159b7 into de98caa91
Pull Request #729: feat: Implement multi-way JOIN

1356 of 1356 new or added lines in 10 files covered. (100.0%)

24359 of 38526 relevant lines covered (63.23%)

37090.16 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

73.6
/dozer-core/src/dag/executor_utils.rs
1
#![allow(clippy::type_complexity)]
2
use crate::dag::dag::{Dag, Edge, Endpoint};
3
use crate::dag::dag_metadata::METADATA_DB_NAME;
4
use crate::dag::errors::ExecutionError;
5
use crate::dag::errors::ExecutionError::InvalidOperation;
6
use crate::dag::executor::ExecutorOperation;
7
use crate::dag::node::{NodeHandle, OutputPortDef, OutputPortType, PortHandle};
8
use crate::dag::record_store::{
9
    AutogenRowKeyLookupRecordReader, PrimaryKeyValueLookupRecordReader, RecordReader,
10
};
11
use crate::storage::common::Database;
12
use crate::storage::lmdb_storage::{LmdbEnvironmentManager, SharedTransaction};
13
use crossbeam::channel::{bounded, Receiver, Select, Sender};
14
use dozer_types::types::{Operation, Schema};
15
use std::collections::HashMap;
16
use std::path::Path;
17

18
pub(crate) struct StorageMetadata {
19
    pub env: LmdbEnvironmentManager,
20
    pub meta_db: Database,
21
}
22

23
impl StorageMetadata {
24
    pub fn new(env: LmdbEnvironmentManager, meta_db: Database) -> Self {
748✔
25
        Self { env, meta_db }
748✔
26
    }
748✔
27
}
28

29
pub(crate) fn init_component<F>(
358✔
30
    node_handle: &NodeHandle,
358✔
31
    base_path: &Path,
358✔
32
    mut init_f: F,
358✔
33
) -> Result<StorageMetadata, ExecutionError>
358✔
34
where
358✔
35
    F: FnMut(&mut LmdbEnvironmentManager) -> Result<(), ExecutionError>,
358✔
36
{
358✔
37
    let mut env = LmdbEnvironmentManager::create(base_path, format!("{node_handle}").as_str())?;
358✔
38
    let db = env.open_database(METADATA_DB_NAME, false)?;
355✔
39
    init_f(&mut env)?;
355✔
40
    Ok(StorageMetadata::new(env, db))
355✔
41
}
358✔
42
#[inline]
43
pub(crate) fn init_select(receivers: &Vec<Receiver<ExecutorOperation>>) -> Select {
3,174✔
44
    let mut sel = Select::new();
3,174✔
45
    for r in receivers {
6,746✔
46
        sel.recv(r);
3,572✔
47
    }
3,572✔
48
    sel
3,174✔
49
}
3,174✔
50

51
pub(crate) fn requires_schema_update(
×
52
    _new: Schema,
×
53
    _port_handle: &PortHandle,
×
54
    input_schemas: &mut HashMap<PortHandle, Schema>,
×
55
    input_ports: &[PortHandle],
×
56
) -> bool {
×
57
    let count = input_ports
×
58
        .iter()
×
59
        .filter(|e| !input_schemas.contains_key(*e))
×
60
        .count();
×
61
    count == 0
×
62
}
×
63

64
pub(crate) fn map_to_op(op: ExecutorOperation) -> Result<Operation, ExecutionError> {
×
65
    match op {
×
66
        ExecutorOperation::Delete { old } => Ok(Operation::Delete { old }),
×
67
        ExecutorOperation::Insert { new } => Ok(Operation::Insert { new }),
×
68
        ExecutorOperation::Update { old, new } => Ok(Operation::Update { old, new }),
×
69
        _ => Err(InvalidOperation(op.to_string())),
×
70
    }
71
}
×
72

73
pub(crate) fn map_to_exec_op(op: Operation) -> ExecutorOperation {
×
74
    match op {
×
75
        Operation::Update { old, new } => ExecutorOperation::Update { old, new },
×
76
        Operation::Delete { old } => ExecutorOperation::Delete { old },
×
77
        Operation::Insert { new } => ExecutorOperation::Insert { new },
×
78
    }
79
}
×
80

81
pub(crate) fn index_edges<T: Clone>(
58✔
82
    dag: &Dag<T>,
58✔
83
    channel_buf_sz: usize,
58✔
84
) -> (
58✔
85
    HashMap<NodeHandle, HashMap<PortHandle, Vec<Sender<ExecutorOperation>>>>,
58✔
86
    HashMap<NodeHandle, HashMap<PortHandle, Vec<Receiver<ExecutorOperation>>>>,
58✔
87
) {
58✔
88
    let mut senders: HashMap<NodeHandle, HashMap<PortHandle, Vec<Sender<ExecutorOperation>>>> =
58✔
89
        HashMap::new();
58✔
90
    let mut receivers: HashMap<NodeHandle, HashMap<PortHandle, Vec<Receiver<ExecutorOperation>>>> =
58✔
91
        HashMap::new();
58✔
92

93
    for edge in dag.edges.iter() {
202✔
94
        if !senders.contains_key(&edge.from.node) {
202✔
95
            senders.insert(edge.from.node.clone(), HashMap::new());
196✔
96
        }
196✔
97
        if !receivers.contains_key(&edge.to.node) {
202✔
98
            receivers.insert(edge.to.node.clone(), HashMap::new());
190✔
99
        }
190✔
100

101
        let (tx, rx) = bounded(channel_buf_sz);
202✔
102
        // let (tx, rx) = match dag.nodes.get(&edge.from.node).unwrap() {
202✔
103
        //     NodeType::Source(_) => bounded(1),
202✔
104
        //     _ => bounded(channel_buf_sz),
202✔
105
        // };
202✔
106

202✔
107
        let rcv_port: PortHandle = edge.to.port;
202✔
108
        if receivers
202✔
109
            .get(&edge.to.node)
202✔
110
            .unwrap()
202✔
111
            .contains_key(&rcv_port)
202✔
112
        {
×
113
            receivers
×
114
                .get_mut(&edge.to.node)
×
115
                .unwrap()
×
116
                .get_mut(&rcv_port)
×
117
                .unwrap()
×
118
                .push(rx);
×
119
        } else {
202✔
120
            receivers
202✔
121
                .get_mut(&edge.to.node)
202✔
122
                .unwrap()
202✔
123
                .insert(rcv_port, vec![rx]);
202✔
124
        }
202✔
125

×
126
        let snd_port: PortHandle = edge.from.port;
202✔
127
        if senders
202✔
128
            .get(&edge.from.node)
202✔
129
            .unwrap()
202✔
130
            .contains_key(&snd_port)
202✔
131
        {
4✔
132
            senders
4✔
133
                .get_mut(&edge.from.node)
4✔
134
                .unwrap()
4✔
135
                .get_mut(&snd_port)
4✔
136
                .unwrap()
4✔
137
                .push(tx);
4✔
138
        } else {
198✔
139
            senders
198✔
140
                .get_mut(&edge.from.node)
198✔
141
                .unwrap()
198✔
142
                .insert(snd_port, vec![tx]);
198✔
143
        }
198✔
144
    }
145

×
146
    (senders, receivers)
58✔
147
}
58✔
148

×
149
pub(crate) fn build_receivers_lists(
579✔
150
    receivers: HashMap<PortHandle, Vec<Receiver<ExecutorOperation>>>,
579✔
151
) -> (Vec<PortHandle>, Vec<Receiver<ExecutorOperation>>) {
579✔
152
    let mut handles_ls: Vec<PortHandle> = Vec::new();
579✔
153
    let mut receivers_ls: Vec<Receiver<ExecutorOperation>> = Vec::new();
579✔
154
    for e in receivers {
1,170✔
155
        for r in e.1 {
1,182✔
156
            receivers_ls.push(r);
591✔
157
            handles_ls.push(e.0);
591✔
158
        }
591✔
159
    }
×
160
    (handles_ls, receivers_ls)
579✔
161
}
579✔
162

×
163
/// Collects the target endpoints of every edge that starts at output `port`
/// of `node`, preserving the order of `edges`.
fn get_inputs_for_output(edges: &[Edge], node: &NodeHandle, port: &PortHandle) -> Vec<Endpoint> {
    let mut targets = Vec::new();
    for edge in edges {
        let source = &edge.from;
        if source.node == *node && source.port == *port {
            targets.push(edge.to.clone());
        }
    }
    targets
}
172✔
170

171
/// Prefix of the database names opened for non-stateless output ports; the
/// port handle (and a `_META` suffix for the metadata database) is appended.
const PORT_STATE_KEY: &str = "__PORT_STATE_";
172

173
#[derive(Debug)]
×
174
pub(crate) struct StateOptions {
175
    pub(crate) db: Database,
176
    pub(crate) meta_db: Database,
×
177
    pub(crate) typ: OutputPortType,
×
178
}
×
179

×
180
pub(crate) fn create_ports_databases_and_fill_downstream_record_readers(
585✔
181
    handle: &NodeHandle,
585✔
182
    edges: &[Edge],
585✔
183
    mut env: LmdbEnvironmentManager,
585✔
184
    output_ports: &[OutputPortDef],
585✔
185
    record_stores: &mut HashMap<NodeHandle, HashMap<PortHandle, Box<dyn RecordReader>>>,
585✔
186
) -> Result<(SharedTransaction, HashMap<PortHandle, StateOptions>), ExecutionError> {
585✔
187
    let mut port_databases: Vec<Option<StateOptions>> = Vec::new();
585✔
188
    for port in output_ports {
1,180✔
189
        let opt = match &port.typ {
595✔
190
            OutputPortType::Stateless => None,
423✔
191
            typ => {
172✔
192
                let db =
172✔
193
                    env.open_database(&format!("{}_{}", PORT_STATE_KEY, port.handle), false)?;
172✔
194
                let meta_db =
172✔
195
                    env.open_database(&format!("{}_{}_META", PORT_STATE_KEY, port.handle), false)?;
172✔
196
                Some(StateOptions {
172✔
197
                    db,
172✔
198
                    meta_db,
172✔
199
                    typ: typ.clone(),
172✔
200
                })
172✔
201
            }
202
        };
×
203
        port_databases.push(opt);
595✔
204
    }
×
205

×
206
    let master_tx = env.create_txn()?;
585✔
207

×
208
    for (state_options, port) in port_databases.iter().zip(output_ports.iter()) {
595✔
209
        if let Some(state_options) = state_options {
595✔
210
            for endpoint in get_inputs_for_output(edges, handle, &port.handle) {
176✔
211
                let record_reader: Box<dyn RecordReader> = match port.typ {
168✔
212
                    OutputPortType::AutogenRowKeyLookup => Box::new(
1✔
213
                        AutogenRowKeyLookupRecordReader::new(master_tx.clone(), state_options.db),
1✔
214
                    ),
1✔
215
                    OutputPortType::StatefulWithPrimaryKeyLookup { .. } => Box::new(
167✔
216
                        PrimaryKeyValueLookupRecordReader::new(master_tx.clone(), state_options.db),
167✔
217
                    ),
167✔
218
                    OutputPortType::Stateless => panic!("Internal error: Invalid port type"),
×
219
                };
×
220

×
221
                record_stores
168✔
222
                    .get_mut(&endpoint.node)
168✔
223
                    .expect("Record store HashMap must be created for every node upfront")
168✔
224
                    .insert(endpoint.port, record_reader);
168✔
225
            }
×
226
        }
423✔
227
    }
×
228

×
229
    let port_databases = output_ports
585✔
230
        .iter()
585✔
231
        .zip(port_databases.into_iter())
585✔
232
        .flat_map(|(output_port, state_option)| {
595✔
233
            state_option.map(|state_option| (output_port.handle, state_option))
595✔
234
        })
595✔
235
        .collect();
585✔
236

585✔
237
    Ok((master_tx, port_databases))
585✔
238
}
585✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc