• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

gluesql / gluesql / 22275856923

22 Feb 2026 11:01AM UTC coverage: 98.17% (+0.09%) from 98.085%
22275856923

Pull #1883

github

web-flow
Merge 6026cc70f into 80a446e87
Pull Request #1883: Add patch coverage diff summary to PR coverage bot comment

21 of 29 new or added lines in 11 files covered. (72.41%)

67 existing lines in 24 files now uncovered.

42750 of 43547 relevant lines covered (98.17%)

66356.37 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

99.73
/core/src/executor/fetch.rs
1
use {
2
    super::{context::RowContext, evaluate::evaluate_stateless, filter::check_expr},
3
    crate::{
4
        ast::{
5
            ColumnDef, ColumnUniqueOption, Dictionary, Expr, IndexItem, Join, Projection, Query,
6
            Select, SelectItem, SetExpr, TableAlias, TableFactor, TableWithJoins, ToSql,
7
            ToSqlUnquoted, Values,
8
        },
9
        data::{Key, Row, SCHEMALESS_DOC_COLUMN, Value, get_alias, get_index},
10
        executor::{evaluate::evaluate, select::select},
11
        result::Result,
12
        store::GStore,
13
    },
14
    async_recursion::async_recursion,
15
    futures::{
16
        future,
17
        stream::{self, Stream, TryStreamExt},
18
    },
19
    serde::Serialize,
20
    std::{borrow::Cow, collections::BTreeMap, fmt::Debug, iter, sync::Arc},
21
    thiserror::Error as ThisError,
22
};
23

24
#[derive(ThisError, Serialize, Debug, PartialEq, Eq)]
25
pub enum FetchError {
26
    #[error("table not found: {0}")]
27
    TableNotFound(String),
28

29
    #[error("table alias not found: {0}")]
30
    TableAliasNotFound(String),
31

32
    #[error("SERIES has wrong size: {0}")]
33
    SeriesSizeWrong(i64),
34

35
    #[error("table '{0}' has {1} columns available but {2} column aliases specified")]
36
    TooManyColumnAliases(String, usize, usize),
37

38
    #[error("unreachable")]
39
    Unreachable,
40
}
41

42
pub async fn fetch<'a, T: GStore>(
1,044✔
43
    storage: &'a T,
1,044✔
44
    table_name: &'a str,
1,044✔
45
    columns: Arc<[String]>,
1,044✔
46
    where_clause: Option<&'a Expr>,
1,044✔
47
) -> Result<impl Stream<Item = Result<(Key, Row)>> + 'a> {
1,044✔
48
    let rows = storage
1,044✔
49
        .scan_data(table_name)
1,044✔
50
        .await?
1,044✔
51
        .try_filter_map(move |(key, values)| {
2,859✔
52
            let row = Row {
2,859✔
53
                columns: Arc::clone(&columns),
2,859✔
54
                values,
2,859✔
55
            };
2,859✔
56

57
            async move {
2,859✔
58
                let Some(expr) = where_clause else {
2,859✔
59
                    return Ok(Some((key, row)));
1,314✔
60
                };
61

62
                let context = RowContext::new(table_name, Cow::Borrowed(&row), None);
1,545✔
63

64
                check_expr(storage, Some(Arc::new(context)), None, expr)
1,545✔
65
                    .await
66
                    .map(|pass| pass.then_some((key, row)))
1,545✔
67
            }
2,859✔
68
        });
2,859✔
69

70
    Ok(rows)
1,044✔
71
}
1,044✔
72

73
#[derive(futures_enum::Stream)]
74
pub enum Rows<I1, I2, I3, I4> {
75
    Derived(I1),
76
    Table(I2),
77
    Series(I3),
78
    Dictionary(I4),
79
}
80

81
pub async fn fetch_relation_rows<'a, T: GStore>(
25,803✔
82
    storage: &'a T,
25,803✔
83
    table_factor: &'a TableFactor,
25,803✔
84
    filter_context: Option<&Arc<RowContext<'a>>>,
25,803✔
85
) -> Result<impl Stream<Item = Result<Row>> + 'a> {
25,803✔
86
    let columns = Arc::from(fetch_relation_columns(storage, table_factor).await?);
25,803✔
87

88
    match table_factor {
25,611✔
89
        TableFactor::Derived { subquery, .. } => {
770✔
90
            let filter_context = filter_context.map(Arc::clone);
770✔
91
            let rows = select(storage, subquery, filter_context)
770✔
92
                .await?
770✔
93
                .map_ok(move |row| Row {
770✔
94
                    columns: Arc::clone(&columns),
2,184✔
95
                    values: row.values,
2,184✔
96
                });
2,184✔
97

98
            Ok(Rows::Derived(rows))
770✔
99
        }
100
        TableFactor::Table { name, .. } => {
20,133✔
101
            let rows = {
20,129✔
102
                #[derive(futures_enum::Stream)]
103
                enum Rows<I1, I2, I3, I4> {
104
                    Indexed(I1),
105
                    PrimaryKey(I2),
106
                    PrimaryKeyEmpty(I3),
107
                    FullScan(I4),
108
                }
109

110
                match get_index(table_factor) {
20,133✔
111
                    Some(IndexItem::NonClustered {
112
                        name: index_name,
98✔
113
                        asc,
98✔
114
                        cmp_expr,
98✔
115
                    }) => {
116
                        let cmp_value = match cmp_expr {
98✔
117
                            Some((op, expr)) => {
91✔
118
                                let evaluated = evaluate(storage, None, None, expr).await?;
91✔
119

120
                                Some((op, evaluated.try_into()?))
91✔
121
                            }
122
                            None => None,
7✔
123
                        };
124

125
                        let rows = storage
98✔
126
                            .scan_indexed_data(name, index_name, *asc, cmp_value)
98✔
127
                            .await?
98✔
128
                            .map_ok(move |(_, values)| Row {
98✔
129
                                columns: Arc::clone(&columns),
227✔
130
                                values,
227✔
131
                            });
227✔
132

133
                        Rows::Indexed(rows)
98✔
134
                    }
135
                    Some(IndexItem::PrimaryKey(expr)) => {
56✔
136
                        let schema = storage
56✔
137
                            .fetch_schema(name)
56✔
138
                            .await?
56✔
139
                            .ok_or(FetchError::Unreachable)?;
56✔
140

141
                        let filter_context = filter_context.map(Arc::clone);
56✔
142
                        let evaluated = evaluate(storage, filter_context, None, expr).await?;
56✔
143

144
                        let column_def = schema
56✔
145
                            .column_defs
56✔
146
                            .as_ref()
56✔
147
                            .and_then(|column_defs| {
56✔
148
                                column_defs.iter().find(|column_def| {
56✔
149
                                    column_def.unique.map(|u| u.is_primary) == Some(true)
56✔
150
                                })
56✔
151
                            })
56✔
152
                            .ok_or(FetchError::Unreachable)?;
56✔
153

154
                        let value =
56✔
155
                            evaluated.try_into_value(&column_def.data_type, column_def.nullable)?;
56✔
156
                        let key = Key::try_from(value)?;
56✔
157

158
                        match storage.fetch_data(name, &key).await? {
56✔
159
                            Some(values) => {
56✔
160
                                let row = Row {
56✔
161
                                    columns: Arc::clone(&columns),
56✔
162
                                    values,
56✔
163
                                };
56✔
164

165
                                Rows::PrimaryKey(stream::once(future::ready(Ok(row))))
56✔
166
                            }
UNCOV
167
                            None => Rows::PrimaryKeyEmpty(stream::empty()),
×
168
                        }
169
                    }
170
                    _ => {
171
                        let rows = storage
19,979✔
172
                            .scan_data(name)
19,979✔
173
                            .await?
19,979✔
174
                            .map_ok(move |(_, values)| Row {
19,975✔
175
                                columns: Arc::clone(&columns),
79,149✔
176
                                values,
79,149✔
177
                            });
79,149✔
178

179
                        Rows::FullScan(rows)
19,975✔
180
                    }
181
                }
182
            };
183

184
            Ok(Rows::Table(rows))
20,129✔
185
        }
186
        TableFactor::Series { size, .. } => {
4,592✔
187
            let value: Value = evaluate_stateless(None, size).await?.try_into()?;
4,592✔
188
            let size: i64 = value.try_into()?;
4,592✔
189
            let size = match size {
4,592✔
190
                n if n >= 0 => size,
4,592✔
191
                n => return Err(FetchError::SeriesSizeWrong(n).into()),
14✔
192
            };
193

194
            let columns = Arc::from(vec!["N".to_owned()]);
4,578✔
195
            let rows = (1..=size).map(move |v| {
5,260✔
196
                Ok(Row {
5,260✔
197
                    columns: Arc::clone(&columns),
5,260✔
198
                    values: vec![Value::I64(v)],
5,260✔
199
                })
5,260✔
200
            });
5,260✔
201

202
            Ok(Rows::Series(stream::iter(rows)))
4,578✔
203
        }
204
        TableFactor::Dictionary { dict, .. } => {
116✔
205
            let rows = {
116✔
206
                #[derive(futures_enum::Stream)]
207
                enum Rows<I1, I2, I3, I4> {
208
                    Tables(I1),
209
                    TableColumns(I2),
210
                    Indexes(I3),
211
                    Objects(I4),
212
                }
213

214
                match dict {
116✔
215
                    Dictionary::GlueObjects => {
216
                        let schemas = storage.fetch_all_schemas().await?;
15✔
217
                        let table_metas = storage
15✔
218
                            .scan_table_meta()
15✔
219
                            .await?
15✔
220
                            .collect::<Result<BTreeMap<_, _>>>()?;
15✔
221
                        let rows = schemas.into_iter().flat_map(move |schema| {
35✔
222
                            let meta = table_metas
33✔
223
                                .iter()
33✔
224
                                .find_map(|(table_name, hash_map)| {
33✔
225
                                    (table_name == &schema.table_name).then(|| hash_map.clone())
2✔
226
                                })
2✔
227
                                .unwrap_or_default();
33✔
228

229
                            let table_rows = BTreeMap::from([
33✔
230
                                ("OBJECT_NAME".to_owned(), Value::Str(schema.table_name)),
33✔
231
                                ("OBJECT_TYPE".to_owned(), Value::Str("TABLE".to_owned())),
33✔
232
                            ])
33✔
233
                            .into_iter()
33✔
234
                            .chain(meta)
33✔
235
                            .collect::<BTreeMap<_, _>>();
33✔
236

237
                            let index_rows = schema.indexes.into_iter().map(|index| {
34✔
238
                                BTreeMap::from([
12✔
239
                                    ("OBJECT_NAME".to_owned(), Value::Str(index.name)),
12✔
240
                                    ("OBJECT_TYPE".to_owned(), Value::Str("INDEX".to_owned())),
12✔
241
                                ])
12✔
242
                            });
12✔
243

244
                            iter::once(table_rows).chain(index_rows).map(|hash_map| {
45✔
245
                                let (columns, values): (Vec<_>, Vec<_>) =
45✔
246
                                    hash_map.into_iter().unzip();
45✔
247
                                Ok(Row {
45✔
248
                                    columns: columns.into(),
45✔
249
                                    values,
45✔
250
                                })
45✔
251
                            })
45✔
252
                        });
33✔
253

254
                        Rows::Objects(stream::iter(rows))
15✔
255
                    }
256
                    Dictionary::GlueTables => {
257
                        let schemas = storage.fetch_all_schemas().await?;
83✔
258
                        let rows = schemas.into_iter().map(move |schema| {
141✔
259
                            Ok(Row {
138✔
260
                                columns: Arc::clone(&columns),
138✔
261
                                values: vec![
138✔
262
                                    Value::Str(schema.table_name),
138✔
263
                                    schema.comment.map_or(Value::Null, Value::Str),
138✔
264
                                ],
138✔
265
                            })
138✔
266
                        });
138✔
267

268
                        Rows::Tables(stream::iter(rows))
83✔
269
                    }
270
                    Dictionary::GlueTableColumns => {
271
                        let schemas = storage.fetch_all_schemas().await?;
14✔
272
                        let rows = schemas.into_iter().flat_map(move |schema| {
42✔
273
                            let columns = Arc::clone(&columns);
42✔
274
                            let table_name = schema.table_name;
42✔
275

276
                            schema
42✔
277
                                .column_defs
42✔
278
                                .unwrap_or_default()
42✔
279
                                .into_iter()
42✔
280
                                .enumerate()
42✔
281
                                .map(move |(index, column_def)| {
84✔
282
                                    let values = vec![
84✔
283
                                        Value::Str(table_name.clone()),
84✔
284
                                        Value::Str(column_def.name),
84✔
285
                                        Value::I64(index as i64 + 1),
84✔
286
                                        Value::Bool(column_def.nullable),
84✔
287
                                        column_def.unique.map_or(Value::Null, |unique| {
84✔
288
                                            Value::Str(unique.to_sql())
28✔
289
                                        }),
28✔
290
                                        column_def
84✔
291
                                            .default
84✔
292
                                            .map_or(Value::Null, |expr| Value::Str(expr.to_sql())),
84✔
293
                                        column_def.comment.map_or(Value::Null, Value::Str),
84✔
294
                                    ];
295

296
                                    Ok(Row {
84✔
297
                                        columns: Arc::clone(&columns),
84✔
298
                                        values,
84✔
299
                                    })
84✔
300
                                })
84✔
301
                        });
42✔
302

303
                        Rows::TableColumns(stream::iter(rows))
14✔
304
                    }
305
                    Dictionary::GlueIndexes => {
306
                        let schemas = storage.fetch_all_schemas().await?;
4✔
307
                        let rows = schemas.into_iter().flat_map(move |schema| {
5✔
308
                            let column_defs = schema.column_defs.unwrap_or_default();
5✔
309
                            let primary_column = column_defs.iter().find_map(|column_def| {
11✔
310
                                let ColumnDef { name, unique, .. } = column_def;
11✔
311

312
                                (unique == &Some(ColumnUniqueOption { is_primary: true }))
11✔
313
                                    .then_some(name)
11✔
314
                            });
11✔
315

316
                            let clustered = match primary_column {
5✔
317
                                Some(column_name) => {
1✔
318
                                    let values = vec![
1✔
319
                                        Value::Str(schema.table_name.clone()),
1✔
320
                                        Value::Str("PRIMARY".to_owned()),
1✔
321
                                        Value::Str("BOTH".to_owned()),
1✔
322
                                        Value::Str(column_name.to_owned()),
1✔
323
                                        Value::Bool(true),
1✔
324
                                    ];
325

326
                                    let row = Row {
1✔
327
                                        columns: Arc::clone(&columns),
1✔
328
                                        values,
1✔
329
                                    };
1✔
330

331
                                    vec![Ok(row)]
1✔
332
                                }
333
                                None => Vec::new(),
4✔
334
                            };
335

336
                            let columns = Arc::clone(&columns);
5✔
337
                            let non_clustered = schema.indexes.into_iter().map(move |index| {
11✔
338
                                let values = vec![
11✔
339
                                    Value::Str(schema.table_name.clone()),
11✔
340
                                    Value::Str(index.name),
11✔
341
                                    Value::Str(index.order.to_string()),
11✔
342
                                    Value::Str(index.expr.to_sql_unquoted()),
11✔
343
                                    Value::Bool(false),
11✔
344
                                ];
345

346
                                Ok(Row {
11✔
347
                                    columns: Arc::clone(&columns),
11✔
348
                                    values,
11✔
349
                                })
11✔
350
                            });
11✔
351

352
                            clustered.into_iter().chain(non_clustered)
5✔
353
                        });
5✔
354

355
                        Rows::Indexes(stream::iter(rows))
4✔
356
                    }
357
                }
358
            };
359

360
            Ok(Rows::Dictionary(rows))
116✔
361
        }
362
    }
363
}
25,803✔
364

365
pub async fn fetch_columns<T: GStore>(storage: &T, table_name: &str) -> Result<Vec<String>> {
43,088✔
366
    let columns = storage
43,088✔
367
        .fetch_schema(table_name)
43,088✔
368
        .await?
43,088✔
369
        .ok_or_else(|| FetchError::TableNotFound(table_name.to_owned()))?
43,088✔
370
        .column_defs
371
        .map_or_else(
42,938✔
372
            || vec![SCHEMALESS_DOC_COLUMN.to_owned()],
999✔
373
            |column_defs| {
41,939✔
374
                column_defs
41,939✔
375
                    .into_iter()
41,939✔
376
                    .map(|column_def| column_def.name)
41,939✔
377
                    .collect()
41,939✔
378
            },
41,939✔
379
        );
380

381
    Ok(columns)
42,938✔
382
}
43,088✔
383

384
#[async_recursion]
385
pub async fn fetch_relation_columns<T>(
386
    storage: &T,
387
    table_factor: &TableFactor,
388
) -> Result<Vec<String>>
389
where
390
    T: GStore,
391
{
107,518✔
392
    match table_factor {
107,518✔
393
        TableFactor::Table { name, alias, .. } => {
42,607✔
394
            let columns = fetch_columns(storage, name).await?;
42,607✔
395
            match alias {
5,718✔
396
                None => Ok(columns),
36,739✔
397
                Some(alias) if alias.columns.len() > columns.len() => {
5,718✔
398
                    Err(FetchError::TooManyColumnAliases(
14✔
399
                        name.to_string(),
14✔
400
                        columns.len(),
14✔
401
                        alias.columns.len(),
14✔
402
                    )
14✔
403
                    .into())
14✔
404
                }
405
                Some(alias) => Ok(alias
5,704✔
406
                    .columns
5,704✔
407
                    .iter()
5,704✔
408
                    .cloned()
5,704✔
409
                    .chain(columns[alias.columns.len()..columns.len()].to_vec())
5,704✔
410
                    .collect()),
5,704✔
411
            }
412
        }
413
        TableFactor::Series { .. } => Ok(vec!["N".to_owned()]),
9,198✔
414
        TableFactor::Dictionary { dict, .. } => Ok(match dict {
232✔
415
            Dictionary::GlueObjects => vec![
30✔
416
                "OBJECT_NAME".to_owned(),
30✔
417
                "OBJECT_TYPE".to_owned(),
30✔
418
                "CREATED".to_owned(),
30✔
419
            ],
420
            Dictionary::GlueTables => vec!["TABLE_NAME".to_owned(), "COMMENT".to_owned()],
166✔
421
            Dictionary::GlueTableColumns => vec![
28✔
422
                "TABLE_NAME".to_owned(),
28✔
423
                "COLUMN_NAME".to_owned(),
28✔
424
                "COLUMN_ID".to_owned(),
28✔
425
                "NULLABLE".to_owned(),
28✔
426
                "KEY".to_owned(),
28✔
427
                "DEFAULT".to_owned(),
28✔
428
                "COMMENT".to_owned(),
28✔
429
            ],
430
            Dictionary::GlueIndexes => vec![
8✔
431
                "TABLE_NAME".to_owned(),
8✔
432
                "INDEX_NAME".to_owned(),
8✔
433
                "ORDER".to_owned(),
8✔
434
                "EXPRESSION".to_owned(),
8✔
435
                "UNIQUENESS".to_owned(),
8✔
436
            ],
437
        }),
438
        TableFactor::Derived {
439
            subquery: Query { body, .. },
1,722✔
440
            alias:
441
                TableAlias {
442
                    columns: alias_columns,
1,722✔
443
                    name,
1,722✔
444
                },
445
        } => match body {
1,722✔
446
            SetExpr::Select(statement) => {
1,456✔
447
                let Select {
448
                    from:
449
                        TableWithJoins {
450
                            relation, joins, ..
1,456✔
451
                        },
452
                    projection,
1,456✔
453
                    ..
454
                } = statement.as_ref();
1,456✔
455

456
                let labels = fetch_labels(storage, relation, joins, projection).await?;
1,456✔
457
                if alias_columns.is_empty() {
1,456✔
458
                    Ok(labels)
1,358✔
459
                } else if alias_columns.len() > labels.len() {
98✔
460
                    Err(FetchError::TooManyColumnAliases(
14✔
461
                        name.to_string(),
14✔
462
                        labels.len(),
14✔
463
                        alias_columns.len(),
14✔
464
                    )
14✔
465
                    .into())
14✔
466
                } else {
467
                    Ok(alias_columns
84✔
468
                        .iter()
84✔
469
                        .cloned()
84✔
470
                        .chain(labels[alias_columns.len()..labels.len()].to_vec())
84✔
471
                        .collect())
84✔
472
                }
473
            }
474
            SetExpr::Values(Values(values_list)) => {
266✔
475
                let total_len = values_list[0].len();
266✔
476
                let alias_len = alias_columns.len();
266✔
477
                if alias_len > total_len {
266✔
478
                    return Err(FetchError::TooManyColumnAliases(
14✔
479
                        name.into(),
14✔
480
                        total_len,
14✔
481
                        alias_len,
14✔
482
                    )
14✔
483
                    .into());
14✔
484
                }
252✔
485
                let labels = (alias_len + 1..=total_len).map(|i| format!("column{i}"));
364✔
486
                let labels = alias_columns
252✔
487
                    .iter()
252✔
488
                    .cloned()
252✔
489
                    .chain(labels)
252✔
490
                    .collect::<Vec<_>>();
252✔
491

492
                Ok(labels)
252✔
493
            }
494
        },
495
    }
496
}
53,759✔
497

498
async fn fetch_join_columns<'a, T: GStore>(
24,936✔
499
    storage: &T,
24,936✔
500
    joins: &'a [Join],
24,936✔
501
) -> Result<Vec<(&'a String, Vec<String>)>> {
24,936✔
502
    let mut all_columns = Vec::with_capacity(joins.len());
24,936✔
503
    for join in joins {
26,516✔
504
        let columns = fetch_relation_columns(storage, &join.relation).await?;
1,580✔
505
        let alias = get_alias(&join.relation);
1,580✔
506
        all_columns.push((alias, columns));
1,580✔
507
    }
508
    Ok(all_columns)
24,936✔
509
}
24,936✔
510

511
pub async fn fetch_labels<T: GStore>(
24,936✔
512
    storage: &T,
24,936✔
513
    relation: &TableFactor,
24,936✔
514
    joins: &[Join],
24,936✔
515
    projection: &Projection,
24,936✔
516
) -> Result<Vec<String>> {
24,936✔
517
    let table_alias = get_alias(relation);
24,936✔
518
    let columns = fetch_relation_columns(storage, relation).await?;
24,936✔
519
    let join_columns = fetch_join_columns(storage, joins).await?;
24,936✔
520

521
    match projection {
24,936✔
522
        Projection::SchemalessMap => Ok(vec![SCHEMALESS_DOC_COLUMN.to_owned()]),
194✔
523
        Projection::SelectItems(projection) => projection
24,742✔
524
            .iter()
24,742✔
525
            .flat_map(|item| match item {
30,946✔
526
                SelectItem::Wildcard => {
527
                    let columns = columns.iter().cloned();
5,081✔
528
                    let join_columns = join_columns.iter().flat_map(|(_, columns)| columns.clone());
5,081✔
529

530
                    columns.chain(join_columns).map(Ok).collect()
5,081✔
531
                }
532
                SelectItem::QualifiedWildcard(target_table_alias) => {
267✔
533
                    if table_alias == target_table_alias {
267✔
534
                        return columns.iter().cloned().map(Ok).collect();
197✔
535
                    }
70✔
536

537
                    let labels = join_columns
70✔
538
                        .iter()
70✔
539
                        .find(|(table_alias, _)| table_alias == &target_table_alias)
70✔
540
                        .map(|(_, columns)| columns.clone());
70✔
541

542
                    match labels {
70✔
543
                        Some(columns) => columns.into_iter().map(Ok).collect(),
56✔
544
                        None => {
545
                            vec![Err(FetchError::TableAliasNotFound(
14✔
546
                                target_table_alias.to_owned(),
14✔
547
                            )
14✔
548
                            .into())]
14✔
549
                        }
550
                    }
551
                }
552
                SelectItem::Expr { label, .. } => vec![Ok(label.to_owned())],
25,598✔
553
            })
30,946✔
554
            .collect::<Result<_>>(),
24,742✔
555
    }
556
}
24,936✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc