• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

gluesql / gluesql / 22275856923

22 Feb 2026 11:01AM UTC coverage: 98.17% (+0.09%) from 98.085%
22275856923

Pull #1883

github

web-flow
Merge 6026cc70f into 80a446e87
Pull Request #1883: Add patch coverage diff summary to PR coverage bot comment

21 of 29 new or added lines in 11 files covered. (72.41%)

67 existing lines in 24 files now uncovered.

42750 of 43547 relevant lines covered (98.17%)

66356.37 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

93.33
/core/src/executor/execute.rs
1
use {
2
    super::{
3
        alter::{
4
            CreateTableOptions, alter_table, create_index, create_table, delete_function,
5
            drop_table, insert_function,
6
        },
7
        delete::delete,
8
        fetch::fetch,
9
        insert::insert,
10
        select::{select, select_with_labels},
11
        update::Update,
12
        validate::{ColumnValidation, validate_unique},
13
    },
14
    crate::{
15
        ast::{
16
            BinaryOperator, DataType, Dictionary, Expr, Literal, Projection, Query, SelectItem,
17
            SetExpr, Statement, TableAlias, TableFactor, TableWithJoins, Variable,
18
        },
19
        data::{Key, Row, SCHEMALESS_DOC_COLUMN, Schema, Value},
20
        result::{Error, Result},
21
        store::{GStore, GStoreMut},
22
    },
23
    futures::stream::{StreamExt, TryStreamExt},
24
    serde::{Deserialize, Serialize},
25
    std::{
26
        collections::{BTreeMap, HashMap},
27
        env::var,
28
        fmt::Debug,
29
        sync::Arc,
30
    },
31
    thiserror::Error as ThisError,
32
};
33

34
#[derive(ThisError, Serialize, Debug, PartialEq, Eq)]
35
pub enum ExecuteError {
36
    #[error("table not found: {0}")]
37
    TableNotFound(String),
38

39
    #[error("expected Map value in _doc column")]
40
    ExpectedMapValueInDocColumn,
41
}
42

43
#[derive(Debug, Serialize, Deserialize, PartialEq, Clone)]
44
pub enum Payload {
45
    ShowColumns(Vec<(String, DataType)>),
46
    Create,
47
    Insert(usize),
48
    Select {
49
        labels: Vec<String>,
50
        rows: Vec<Vec<Value>>,
51
    },
52
    SelectMap(Vec<BTreeMap<String, Value>>),
53
    Delete(usize),
54
    Update(usize),
55
    DropTable(usize),
56
    DropFunction,
57
    AlterTable,
58
    CreateIndex,
59
    DropIndex,
60
    StartTransaction,
61
    Commit,
62
    Rollback,
63
    ShowVariable(PayloadVariable),
64
}
65

66
impl Payload {
67
    /// Exports `select` payloads as an [`std::iter::Iterator`].
68
    ///
69
    /// The items of the Iterator are `HashMap<Column, Value>`, and they are borrowed by default.
70
    /// If ownership is required, you need to acquire them directly.
71
    ///
72
    /// - Some: [`Payload::Select`], [`Payload::SelectMap`]
73
    /// - None: otherwise
UNCOV
74
    pub fn select(&self) -> Option<impl Iterator<Item = HashMap<&str, &Value>>> {
×
75
        #[derive(iter_enum::Iterator)]
76
        enum Iter<I1, I2> {
77
            Schema(I1),
78
            Schemaless(I2),
79
        }
80

81
        Some(match self {
×
82
            Payload::Select { labels, rows } => Iter::Schema(rows.iter().map(move |row| {
×
83
                labels
×
84
                    .iter()
×
85
                    .zip(row.iter())
×
86
                    .map(|(label, value)| (label.as_str(), value))
×
87
                    .collect::<HashMap<_, _>>()
×
88
            })),
×
89
            Payload::SelectMap(rows) => Iter::Schemaless(rows.iter().map(|row| {
×
90
                row.iter()
×
91
                    .map(|(k, v)| (k.as_str(), v))
×
UNCOV
92
                    .collect::<HashMap<_, _>>()
×
93
            })),
×
UNCOV
94
            _ => return None,
×
95
        })
UNCOV
96
    }
×
97
}
98

99
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
100
pub enum PayloadVariable {
101
    Tables(Vec<String>),
102
    Functions(Vec<String>),
103
    Version(String),
104
}
105

106
pub async fn execute<T: GStore + GStoreMut>(
31,175✔
107
    storage: &mut T,
31,175✔
108
    statement: &Statement,
31,175✔
109
) -> Result<Payload> {
31,175✔
110
    if matches!(
30,970✔
111
        statement,
31,175✔
112
        Statement::StartTransaction | Statement::Rollback | Statement::Commit
113
    ) {
114
        return execute_inner(storage, statement).await;
205✔
115
    }
30,970✔
116

117
    let autocommit = storage.begin(true).await?;
30,970✔
118
    let result = execute_inner(storage, statement).await;
30,970✔
119

120
    if !autocommit {
30,970✔
121
        return result;
24,094✔
122
    }
6,876✔
123

124
    match result {
6,876✔
125
        Ok(payload) => storage.commit().await.map(|()| payload),
5,709✔
126
        Err(error) => {
1,167✔
127
            storage.rollback().await?;
1,167✔
128

129
            Err(error)
1,167✔
130
        }
131
    }
132
}
31,175✔
133

134
async fn execute_inner<T: GStore + GStoreMut>(
31,175✔
135
    storage: &mut T,
31,175✔
136
    statement: &Statement,
31,175✔
137
) -> Result<Payload> {
31,175✔
138
    match statement {
31,175✔
139
        //- Modification
140
        //-- Tables
141
        Statement::CreateTable {
142
            name,
3,813✔
143
            columns,
3,813✔
144
            if_not_exists,
3,813✔
145
            source,
3,813✔
146
            engine,
3,813✔
147
            foreign_keys,
3,813✔
148
            comment,
3,813✔
149
        } => {
150
            let options = CreateTableOptions {
3,813✔
151
                target_table_name: name,
3,813✔
152
                column_defs: columns.as_ref().map(Vec::as_slice),
3,813✔
153
                if_not_exists: *if_not_exists,
3,813✔
154
                source,
3,813✔
155
                engine,
3,813✔
156
                foreign_keys,
3,813✔
157
                comment,
3,813✔
158
            };
3,813✔
159

160
            create_table(storage, options)
3,813✔
161
                .await
162
                .map(|()| Payload::Create)
3,813✔
163
        }
164
        Statement::DropTable {
165
            names,
209✔
166
            if_exists,
209✔
167
            cascade,
209✔
168
            ..
169
        } => drop_table(storage, names, *if_exists, *cascade)
209✔
170
            .await
171
            .map(Payload::DropTable),
209✔
172
        Statement::AlterTable { name, operation } => alter_table(storage, name, operation)
282✔
173
            .await
174
            .map(|()| Payload::AlterTable),
282✔
175
        Statement::CreateIndex {
176
            name,
74✔
177
            table_name,
74✔
178
            column,
74✔
179
        } => create_index(storage, table_name, name, column)
74✔
180
            .await
181
            .map(|()| Payload::CreateIndex),
74✔
182
        Statement::DropIndex { name, table_name } => storage
14✔
183
            .drop_index(table_name, name)
14✔
184
            .await
185
            .map(|()| Payload::DropIndex),
14✔
186
        //- Transaction
187
        Statement::StartTransaction => storage
101✔
188
            .begin(false)
101✔
189
            .await
190
            .map(|_| Payload::StartTransaction),
101✔
191
        Statement::Commit => storage.commit().await.map(|()| Payload::Commit),
54✔
192
        Statement::Rollback => storage.rollback().await.map(|()| Payload::Rollback),
50✔
193
        //-- Rows
194
        Statement::Insert {
195
            table_name,
4,898✔
196
            columns,
4,898✔
197
            source,
4,898✔
198
        } => insert(storage, table_name, columns, source)
4,898✔
199
            .await
200
            .map(Payload::Insert),
4,898✔
201
        Statement::Update {
202
            table_name,
605✔
203
            selection,
605✔
204
            assignments,
605✔
205
        } => {
206
            let Schema {
207
                column_defs,
591✔
208
                foreign_keys,
591✔
209
                ..
210
            } = storage
605✔
211
                .fetch_schema(table_name)
605✔
212
                .await?
605✔
213
                .ok_or_else(|| ExecuteError::TableNotFound(table_name.to_owned()))?;
605✔
214

215
            let all_columns = column_defs.as_deref().map_or_else(
591✔
216
                || Arc::from(vec![SCHEMALESS_DOC_COLUMN.to_owned()]),
31✔
217
                |columns| columns.iter().map(|col_def| col_def.name.clone()).collect(),
1,594✔
218
            );
219
            let columns_to_update: Vec<String> = assignments
591✔
220
                .iter()
591✔
221
                .map(|assignment| assignment.id.clone())
633✔
222
                .collect();
591✔
223

224
            let update = Update::new(storage, table_name, assignments, column_defs.as_deref())?;
591✔
225

226
            let foreign_keys = Arc::new(foreign_keys);
549✔
227

228
            let rows = fetch(storage, table_name, all_columns, selection.as_ref())
549✔
229
                .await?
549✔
230
                .and_then(|item| {
986✔
231
                    let update = &update;
986✔
232
                    let (key, row) = item;
986✔
233

234
                    let foreign_keys = Arc::clone(&foreign_keys);
986✔
235
                    async move {
986✔
236
                        let row = update.apply(row, foreign_keys.as_ref()).await?;
986✔
237

238
                        Ok((key, row))
901✔
239
                    }
986✔
240
                })
986✔
241
                .try_collect::<Vec<(Key, Row)>>()
549✔
242
                .await?;
549✔
243

244
            if let Some(column_defs) = column_defs {
464✔
245
                let column_validation =
434✔
246
                    ColumnValidation::SpecifiedColumns(&column_defs, columns_to_update);
434✔
247
                let rows = rows.iter().map(|(_, row)| row.values.as_slice());
434✔
248

249
                validate_unique(storage, table_name, column_validation, rows).await?;
434✔
250
            }
30✔
251

252
            let num_rows = rows.len();
422✔
253
            let rows = rows
422✔
254
                .into_iter()
422✔
255
                .map(|(key, row)| (key, row.into_values()))
831✔
256
                .collect();
422✔
257

258
            storage
422✔
259
                .insert_data(table_name, rows)
422✔
260
                .await
261
                .map(|()| Payload::Update(num_rows))
422✔
262
        }
263
        Statement::Delete {
264
            table_name,
481✔
265
            selection,
481✔
266
        } => delete(storage, table_name, selection.as_ref()).await,
481✔
267

268
        //- Selection
269
        Statement::Query(query) => {
20,452✔
270
            let (labels, rows) = select_with_labels(storage, query, None).await?;
20,452✔
271

272
            let is_schemaless_map = matches!(
20,060✔
273
                &query.body,
19,528✔
274
                SetExpr::Select(select) if matches!(select.projection, Projection::SchemalessMap)
19,528✔
275
            );
276

277
            if is_schemaless_map {
20,060✔
278
                rows.map(|row| {
210✔
279
                    let mut values = row?.into_values().into_iter();
210✔
280
                    match (values.next(), values.next()) {
209✔
281
                        (Some(Value::Map(map)), None) => Ok(map),
209✔
UNCOV
282
                        _ => Err(ExecuteError::ExpectedMapValueInDocColumn.into()),
×
283
                    }
284
                })
210✔
285
                .try_collect::<Vec<_>>()
152✔
286
                .await
287
                .map(Payload::SelectMap)
152✔
288
            } else {
289
                rows.map(|row| Ok(row?.into_values()))
39,079✔
290
                    .try_collect::<Vec<_>>()
19,908✔
291
                    .await
292
                    .map(|rows| Payload::Select { labels, rows })
19,908✔
293
            }
294
        }
295
        Statement::ShowColumns { table_name } => {
42✔
296
            let Schema { column_defs, .. } = storage
42✔
297
                .fetch_schema(table_name)
42✔
298
                .await?
42✔
299
                .ok_or_else(|| ExecuteError::TableNotFound(table_name.to_owned()))?;
42✔
300

301
            let output: Vec<(String, DataType)> = column_defs
28✔
302
                .unwrap_or_default()
28✔
303
                .into_iter()
28✔
304
                .map(|key| (key.name, key.data_type))
252✔
305
                .collect();
28✔
306

307
            Ok(Payload::ShowColumns(output))
28✔
308
        }
309
        Statement::ShowIndexes(table_name) => {
2✔
310
            let query = Query {
2✔
311
                body: SetExpr::Select(Box::new(crate::ast::Select {
2✔
312
                    distinct: false,
2✔
313
                    projection: Projection::SelectItems(vec![SelectItem::Wildcard]),
2✔
314
                    from: TableWithJoins {
2✔
315
                        relation: TableFactor::Dictionary {
2✔
316
                            dict: Dictionary::GlueIndexes,
2✔
317
                            alias: TableAlias {
2✔
318
                                name: "GLUE_INDEXES".to_owned(),
2✔
319
                                columns: Vec::new(),
2✔
320
                            },
2✔
321
                        },
2✔
322
                        joins: Vec::new(),
2✔
323
                    },
2✔
324
                    selection: Some(Expr::BinaryOp {
2✔
325
                        left: Box::new(Expr::Identifier("TABLE_NAME".to_owned())),
2✔
326
                        op: BinaryOperator::Eq,
2✔
327
                        right: Box::new(Expr::Literal(Literal::QuotedString(
2✔
328
                            table_name.to_owned(),
2✔
329
                        ))),
2✔
330
                    }),
2✔
331
                    group_by: Vec::new(),
2✔
332
                    having: None,
2✔
333
                })),
2✔
334
                order_by: Vec::new(),
2✔
335
                limit: None,
2✔
336
                offset: None,
2✔
337
            };
2✔
338

339
            let (labels, rows) = select_with_labels(storage, &query, None).await?;
2✔
340
            let rows = rows
2✔
341
                .map(|row| Ok::<_, Error>(row?.into_values()))
3✔
342
                .try_collect::<Vec<_>>()
2✔
343
                .await?;
2✔
344

345
            if rows.is_empty() {
2✔
346
                return Err(ExecuteError::TableNotFound(table_name.to_owned()).into());
1✔
347
            }
1✔
348

349
            Ok(Payload::Select { labels, rows })
1✔
350
        }
351
        Statement::ShowVariable(variable) => match variable {
83✔
352
            Variable::Tables => {
353
                let query = Query {
66✔
354
                    body: SetExpr::Select(Box::new(crate::ast::Select {
66✔
355
                        distinct: false,
66✔
356
                        projection: Projection::SelectItems(vec![SelectItem::Expr {
66✔
357
                            expr: Expr::Identifier("TABLE_NAME".to_owned()),
66✔
358
                            label: "TABLE_NAME".to_owned(),
66✔
359
                        }]),
66✔
360
                        from: TableWithJoins {
66✔
361
                            relation: TableFactor::Dictionary {
66✔
362
                                dict: Dictionary::GlueTables,
66✔
363
                                alias: TableAlias {
66✔
364
                                    name: "GLUE_TABLES".to_owned(),
66✔
365
                                    columns: Vec::new(),
66✔
366
                                },
66✔
367
                            },
66✔
368
                            joins: Vec::new(),
66✔
369
                        },
66✔
370
                        selection: None,
66✔
371
                        group_by: Vec::new(),
66✔
372
                        having: None,
66✔
373
                    })),
66✔
374
                    order_by: Vec::new(),
66✔
375
                    limit: None,
66✔
376
                    offset: None,
66✔
377
                };
66✔
378

379
                let table_names = select(storage, &query, None)
66✔
380
                    .await?
66✔
381
                    .map(|row| Ok::<_, Error>(row?.into_values()))
90✔
382
                    .try_collect::<Vec<Vec<Value>>>()
66✔
383
                    .await?
66✔
384
                    .iter()
66✔
385
                    .flat_map(|values| values.iter().map(Into::into))
90✔
386
                    .collect::<Vec<_>>();
66✔
387

388
                Ok(Payload::ShowVariable(PayloadVariable::Tables(table_names)))
66✔
389
            }
390
            Variable::Functions => {
391
                let mut function_desc: Vec<_> = storage
3✔
392
                    .fetch_all_functions()
3✔
393
                    .await?
3✔
394
                    .iter()
1✔
395
                    .map(|f| f.to_str())
3✔
396
                    .collect();
1✔
397
                function_desc.sort();
1✔
398
                Ok(Payload::ShowVariable(PayloadVariable::Functions(
1✔
399
                    function_desc,
1✔
400
                )))
1✔
401
            }
402
            Variable::Version => {
403
                let version = var("CARGO_PKG_VERSION")
14✔
404
                    .unwrap_or_else(|_| env!("CARGO_PKG_VERSION").to_owned());
14✔
405
                let payload = Payload::ShowVariable(PayloadVariable::Version(version));
14✔
406

407
                Ok(payload)
14✔
408
            }
409
        },
410
        Statement::CreateFunction {
411
            or_replace,
10✔
412
            name,
10✔
413
            args,
10✔
414
            return_,
10✔
415
        } => insert_function(storage, name, args, *or_replace, return_)
10✔
416
            .await
417
            .map(|()| Payload::Create),
10✔
418
        Statement::DropFunction { if_exists, names } => delete_function(storage, names, *if_exists)
5✔
419
            .await
420
            .map(|()| Payload::DropFunction),
5✔
421
    }
422
}
31,175✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc