• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

gluesql / gluesql / 22275856923

22 Feb 2026 11:01AM UTC coverage: 98.17% (+0.09%) from 98.085%
22275856923

Pull #1883

github

web-flow
Merge 6026cc70f into 80a446e87
Pull Request #1883: Add patch coverage diff summary to PR coverage bot comment

21 of 29 new or added lines in 11 files covered. (72.41%)

67 existing lines in 24 files now uncovered.

42750 of 43547 relevant lines covered (98.17%)

66356.37 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

97.94
/core/src/executor/insert.rs
1
use {
2
    super::{
3
        select::select,
4
        validate::{ColumnValidation, validate_unique},
5
    },
6
    crate::{
7
        ast::{ColumnDef, ColumnUniqueOption, Expr, ForeignKey, Query, SetExpr, Values},
8
        data::{Key, Row, SCHEMALESS_DOC_COLUMN, Schema, Value, value::BTreeMapJsonExt},
9
        executor::{evaluate::evaluate_stateless, limit::Limit},
10
        result::{Error, Result},
11
        store::{GStore, GStoreMut},
12
    },
13
    futures::stream::{self, StreamExt, TryStreamExt},
14
    serde::Serialize,
15
    std::{collections::BTreeMap, fmt::Debug, sync::Arc},
16
    thiserror::Error as ThisError,
17
};
18

19
#[derive(ThisError, Serialize, Debug, PartialEq, Eq)]
20
pub enum InsertError {
21
    #[error("table not found: {0}")]
22
    TableNotFound(String),
23

24
    #[error("lack of required column: {0}")]
25
    LackOfRequiredColumn(String),
26

27
    #[error("wrong column name: {0}")]
28
    WrongColumnName(String),
29

30
    #[error("column and values not matched")]
31
    ColumnAndValuesNotMatched,
32

33
    #[error("literals have more values than target columns")]
34
    TooManyValues,
35

36
    #[error("only single value accepted for schemaless row insert: got {0}")]
37
    OnlySingleValueAcceptedForSchemalessRow(usize),
38

39
    #[error("map type required: {0}")]
40
    MapTypeValueRequired(String),
41

42
    #[error(
43
        "cannot find referenced value on {table_name}.{column_name} with value {referenced_value:?}"
44
    )]
45
    CannotFindReferencedValue {
46
        table_name: String,
47
        column_name: String,
48
        referenced_value: String,
49
    },
50

51
    #[error("unreachable referencing column name: {0}")]
52
    ConflictReferencingColumnName(String),
53
}
54

55
enum RowsData {
56
    Append(Vec<Vec<Value>>),
57
    Insert(Vec<(Key, Vec<Value>)>),
58
}
59

60
pub async fn insert<T: GStore + GStoreMut>(
4,898✔
61
    storage: &mut T,
4,898✔
62
    table_name: &str,
4,898✔
63
    columns: &[String],
4,898✔
64
    source: &Query,
4,898✔
65
) -> Result<usize> {
4,898✔
66
    let Schema {
67
        column_defs,
4,884✔
68
        foreign_keys,
4,884✔
69
        ..
70
    } = storage
4,898✔
71
        .fetch_schema(table_name)
4,898✔
72
        .await?
4,898✔
73
        .ok_or_else(|| InsertError::TableNotFound(table_name.to_owned()))?;
4,898✔
74

75
    let rows = match column_defs {
4,884✔
76
        Some(column_defs) => {
4,630✔
77
            fetch_vec_rows(
4,630✔
78
                storage,
4,630✔
79
                table_name,
4,630✔
80
                column_defs,
4,630✔
81
                columns,
4,630✔
82
                source,
4,630✔
83
                foreign_keys,
4,630✔
84
            )
4,630✔
85
            .await
86
        }
87
        None => fetch_schemaless_rows(storage, source)
254✔
88
            .await
89
            .map(RowsData::Append),
254✔
90
    }?;
896✔
91

92
    match rows {
3,988✔
93
        RowsData::Append(rows) => {
3,491✔
94
            let num_rows = rows.len();
3,491✔
95

96
            storage
3,491✔
97
                .append_data(table_name, rows)
3,491✔
98
                .await
99
                .map(|()| num_rows)
3,491✔
100
        }
101
        RowsData::Insert(rows) => {
497✔
102
            let num_rows = rows.len();
497✔
103

104
            storage
497✔
105
                .insert_data(table_name, rows)
497✔
106
                .await
107
                .map(|()| num_rows)
497✔
108
        }
109
    }
110
}
4,898✔
111

112
async fn fetch_vec_rows<T: GStore>(
4,630✔
113
    storage: &T,
4,630✔
114
    table_name: &str,
4,630✔
115
    column_defs: Vec<ColumnDef>,
4,630✔
116
    columns: &[String],
4,630✔
117
    source: &Query,
4,630✔
118
    foreign_keys: Vec<ForeignKey>,
4,630✔
119
) -> Result<RowsData> {
4,630✔
120
    #[derive(futures_enum::Stream)]
121
    enum Rows<I1, I2> {
122
        Values(I1),
123
        Select(I2),
124
    }
125

126
    let labels = Arc::from(
4,630✔
127
        column_defs
4,630✔
128
            .iter()
4,630✔
129
            .map(|column_def| column_def.name.clone())
10,707✔
130
            .collect::<Vec<_>>(),
4,630✔
131
    );
132
    let column_defs = Arc::from(column_defs);
4,630✔
133
    let column_validation = ColumnValidation::All(&column_defs);
4,630✔
134

135
    let rows = match &source.body {
4,630✔
136
        SetExpr::Values(Values(values_list)) => {
4,504✔
137
            let limit = Limit::new(source.limit.as_ref(), source.offset.as_ref()).await?;
4,504✔
138
            let rows = stream::iter(values_list).then(|values| {
10,169✔
139
                let column_defs = Arc::clone(&column_defs);
10,169✔
140
                let labels = Arc::clone(&labels);
10,169✔
141

142
                async move {
10,169✔
143
                    Ok(Row {
144
                        columns: labels,
10,169✔
145
                        values: fill_values(&column_defs, columns, values).await?,
10,169✔
146
                    })
147
                }
10,169✔
148
            });
10,169✔
149
            let rows = limit.apply(rows);
4,504✔
150
            let rows = rows.map(|row| Ok::<_, Error>(row?.into_values()));
10,127✔
151

152
            Rows::Values(rows)
4,504✔
153
        }
154
        SetExpr::Select(_) => {
155
            let rows = select(storage, source, None).await?.map(|row| {
392✔
156
                let values = row?.into_values();
392✔
157

158
                column_defs
392✔
159
                    .iter()
392✔
160
                    .zip(values.iter())
392✔
161
                    .try_for_each(|(column_def, value)| {
840✔
162
                        let ColumnDef {
163
                            data_type,
840✔
164
                            nullable,
840✔
165
                            ..
166
                        } = column_def;
840✔
167

168
                        value.validate_type(data_type)?;
840✔
169
                        value.validate_null(*nullable)
826✔
170
                    })?;
840✔
171

172
                Ok(values)
364✔
173
            });
392✔
174

175
            Rows::Select(rows)
126✔
176
        }
177
    }
178
    .try_collect::<Vec<Vec<Value>>>()
4,630✔
179
    .await?;
4,630✔
180

181
    validate_unique(
3,972✔
182
        storage,
3,972✔
183
        table_name,
3,972✔
184
        column_validation,
3,972✔
185
        rows.iter().map(std::vec::Vec::as_slice),
3,972✔
186
    )
3,972✔
187
    .await?;
3,972✔
188

189
    validate_foreign_key(storage, &column_defs, foreign_keys, &rows).await?;
3,860✔
190

191
    let primary_key = column_defs.iter().position(|ColumnDef { unique, .. }| {
8,364✔
192
        unique == &Some(ColumnUniqueOption { is_primary: true })
8,364✔
193
    });
8,364✔
194

195
    match primary_key {
3,846✔
196
        Some(i) => rows
497✔
197
            .into_iter()
497✔
198
            .filter_map(|values| {
958✔
199
                values
958✔
200
                    .get(i)
958✔
201
                    .map(Key::try_from)
958✔
202
                    .map(|result| result.map(|key| (key, values)))
958✔
203
            })
958✔
204
            .collect::<Result<Vec<_>>>()
497✔
205
            .map(RowsData::Insert),
497✔
206
        None => Ok(RowsData::Append(rows)),
3,349✔
207
    }
208
}
4,630✔
209

210
async fn validate_foreign_key<T: GStore>(
3,860✔
211
    storage: &T,
3,860✔
212
    column_defs: &Arc<[ColumnDef]>,
3,860✔
213
    foreign_keys: Vec<ForeignKey>,
3,860✔
214
    rows: &[Vec<Value>],
3,860✔
215
) -> Result<()> {
3,860✔
216
    for foreign_key in foreign_keys {
3,916✔
217
        let ForeignKey {
218
            referencing_column_name,
70✔
219
            referenced_table_name,
70✔
220
            referenced_column_name,
70✔
221
            ..
222
        } = &foreign_key;
70✔
223

224
        let target_index = column_defs
70✔
225
            .iter()
70✔
226
            .enumerate()
70✔
227
            .find(|(_, c)| &c.name == referencing_column_name)
224✔
228
            .ok_or_else(|| {
70✔
UNCOV
229
                InsertError::ConflictReferencingColumnName(referencing_column_name.to_owned())
×
UNCOV
230
            })?;
×
231

232
        for row in rows {
126✔
233
            let value =
70✔
234
                row.get(target_index.0)
70✔
235
                    .ok_or(InsertError::ConflictReferencingColumnName(
70✔
236
                        referencing_column_name.to_owned(),
70✔
237
                    ))?;
70✔
238

239
            if value == &Value::Null {
70✔
240
                continue;
14✔
241
            }
56✔
242

243
            let no_referenced = storage
56✔
244
                .fetch_data(referenced_table_name, &Key::try_from(value)?)
56✔
245
                .await?
56✔
246
                .is_none();
56✔
247

248
            if no_referenced {
56✔
249
                return Err(InsertError::CannotFindReferencedValue {
14✔
250
                    table_name: referenced_table_name.to_owned(),
14✔
251
                    column_name: referenced_column_name.to_owned(),
14✔
252
                    referenced_value: String::from(value),
14✔
253
                }
14✔
254
                .into());
14✔
255
            }
42✔
256
        }
257
    }
258

259
    Ok(())
3,846✔
260
}
3,860✔
261

262
async fn fetch_schemaless_rows<T: GStore>(storage: &T, source: &Query) -> Result<Vec<Vec<Value>>> {
254✔
263
    #[derive(futures_enum::Stream)]
264
    enum Rows<I1, I2> {
265
        Values(I1),
266
        Select(I2),
267
    }
268

269
    let doc_column: Arc<[String]> = Arc::from(vec![SCHEMALESS_DOC_COLUMN.to_owned()]);
254✔
270

271
    let rows = match &source.body {
254✔
272
        SetExpr::Values(Values(values_list)) => {
212✔
273
            let limit = Limit::new(source.limit.as_ref(), source.offset.as_ref()).await?;
212✔
274
            let rows = stream::iter(values_list).then({
212✔
275
                let doc_column = Arc::clone(&doc_column);
212✔
276
                move |values| {
308✔
277
                    let doc_column = Arc::clone(&doc_column);
308✔
278
                    async move {
308✔
279
                        if values.len() > 1 {
308✔
280
                            return Err(InsertError::OnlySingleValueAcceptedForSchemalessRow(
14✔
281
                                values.len(),
14✔
282
                            )
14✔
283
                            .into());
14✔
284
                        }
294✔
285

286
                        let Some(value) = values.first() else {
294✔
287
                            return Err(
14✔
288
                                InsertError::OnlySingleValueAcceptedForSchemalessRow(0).into()
14✔
289
                            );
14✔
290
                        };
291

292
                        let map: BTreeMap<String, Value> =
238✔
293
                            evaluate_stateless(None, value).await?.try_into()?;
280✔
294

295
                        Ok(Row {
238✔
296
                            columns: doc_column,
238✔
297
                            values: vec![Value::Map(map)],
238✔
298
                        })
238✔
299
                    }
308✔
300
                }
308✔
301
            });
302
            let rows = limit.apply(rows);
212✔
303
            let rows = rows.map_ok(Row::into_values);
212✔
304

305
            Rows::Values(rows)
212✔
306
        }
307
        SetExpr::Select(_) => {
308
            let rows = select(storage, source, None).await?.map(|row| {
42✔
309
                let values = row?.into_values();
42✔
310

311
                if values.len() > 1 {
42✔
312
                    return Err(
14✔
313
                        InsertError::OnlySingleValueAcceptedForSchemalessRow(values.len()).into(),
14✔
314
                    );
14✔
315
                }
28✔
316

317
                let map = match values.into_iter().next() {
28✔
318
                    None => {
319
                        return Err(InsertError::OnlySingleValueAcceptedForSchemalessRow(0).into());
14✔
320
                    }
UNCOV
321
                    Some(Value::Map(map)) => map,
×
UNCOV
322
                    Some(Value::Str(s)) => BTreeMap::parse_json_object(&s)?,
×
323
                    Some(v) => return Err(InsertError::MapTypeValueRequired((&v).into()).into()),
14✔
324
                };
325

UNCOV
326
                Ok(vec![Value::Map(map)])
×
327
            });
42✔
328

329
            Rows::Select(rows)
42✔
330
        }
331
    }
332
    .try_collect::<Vec<Vec<Value>>>()
254✔
333
    .await?;
254✔
334

335
    Ok(rows)
142✔
336
}
254✔
337

338
async fn fill_values(
917,158✔
339
    column_defs: &[ColumnDef],
917,158✔
340
    columns: &[String],
917,158✔
341
    values: &[Expr],
917,158✔
342
) -> Result<Vec<Value>> {
917,158✔
343
    #[derive(iter_enum::Iterator)]
344
    enum Columns<I1, I2> {
345
        All(I1),
346
        Specified(I2),
347
    }
348

349
    if !columns.is_empty() && values.len() != columns.len() {
10,169✔
350
        return Err(InsertError::ColumnAndValuesNotMatched.into());
14✔
351
    } else if values.len() > column_defs.len() {
10,155✔
352
        return Err(InsertError::TooManyValues.into());
14✔
353
    }
10,141✔
354

355
    if let Some(wrong_column_name) = columns.iter().find(|column_name| {
11,943✔
356
        !column_defs
11,383✔
357
            .iter()
11,383✔
358
            .any(|column_def| &&column_def.name == column_name)
23,759✔
359
    }) {
11,383✔
360
        return Err(InsertError::WrongColumnName(wrong_column_name.to_owned()).into());
14✔
361
    }
10,127✔
362

363
    let columns = if columns.is_empty() {
10,127✔
364
        Columns::All(column_defs.iter().map(|ColumnDef { name, .. }| name))
6,129✔
365
    } else {
366
        Columns::Specified(columns.iter())
3,998✔
367
    };
368

369
    let column_name_value_list = columns.zip(values.iter()).collect::<Vec<(_, _)>>();
10,127✔
370

371
    let values = stream::iter(column_defs)
10,127✔
372
        .then(|column_def| {
23,694✔
373
            let column_name_value_list = &column_name_value_list;
23,694✔
374

375
            async move {
23,694✔
376
                let ColumnDef {
377
                    name: def_name,
23,694✔
378
                    data_type,
23,694✔
379
                    nullable,
23,694✔
380
                    ..
381
                } = column_def;
23,694✔
382

383
                let value = column_name_value_list
23,694✔
384
                    .iter()
23,694✔
385
                    .find(|(name, _)| name == &def_name)
46,377✔
386
                    .map(|(_, value)| value);
23,694✔
387

388
                match (value, &column_def.default, nullable) {
23,694✔
389
                    (Some(&expr), _, _) | (None, Some(expr), _) => evaluate_stateless(None, expr)
23,568✔
390
                        .await?
23,568✔
391
                        .try_into_value(data_type, *nullable),
23,512✔
392
                    (None, None, true) => Ok(Value::Null),
98✔
393
                    (None, None, false) => {
394
                        Err(InsertError::LackOfRequiredColumn(def_name.to_owned()).into())
28✔
395
                    }
396
                }
397
            }
23,694✔
398
        })
23,694✔
399
        .try_collect::<Vec<Value>>()
10,127✔
400
        .await?;
10,127✔
401

402
    Ok(values)
9,539✔
403
}
10,169✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc