• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

gluesql / gluesql / 22275856923

22 Feb 2026 11:01AM UTC coverage: 98.17% (+0.09%) from 98.085%
22275856923

Pull #1883

github

web-flow
Merge 6026cc70f into 80a446e87
Pull Request #1883: Add patch coverage diff summary to PR coverage bot comment

21 of 29 new or added lines in 11 files covered. (72.41%)

67 existing lines in 24 files now uncovered.

42750 of 43547 relevant lines covered (98.17%)

66356.37 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

99.45
/core/src/executor/join.rs
1
use {
2
    super::fetch::{fetch_relation_columns, fetch_relation_rows},
3
    crate::{
4
        ast::{
5
            Expr, Join as AstJoin, JoinConstraint, JoinExecutor as AstJoinExecutor,
6
            JoinOperator as AstJoinOperator, TableFactor,
7
        },
8
        data::{Key, Row, Value, get_alias},
9
        executor::{context::RowContext, evaluate::evaluate, filter::check_expr},
10
        result::Result,
11
        store::GStore,
12
    },
13
    futures::{
14
        future,
15
        stream::{self, Stream, StreamExt, TryStreamExt, empty, once},
16
    },
17
    itertools::Itertools,
18
    std::{borrow::Cow, collections::HashMap, pin::Pin, sync::Arc},
19
    utils::OrStream,
20
};
21

22
pub struct Join<'a, T: GStore> {
23
    storage: &'a T,
24
    join_clauses: &'a [AstJoin],
25
    filter_context: Option<Arc<RowContext<'a>>>,
26
}
27

28
type JoinItem<'a> = Arc<RowContext<'a>>;
29
type Joined<'a> = Pin<Box<dyn Stream<Item = Result<JoinItem<'a>>> + Send + 'a>>;
30

31
impl<'a, T: GStore> Join<'a, T> {
32
    pub fn new(
23,494✔
33
        storage: &'a T,
23,494✔
34
        join_clauses: &'a [AstJoin],
23,494✔
35
        filter_context: Option<Arc<RowContext<'a>>>,
23,494✔
36
    ) -> Self {
23,494✔
37
        Self {
23,494✔
38
            storage,
23,494✔
39
            join_clauses,
23,494✔
40
            filter_context,
23,494✔
41
        }
23,494✔
42
    }
23,494✔
43

44
    pub async fn apply(
23,494✔
45
        self,
23,494✔
46
        rows: impl Stream<Item = Result<RowContext<'a>>> + Send + 'a,
23,494✔
47
    ) -> Result<Joined<'a>> {
23,494✔
48
        let init_rows: Joined = Box::pin(rows.map(|row| row.map(Arc::new)));
70,528✔
49

50
        stream::iter(self.join_clauses)
23,494✔
51
            .map(Ok)
23,494✔
52
            .try_fold(init_rows, |rows, join_clause| {
23,494✔
53
                let filter_context = self.filter_context.as_ref().map(Arc::clone);
1,440✔
54

55
                async move { join(self.storage, filter_context, join_clause, rows).await }
1,440✔
56
            })
1,440✔
57
            .await
58
    }
23,494✔
59
}
60

61
async fn join<'a, T: GStore>(
1,440✔
62
    storage: &'a T,
1,440✔
63
    filter_context: Option<Arc<RowContext<'a>>>,
1,440✔
64
    ast_join: &'a AstJoin,
1,440✔
65
    left_rows: impl Stream<Item = Result<JoinItem<'a>>> + Send + 'a,
1,440✔
66
) -> Result<Joined<'a>> {
1,440✔
67
    let AstJoin {
68
        relation,
1,440✔
69
        join_operator,
1,440✔
70
        join_executor,
1,440✔
71
    } = ast_join;
1,440✔
72

73
    let table_alias = get_alias(relation);
1,440✔
74
    let join_executor = JoinExecutor::new(
1,440✔
75
        storage,
1,440✔
76
        relation,
1,440✔
77
        filter_context.as_ref().map(Arc::clone),
1,440✔
78
        join_executor,
1,440✔
79
    )
1,440✔
80
    .await
81
    .map(Arc::new)?;
1,440✔
82

83
    let (join_operator, where_clause) = match join_operator {
1,440✔
84
        AstJoinOperator::Inner(JoinConstraint::None) => (JoinOperator::Inner, None),
697✔
85
        AstJoinOperator::Inner(JoinConstraint::On(where_clause)) => {
224✔
86
            (JoinOperator::Inner, Some(where_clause))
224✔
87
        }
88
        AstJoinOperator::LeftOuter(JoinConstraint::None) => (JoinOperator::LeftOuter, None),
491✔
89
        AstJoinOperator::LeftOuter(JoinConstraint::On(where_clause)) => {
28✔
90
            (JoinOperator::LeftOuter, Some(where_clause))
28✔
91
        }
92
    };
93

94
    let columns: Arc<[String]> = Arc::from(fetch_relation_columns(storage, relation).await?);
1,440✔
95
    let rows = left_rows.and_then(move |project_context| {
15,081✔
96
        let init_context = {
15,081✔
97
            let columns = Arc::clone(&columns);
15,081✔
98
            let init_row = Row {
15,081✔
99
                values: columns.iter().map(|_| Value::Null).collect(),
15,081✔
100
                columns,
15,081✔
101
            };
102

103
            Arc::new(RowContext::new(
15,081✔
104
                table_alias,
15,081✔
105
                Cow::Owned(init_row),
15,081✔
106
                Some(Arc::clone(&project_context)),
15,081✔
107
            ))
108
        };
109
        let filter_context = filter_context.as_ref().map(Arc::clone);
15,081✔
110
        let join_executor = Arc::clone(&join_executor);
15,081✔
111

112
        async move {
15,081✔
113
            #[derive(futures_enum::Stream)]
114
            enum Rows<I1, I2, I3> {
115
                NestedLoop(I1),
116
                Hash(I2),
117
                Empty(I3),
118
            }
119

120
            let filter_context = match filter_context {
15,081✔
121
                Some(filter_context) => Arc::new(RowContext::concat(
5,684✔
122
                    Arc::clone(&project_context),
5,684✔
123
                    Arc::clone(&filter_context),
5,684✔
124
                )),
125
                None => Arc::clone(&project_context),
9,397✔
126
            };
127
            let filter_context = Some(filter_context);
15,081✔
128
            let rows = match join_executor.as_ref() {
15,081✔
129
                JoinExecutor::NestedLoop => {
130
                    let rows = fetch_relation_rows(storage, relation, filter_context.as_ref())
936✔
131
                        .await?
936✔
132
                        .and_then(|row| future::ok(Cow::Owned(row)))
6,446✔
133
                        .try_filter_map(move |row| {
6,446✔
134
                            check_where_clause(
6,446✔
135
                                storage,
6,446✔
136
                                table_alias,
6,446✔
137
                                filter_context.as_ref().map(Arc::clone),
6,446✔
138
                                Some(Arc::clone(&project_context)),
6,446✔
139
                                where_clause,
6,446✔
140
                                row,
6,446✔
141
                            )
142
                        });
6,446✔
143
                    Rows::NestedLoop(rows)
936✔
144
                }
145
                JoinExecutor::Hash {
146
                    rows_map,
14,145✔
147
                    value_expr,
14,145✔
148
                } => {
149
                    let rows = evaluate(
14,145✔
150
                        storage,
14,145✔
151
                        filter_context.as_ref().map(Arc::clone),
14,145✔
152
                        None,
14,145✔
153
                        value_expr,
14,145✔
154
                    )
14,145✔
155
                    .await
156
                    .map(Key::try_from)?
14,145✔
157
                    .map(|hash_key| rows_map.get(&hash_key))?;
14,145✔
158

159
                    match rows {
14,145✔
160
                        None => Rows::Empty(empty()),
616✔
161
                        Some(rows) => {
13,529✔
162
                            let rows = stream::iter(rows)
13,529✔
163
                                .filter_map(|row| {
14,103✔
164
                                    let filter_context = filter_context.as_ref().map(Arc::clone);
14,103✔
165
                                    let project_context = Some(Arc::clone(&project_context));
14,103✔
166

167
                                    async {
14,103✔
168
                                        check_where_clause(
14,103✔
169
                                            storage,
14,103✔
170
                                            table_alias,
14,103✔
171
                                            filter_context,
14,103✔
172
                                            project_context,
14,103✔
173
                                            where_clause,
14,103✔
174
                                            Cow::Borrowed(row),
14,103✔
175
                                        )
14,103✔
176
                                        .await
177
                                        .transpose()
14,103✔
178
                                    }
14,103✔
179
                                })
14,103✔
180
                                .collect::<Vec<_>>()
13,529✔
181
                                .await;
182

183
                            Rows::Hash(stream::iter(rows))
13,529✔
184
                        }
185
                    }
186
                }
187
            };
188

189
            let rows: Joined = match join_operator {
15,081✔
190
                JoinOperator::Inner => Box::pin(rows),
8,174✔
191
                JoinOperator::LeftOuter => {
192
                    let init_rows = once(async { Ok(init_context) });
6,907✔
193

194
                    Box::pin(OrStream::new(rows, init_rows))
6,907✔
195
                }
196
            };
197

198
            Ok(rows)
15,081✔
199
        }
15,081✔
200
    });
15,081✔
201

202
    Ok(Box::pin(rows.try_flatten()))
1,440✔
203
}
1,440✔
204

205
#[derive(Copy, Clone)]
206
enum JoinOperator {
207
    Inner,
208
    LeftOuter,
209
}
210

211
enum JoinExecutor<'a> {
212
    NestedLoop,
213
    Hash {
214
        rows_map: HashMap<Key, Vec<Row>>,
215
        value_expr: &'a Expr,
216
    },
217
}
218

219
impl<'a> JoinExecutor<'a> {
220
    async fn new<T: GStore>(
1,440✔
221
        storage: &'a T,
1,440✔
222
        relation: &TableFactor,
1,440✔
223
        filter_context: Option<Arc<RowContext<'a>>>,
1,440✔
224
        ast_join_executor: &'a AstJoinExecutor,
1,440✔
225
    ) -> Result<JoinExecutor<'a>> {
1,440✔
226
        let (key_expr, value_expr, where_clause) = match ast_join_executor {
1,440✔
227
            AstJoinExecutor::NestedLoop => return Ok(Self::NestedLoop),
277✔
228
            AstJoinExecutor::Hash {
229
                key_expr,
1,163✔
230
                value_expr,
1,163✔
231
                where_clause,
1,163✔
232
            } => (key_expr, value_expr, where_clause),
1,163✔
233
        };
234

235
        let rows_map = fetch_relation_rows(storage, relation, filter_context.as_ref())
1,163✔
236
            .await?
1,163✔
237
            .try_filter_map(|row| {
10,183✔
238
                let filter_context = filter_context.as_ref().map(Arc::clone);
10,183✔
239

240
                async move {
10,183✔
241
                    let filter_context = Arc::new(RowContext::new(
10,183✔
242
                        get_alias(relation),
10,183✔
243
                        Cow::Borrowed(&row),
10,183✔
244
                        filter_context,
10,183✔
245
                    ));
246

247
                    let hash_key: Key =
10,183✔
248
                        evaluate(storage, Some(Arc::clone(&filter_context)), None, key_expr)
10,183✔
249
                            .await?
10,183✔
250
                            .try_into()?;
10,183✔
251

252
                    if matches!(hash_key, Key::None) {
10,183✔
UNCOV
253
                        return Ok(None);
×
254
                    }
10,183✔
255

256
                    match where_clause {
10,183✔
257
                        Some(expr) => check_expr(storage, Some(filter_context), None, expr)
280✔
258
                            .await
259
                            .map(|pass| pass.then_some((hash_key, row))),
280✔
260
                        None => Ok(Some((hash_key, row))),
9,903✔
261
                    }
262
                }
10,183✔
263
            })
10,183✔
264
            .try_collect::<Vec<_>>()
1,163✔
265
            .await?
1,163✔
266
            .into_iter()
1,163✔
267
            .into_group_map();
1,163✔
268
        Ok(Self::Hash {
1,163✔
269
            rows_map,
1,163✔
270
            value_expr,
1,163✔
271
        })
1,163✔
272
    }
1,440✔
273
}
274

275
async fn check_where_clause<'a, T: GStore>(
20,549✔
276
    storage: &'a T,
20,549✔
277
    table_alias: &'a str,
20,549✔
278
    filter_context: Option<Arc<RowContext<'a>>>,
20,549✔
279
    project_context: Option<Arc<RowContext<'a>>>,
20,549✔
280
    where_clause: Option<&'a Expr>,
20,549✔
281
    row: Cow<'_, Row>,
20,549✔
282
) -> Result<Option<Arc<RowContext<'a>>>> {
20,549✔
283
    let filter_context = RowContext::new(table_alias, Cow::Borrowed(&row), filter_context);
20,549✔
284
    let filter_context = Some(Arc::new(filter_context));
20,549✔
285

286
    match where_clause {
20,549✔
287
        Some(expr) => check_expr(storage, filter_context, None, expr).await?,
5,026✔
288
        None => true,
15,523✔
289
    }
290
    .then(|| RowContext::new(table_alias, Cow::Owned(row.into_owned()), project_context))
20,535✔
291
    .map(Arc::new)
20,535✔
292
    .map(Ok)
20,535✔
293
    .transpose()
20,535✔
294
}
20,549✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc