• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

gluesql / gluesql / 27895806754

21 Jun 2026 06:20AM UTC coverage: 98.637% (-0.07%) from 98.711%
27895806754

push

github

web-flow
Convert core and native storages to sync execution (#1928)

GlueSQL's async support has mostly been driven by browser-facing storage constraints such as IndexedDB, rather than by the core execution model itself.

That async boundary ended up spreading through the planner, executor, storage traits, CLI, examples, tests, and native storage implementations. For Rust users and native storages, this added futures/stream plumbing and async dependencies even when the underlying work was synchronous or backed by synchronous APIs.

With the JavaScript and Python packages being split out of this workspace, this PR makes the Rust core synchronous again and keeps browser/package-specific async concerns out of the core storage contract.

2100 of 2147 new or added lines in 141 files covered. (97.81%)

18 existing lines in 9 files now uncovered.

44000 of 44608 relevant lines covered (98.64%)

77908.96 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

96.11
/core/src/executor/join.rs
1
use {
2
    super::fetch::{fetch_relation_columns, fetch_relation_rows},
3
    crate::{
4
        data::{Key, Row, Value},
5
        executor::{context::RowContext, evaluate::evaluate, filter::check_expr},
6
        plan::{
7
            ExprPlan, JoinConstraintPlan, JoinExecutorPlan, JoinOperatorPlan, JoinPlan,
8
            TableFactorPlan,
9
        },
10
        result::Result,
11
        store::GStore,
12
    },
13
    itertools::Itertools,
14
    std::{borrow::Cow, collections::HashMap, rc::Rc},
15
};
16

17
/// Applies a list of join plans, in order, to an incoming stream of row contexts.
pub struct Join<'a, T: GStore> {
18
    // Storage handle passed to `join` for every clause.
    storage: &'a T,
19
    // Join plans; `apply` folds them over the row stream in slice order.
    join_clauses: &'a [JoinPlan],
20
    // Optional enclosing context; a clone of the `Rc` is handed to each `join` call.
    filter_context: Option<Rc<RowContext<'a>>>,
21
}
22

23
/// A reference-counted row context, the item type flowing between join stages.
type JoinItem<'a> = Rc<RowContext<'a>>;
24
/// Boxed iterator of fallible shared row contexts; the output of every join stage.
type Joined<'a> = Box<dyn Iterator<Item = Result<JoinItem<'a>>> + 'a>;
25
/// Boxed iterator of fallible, not yet `Rc`-wrapped row contexts; the input of `Join::apply`.
type JoinInput<'a> = Box<dyn Iterator<Item = Result<RowContext<'a>>> + 'a>;
26

27
/// Iterator adapter backing LEFT OUTER joins: forwards `rows`, and emits `init`
/// once if `rows` ended without yielding anything.
struct LeftOuter<'a> {
28
    // Joined rows produced for a single left-hand row.
    rows: Joined<'a>,
29
    // Fallback item; it is `take`n when emitted, so it can be yielded at most once.
    init: Option<JoinItem<'a>>,
30
    // Becomes true as soon as `rows` yields any item (an `Err` item counts too).
    matched: bool,
31
}
32

33
impl<'a> LeftOuter<'a> {
34
    /// Wraps `rows`, keeping `init` as the item to emit when `rows` turns out empty.
    fn new(rows: Joined<'a>, init: JoinItem<'a>) -> Self {
628,537✔
35
        Self {
628,537✔
36
            rows,
628,537✔
37
            init: Some(init),
628,537✔
38
            matched: false,
628,537✔
39
        }
628,537✔
40
    }
628,537✔
41
}
42

43
impl<'a> Iterator for LeftOuter<'a> {
44
    type Item = Result<JoinItem<'a>>;
45

46
    /// Yields the next inner item; once the inner iterator is exhausted, yields
    /// the fallback item a single time if nothing was ever yielded, then `None`.
    fn next(&mut self) -> Option<Self::Item> {
1,282,554✔
47
        match self.rows.next() {
1,282,554✔
48
            Some(item) => {
606,879✔
49
                // Any item, `Ok` or `Err`, suppresses the fallback row.
                self.matched = true;
606,879✔
50
                Some(item)
606,879✔
51
            }
52
            // `take` leaves `None` behind, so later calls on this arm return `None`.
            None if !self.matched => self.init.take().map(Ok),
94,276✔
53
            None => None,
581,399✔
54
        }
55
    }
1,282,554✔
56
}
57

58
impl<'a, T: GStore> Join<'a, T> {
59
    /// Stores the storage handle, the join plans and the optional enclosing context.
    pub fn new(
23,704✔
60
        storage: &'a T,
23,704✔
61
        join_clauses: &'a [JoinPlan],
23,704✔
62
        filter_context: Option<Rc<RowContext<'a>>>,
23,704✔
63
    ) -> Self {
23,704✔
64
        Self {
23,704✔
65
            storage,
23,704✔
66
            join_clauses,
23,704✔
67
            filter_context,
23,704✔
68
        }
23,704✔
69
    }
23,704✔
70

71
    /// Wraps each input row in an `Rc` and chains every join clause onto the stream.
    ///
    /// Returns early with the error if `join` fails while setting up a clause;
    /// otherwise returns the composed iterator.
    pub fn apply(self, rows: JoinInput<'a>) -> Result<Joined<'a>> {
23,704✔
72
        let mut rows: Joined = Box::new(rows.map(|row| row.map(Rc::new)));
71,042✔
73

74
        // The output of one clause becomes the left-hand input of the next.
        for join_clause in self.join_clauses {
23,704✔
75
            rows = join(
1,454✔
76
                self.storage,
1,454✔
77
                self.filter_context.as_ref().map(Rc::clone),
1,454✔
78
                join_clause,
1,454✔
79
                rows,
1,454✔
NEW
80
            )?;
×
81
        }
82

83
        Ok(rows)
23,704✔
84
    }
23,704✔
85
}
86

87
/// Joins every row of `left_rows` against the relation described by `join_plan`.
///
/// Setup happens eagerly: the `JoinExecutor` is built and the relation's column
/// names are fetched before returning, and failures there are returned as `Err`.
/// The returned iterator then produces, per left row, the matching right rows
/// (plus a NULL-filled row for unmatched left rows under `LeftOuter`). Failures
/// while iterating are yielded as `Err` items rather than aborting the stream.
fn join<'a, T: GStore>(
1,454✔
88
    storage: &'a T,
1,454✔
89
    filter_context: Option<Rc<RowContext<'a>>>,
1,454✔
90
    join_plan: &'a JoinPlan,
1,454✔
91
    left_rows: Joined<'a>,
1,454✔
92
) -> Result<Joined<'a>> {
1,454✔
93
    let JoinPlan {
94
        relation,
1,454✔
95
        join_operator,
1,454✔
96
        join_executor,
1,454✔
97
    } = join_plan;
1,454✔
98

99
    let table_alias = relation.alias_name();
1,454✔
100
    let join_executor =
1,454✔
101
        JoinExecutor::new(storage, relation, filter_context.as_ref(), join_executor)?;
1,454✔
102

103
    // Split the plan operator into the local operator kind and its optional ON expression.
    let (join_operator, where_clause) = match join_operator {
1,454✔
104
        JoinOperatorPlan::Inner(JoinConstraintPlan::None) => (JoinOperator::Inner, None),
697✔
105
        JoinOperatorPlan::Inner(JoinConstraintPlan::On(where_clause)) => {
238✔
106
            (JoinOperator::Inner, Some(where_clause))
238✔
107
        }
108
        JoinOperatorPlan::LeftOuter(JoinConstraintPlan::None) => (JoinOperator::LeftOuter, None),
491✔
109
        JoinOperatorPlan::LeftOuter(JoinConstraintPlan::On(where_clause)) => {
28✔
110
            (JoinOperator::LeftOuter, Some(where_clause))
28✔
111
        }
112
    };
113

114
    // Column names are fetched once and shared by every NULL-filled fallback row.
    let columns: Rc<[String]> = Rc::from(fetch_relation_columns(storage, relation)?);
1,454✔
115
    let rows = left_rows.flat_map(move |project_context| {
15,095✔
116
        // An upstream error is passed through as a one-item stream.
        let project_context = match project_context {
15,095✔
117
            Ok(project_context) => project_context,
15,095✔
NEW
118
            Err(error) => return Box::new(std::iter::once(Err(error))) as Joined<'a>,
×
119
        };
120

121
        // Right-hand row with one `Value::Null` per column, chained onto the left row;
        // only consumed by `LeftOuter` when no right row matches.
        let init_context = {
15,095✔
122
            let columns = Rc::clone(&columns);
15,095✔
123
            let init_row = Row {
15,095✔
124
                values: columns.iter().map(|_| Value::Null).collect(),
15,095✔
125
                columns,
15,095✔
126
            };
127

128
            Rc::new(RowContext::new(
15,095✔
129
                table_alias,
15,095✔
130
                Cow::Owned(init_row),
15,095✔
131
                Some(Rc::clone(&project_context)),
15,095✔
132
            ))
133
        };
134

135
        // Context used for expression evaluation: the left row, concatenated with the
        // enclosing filter context when one was supplied.
        let row_filter_context = match filter_context.as_ref() {
15,095✔
136
            Some(filter_context) => Rc::new(RowContext::concat(
5,684✔
137
                Rc::clone(&project_context),
5,684✔
138
                Rc::clone(filter_context),
5,684✔
139
            )),
140
            None => Rc::clone(&project_context),
9,411✔
141
        };
142
        let row_filter_context = Some(row_filter_context);
15,095✔
143

144
        let rows: Joined<'a> = match &join_executor {
15,095✔
145
            // Nested loop: fetch the relation again for this left row and keep the
            // rows accepted by `check_where_clause`.
            JoinExecutor::NestedLoop => {
146
                let rows = match fetch_relation_rows(storage, relation, row_filter_context.as_ref())
950✔
147
                {
148
                    Ok(rows) => rows,
950✔
NEW
149
                    Err(error) => return Box::new(std::iter::once(Err(error))) as Joined<'a>,
×
150
                };
151
                Box::new(rows.filter_map(move |row| {
6,460✔
152
                    let row = match row {
6,460✔
153
                        Ok(row) => row,
6,460✔
NEW
154
                        Err(error) => return Some(Err(error)),
×
155
                    };
156

157
                    match check_where_clause(
6,460✔
158
                        storage,
6,460✔
159
                        table_alias,
6,460✔
160
                        row_filter_context.as_ref().map(Rc::clone),
6,460✔
161
                        Some(Rc::clone(&project_context)),
6,460✔
162
                        where_clause,
6,460✔
163
                        Cow::Owned(row),
6,460✔
164
                    ) {
165
                        Ok(Some(row)) => Some(Ok(row)),
3,142✔
166
                        Ok(None) => None,
3,304✔
167
                        Err(error) => Some(Err(error)),
14✔
168
                    }
169
                }))
6,460✔
170
            }
171
            // Hash: evaluate `value_expr` against the left row, convert it to a `Key`
            // and look up the rows grouped under that key in `rows_map`.
            JoinExecutor::Hash {
172
                rows_map,
14,145✔
173
                value_expr,
14,145✔
174
            } => {
175
                let rows = match evaluate(storage, row_filter_context.as_ref(), None, value_expr)
14,145✔
176
                    .and_then(|evaluated| {
14,145✔
177
                        Key::try_from(evaluated).map(|hash_key| rows_map.get(&hash_key))
14,145✔
178
                    }) {
14,145✔
179
                    Ok(rows) => rows,
14,145✔
NEW
180
                    Err(error) => return Box::new(std::iter::once(Err(error))) as Joined<'a>,
×
181
                };
182

183
                match rows {
14,145✔
184
                    Some(rows) => {
13,529✔
185
                        let rows =
13,529✔
186
                            // The matching `Vec<Row>` is cloned so the iterator owns its rows.
                            rows.clone().into_iter().filter_map(
13,529✔
187
                                move |row| match check_where_clause(
14,103✔
188
                                    storage,
14,103✔
189
                                    table_alias,
14,103✔
190
                                    row_filter_context.as_ref().map(Rc::clone),
14,103✔
191
                                    Some(Rc::clone(&project_context)),
14,103✔
192
                                    where_clause,
14,103✔
193
                                    Cow::Owned(row),
14,103✔
194
                                ) {
195
                                    Ok(Some(row)) => Some(Ok(row)),
13,571✔
196
                                    Ok(None) => None,
532✔
NEW
197
                                    Err(error) => Some(Err(error)),
×
198
                                },
14,103✔
199
                            );
200

201
                        Box::new(rows)
13,529✔
202
                    }
203
                    // No entry for this key: the left row has no hash matches.
                    None => Box::new(std::iter::empty()),
616✔
204
                }
205
            }
206
        };
207

208
        match join_operator {
15,095✔
209
            JoinOperator::Inner => rows,
8,188✔
210
            JoinOperator::LeftOuter => Box::new(LeftOuter::new(rows, init_context)),
6,907✔
211
        }
212
    });
15,095✔
213

214
    Ok(Box::new(rows))
1,454✔
215
}
1,454✔
216

217
/// Join kind used inside `join`, derived from `JoinOperatorPlan` with the
/// constraint expression split off.
#[derive(Copy, Clone)]
218
enum JoinOperator {
219
    // Only matching row pairs are produced.
    Inner,
220
    // Matching pairs are wrapped in `LeftOuter`, which adds a NULL-filled row when none match.
    LeftOuter,
221
}
222

223
/// Runtime strategy for finding the right-hand rows of a join.
enum JoinExecutor<'a> {
224
    // No precomputed state; `join` fetches the relation for each left row.
    NestedLoop,
225
    Hash {
226
        // Right-hand rows grouped by their evaluated key expression.
        rows_map: HashMap<Key, Vec<Row>>,
227
        // Expression evaluated per left row in `join` to produce the lookup key.
        value_expr: &'a ExprPlan,
228
    },
229
}
230

231
impl<'a> JoinExecutor<'a> {
232
    /// Builds the runtime executor for a `JoinExecutorPlan`.
    ///
    /// A nested-loop plan returns immediately. A hash plan fetches every row of
    /// `relation`, evaluates `key_expr` for each, and groups the rows by key.
    /// Any fetch, evaluation or key-conversion error is returned as `Err`.
    fn new<T: GStore>(
1,454✔
233
        storage: &'a T,
1,454✔
234
        relation: &TableFactorPlan,
1,454✔
235
        filter_context: Option<&Rc<RowContext<'a>>>,
1,454✔
236
        join_executor: &'a JoinExecutorPlan,
1,454✔
237
    ) -> Result<JoinExecutor<'a>> {
1,454✔
238
        let (key_expr, value_expr, where_clause) = match join_executor {
1,454✔
239
            JoinExecutorPlan::NestedLoop => return Ok(Self::NestedLoop),
291✔
240
            JoinExecutorPlan::Hash {
241
                key_expr,
1,163✔
242
                value_expr,
1,163✔
243
                where_clause,
1,163✔
244
            } => (key_expr, value_expr, where_clause),
1,163✔
245
        };
246

247
        let mut rows = Vec::new();
1,163✔
248
        for row in fetch_relation_rows(storage, relation, filter_context)? {
10,183✔
249
            let row = row?;
10,183✔
250
            // Per-row context (shadows the parameter) chaining this row onto the outer context.
            let filter_context = Rc::new(RowContext::new(
10,183✔
251
                relation.alias_name(),
10,183✔
252
                Cow::Borrowed(&row),
10,183✔
253
                filter_context.cloned(),
10,183✔
254
            ));
255

256
            let hash_key: Key =
10,183✔
257
                evaluate(storage, Some(&filter_context), None, key_expr)?.try_into()?;
10,183✔
258

259
            // Rows whose key is `Key::None` are left out of the map entirely.
            if matches!(hash_key, Key::None) {
10,183✔
NEW
260
                continue;
×
261
            }
10,183✔
262

263
            // Optional plan-level filter, checked before the row is stored.
            let pass = match where_clause {
10,183✔
264
                Some(expr) => check_expr(storage, Some(&filter_context), None, expr)?,
280✔
265
                None => true,
9,903✔
266
            };
267

268
            if pass {
10,183✔
269
                rows.push((hash_key, row));
9,973✔
270
            }
9,973✔
271
        }
272

273
        Ok(Self::Hash {
1,163✔
274
            // Groups the `(key, row)` pairs into `HashMap<Key, Vec<Row>>`.
            rows_map: rows.into_iter().into_group_map(),
1,163✔
275
            value_expr,
1,163✔
276
        })
1,163✔
277
    }
1,454✔
278
}
279

280
/// Tests `row` against the optional join condition and wraps it on success.
///
/// Returns `Ok(Some(context))` when `where_clause` is `None` or `check_expr`
/// returns true, `Ok(None)` when it returns false, and `Err` when `check_expr`
/// fails. The condition is evaluated with `row` chained onto `filter_context`;
/// the returned context owns `row` and is chained onto `project_context`.
fn check_where_clause<'a, T: GStore>(
20,563✔
281
    storage: &'a T,
20,563✔
282
    table_alias: &'a str,
20,563✔
283
    filter_context: Option<Rc<RowContext<'a>>>,
20,563✔
284
    project_context: Option<Rc<RowContext<'a>>>,
20,563✔
285
    where_clause: Option<&'a ExprPlan>,
20,563✔
286
    row: Cow<'_, Row>,
20,563✔
287
) -> Result<Option<Rc<RowContext<'a>>>> {
20,563✔
288
    // Evaluation context borrows the row; the output context below takes ownership of it.
    let filter_context = RowContext::new(table_alias, Cow::Borrowed(&row), filter_context);
20,563✔
289
    let filter_context = Some(Rc::new(filter_context));
20,563✔
290

291
    match where_clause {
20,563✔
292
        Some(expr) => check_expr(storage, filter_context.as_ref(), None, expr)?,
5,040✔
293
        None => true,
15,523✔
294
    }
295
    // `bool::then` builds the output context only when the row passed.
    .then(|| RowContext::new(table_alias, Cow::Owned(row.into_owned()), project_context))
20,549✔
296
    .map(Rc::new)
20,549✔
297
    .map(Ok)
20,549✔
298
    .transpose()
20,549✔
299
}
20,563✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc