• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

joaoh82 / rust_sqlite / 25373448073

05 May 2026 11:23AM UTC coverage: 63.201% (+0.2%) from 63.042%
25373448073

push

github

web-flow
release: v0.5.1 (#94)

Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

7641 of 12090 relevant lines covered (63.2%)

1.21 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

90.18
/src/sql/pager/mod.rs
1
//! On-disk persistence for a `Database`, using fixed-size paged files.
2
//!
3
//! The file is a sequence of 4 KiB pages. Page 0 holds the header
4
//! (magic, version, page count, schema-root pointer). Every other page carries
5
//! a small per-page header (type tag + next-page pointer + payload length)
6
//! followed by a payload of up to 4089 bytes.
7
//!
8
//! **Storage strategy (format version 2, Phase 3c.5).**
9
//!
10
//! - Each `Table`'s rows live as **cells** in a chain of `TableLeaf` pages.
11
//!   Cell layout and slot directory are in `cell.rs` / `table_page.rs`;
12
//!   cells that exceed the inline threshold spill into an overflow chain
13
//!   via `overflow.rs`.
14
//! - The schema catalog is itself a regular table named `sqlrite_master`,
15
//!   with one row per user table:
16
//!       `(name TEXT PRIMARY KEY, sql TEXT NOT NULL,
17
//!         rootpage INTEGER NOT NULL, last_rowid INTEGER NOT NULL)`
18
//!   This is the SQLite-style approach: the schema of `sqlrite_master`
19
//!   itself is hardcoded into the engine so the open path can bootstrap.
20
//! - Page 0's `schema_root_page` field points at the first leaf of
21
//!   `sqlrite_master`.
22
//!
23
//! **Format version.** Version 2 is not compatible with files produced by
24
//! earlier commits. Opening a v1 file returns a clean error — users on
25
//! old files have to regenerate them from CREATE/INSERT, as there's no
26
//! production data to migrate yet.
27

28
// Data-layer modules. Not every helper in these modules is used by save/open
29
// yet — some exist for tests, some for future maintenance operations.
30
// Module-level #[allow(dead_code)] keeps the build quiet without dotting
31
// the modules with per-item attributes.
32
#[allow(dead_code)]
33
pub mod allocator;
34
#[allow(dead_code)]
35
pub mod cell;
36
pub mod file;
37
#[allow(dead_code)]
38
pub mod freelist;
39
#[allow(dead_code)]
40
pub mod fts_cell;
41
pub mod header;
42
#[allow(dead_code)]
43
pub mod hnsw_cell;
44
#[allow(dead_code)]
45
pub mod index_cell;
46
#[allow(dead_code)]
47
pub mod interior_page;
48
pub mod overflow;
49
pub mod page;
50
pub mod pager;
51
#[allow(dead_code)]
52
pub mod table_page;
53
#[allow(dead_code)]
54
pub mod varint;
55
#[allow(dead_code)]
56
pub mod wal;
57

58
use std::collections::{BTreeMap, HashMap};
59
use std::path::Path;
60
use std::sync::{Arc, Mutex};
61

62
use sqlparser::dialect::SQLiteDialect;
63
use sqlparser::parser::Parser;
64

65
use crate::error::{Result, SQLRiteError};
66
use crate::sql::db::database::Database;
67
use crate::sql::db::secondary_index::{IndexOrigin, SecondaryIndex};
68
use crate::sql::db::table::{Column, DataType, Row, Table, Value};
69
use crate::sql::pager::cell::Cell;
70
use crate::sql::pager::header::DbHeader;
71
use crate::sql::pager::index_cell::IndexCell;
72
use crate::sql::pager::interior_page::{InteriorCell, InteriorPage};
73
use crate::sql::pager::overflow::{
74
    OVERFLOW_THRESHOLD, OverflowRef, PagedEntry, read_overflow_chain, write_overflow_chain,
75
};
76
use crate::sql::pager::page::{PAGE_HEADER_SIZE, PAGE_SIZE, PAYLOAD_PER_PAGE, PageType};
77
use crate::sql::pager::pager::Pager;
78
use crate::sql::pager::table_page::TablePage;
79
use crate::sql::parser::create::CreateQuery;
80

81
// Re-export so callers can spell `sql::pager::AccessMode` without
82
// reaching into the `pager::pager::pager` submodule path.
83
pub use crate::sql::pager::pager::AccessMode;
84

85
/// Name of the internal catalog table. Reserved — user CREATEs of this
86
/// name must be rejected upstream.
87
pub const MASTER_TABLE_NAME: &str = "sqlrite_master";
88

89
/// Opens a database file in read-write mode. Shorthand for
90
/// [`open_database_with_mode`] with [`AccessMode::ReadWrite`].
91
pub fn open_database(path: &Path, db_name: String) -> Result<Database> {
2✔
92
    open_database_with_mode(path, db_name, AccessMode::ReadWrite)
2✔
93
}
94

95
/// Opens a database file in read-only mode. Acquires a shared OS-level
96
/// advisory lock, so other read-only openers coexist but any writer is
97
/// excluded. Attempts to mutate the returned `Database` (e.g. an
98
/// `INSERT`, or a `save_database` call against it) bottom out in a
99
/// `cannot commit: database is opened read-only` error from the Pager.
100
pub fn open_database_read_only(path: &Path, db_name: String) -> Result<Database> {
1✔
101
    open_database_with_mode(path, db_name, AccessMode::ReadOnly)
1✔
102
}
103

104
/// Opens a database file and reconstructs the in-memory `Database`,
/// leaving the long-lived `Pager` attached for subsequent auto-save
/// (read-write) or consistent-snapshot reads (read-only).
///
/// # Errors
///
/// Returns `SQLRiteError::Internal` when the catalog is inconsistent
/// (a row whose stored SQL names a different table than the catalog
/// row does, or a row with an unknown `type` value), and propagates
/// any error from the pager open, the page-level row loads, or the
/// CREATE-SQL re-parse.
pub fn open_database_with_mode(path: &Path, db_name: String, mode: AccessMode) -> Result<Database> {
    let pager = Pager::open_with_mode(path, mode)?;

    // 1. Load sqlrite_master from the tree at header.schema_root_page.
    //    The catalog's own schema is hardcoded (see
    //    `build_empty_master_table`), which is what lets this bootstrap.
    let mut master = build_empty_master_table();
    load_table_rows(&pager, &mut master, pager.header().schema_root_page)?;

    // 2. Two passes over master rows: first build every user table, then
    //    attach secondary indexes. Indexes need their base table to exist
    //    before we can populate them. Auto-indexes are created at table
    //    build time so we only have to load explicit indexes from disk
    //    (but we also reload the auto-index CONTENT because Table::new
    //    built it empty).
    let mut db = Database::new(db_name);
    let mut index_rows: Vec<IndexCatalogRow> = Vec::new();

    for rowid in master.rowids() {
        // Every catalog row carries all five columns; a wrong type here
        // surfaces as an Internal error from take_text / take_integer.
        let ty = take_text(&master, "type", rowid)?;
        let name = take_text(&master, "name", rowid)?;
        let sql = take_text(&master, "sql", rowid)?;
        let rootpage = take_integer(&master, "rootpage", rowid)? as u32;
        let last_rowid = take_integer(&master, "last_rowid", rowid)?;

        match ty.as_str() {
            "table" => {
                // Re-derive the column list from the stored CREATE TABLE
                // SQL, and cross-check the parsed name against the
                // catalog's `name` column.
                let (parsed_name, columns) = parse_create_sql(&sql)?;
                if parsed_name != name {
                    return Err(SQLRiteError::Internal(format!(
                        "sqlrite_master row '{name}' carries SQL for '{parsed_name}' — corrupt catalog?"
                    )));
                }
                let mut table = build_empty_table(&name, columns, last_rowid);
                // rootpage 0 means the table has no persisted rows yet.
                if rootpage != 0 {
                    load_table_rows(&pager, &mut table, rootpage)?;
                }
                // Keep the persisted rowid high-water mark if it exceeds
                // whatever the row load left behind.
                if last_rowid > table.last_rowid {
                    table.last_rowid = last_rowid;
                }
                db.tables.insert(name, table);
            }
            "index" => {
                // Deferred to the second pass: the base table may not
                // have been built yet at this point in the iteration.
                index_rows.push(IndexCatalogRow {
                    name,
                    sql,
                    rootpage,
                });
            }
            other => {
                return Err(SQLRiteError::Internal(format!(
                    "sqlrite_master row '{name}' has unknown type '{other}'"
                )));
            }
        }
    }

    // Second pass: attach each index to its table. HNSW indexes
    // (Phase 7d.2) take a different code path because their persisted
    // form is just the CREATE INDEX SQL — the graph itself isn't
    // persisted yet (Phase 7d.3). Detect HNSW via the SQL's USING clause
    // and route to a graph-rebuild instead of the B-Tree-cell load.
    //
    // Phase 8b — same shape for FTS indexes. The posting lists aren't
    // persisted yet (Phase 8c), so we replay the CREATE INDEX SQL on
    // open and let `execute_create_index` walk current rows.
    for row in index_rows {
        if create_index_sql_uses_hnsw(&row.sql) {
            rebuild_hnsw_index(&mut db, &pager, &row)?;
        } else if create_index_sql_uses_fts(&row.sql) {
            rebuild_fts_index(&mut db, &pager, &row)?;
        } else {
            attach_index(&mut db, &pager, row)?;
        }
    }

    // Hand the pager to the Database so later saves against the same
    // path reuse it (diff-pager) instead of reopening the file.
    db.source_path = Some(path.to_path_buf());
    db.pager = Some(pager);
    Ok(db)
}
185

186
/// Catalog row for a secondary index — deferred until after every table is
/// loaded so the index's base table exists by the time we populate it.
struct IndexCatalogRow {
    // Index name, from the sqlrite_master `name` column.
    name: String,
    // CREATE INDEX SQL from the `sql` column; its USING clause decides
    // whether reopen rebuilds (hnsw/fts) or loads B-Tree cells.
    sql: String,
    // Root page of the persisted index tree, from the `rootpage` column.
    rootpage: u32,
}
193

194
/// Persists `db` to disk. Diff-pager skips writing pages whose bytes
195
/// haven't changed; the [`PageAllocator`] preserves per-table page
196
/// numbers across saves so unchanged tables produce zero dirty frames.
197
///
198
/// Pages that were live before this save but aren't restaged this round
199
/// (e.g., the leaves of a dropped table) move onto a persisted free
200
/// list rooted at `header.freelist_head`; subsequent saves draw from
201
/// the freelist before extending the file. `VACUUM` (see
202
/// [`vacuum_database`]) compacts the file by ignoring the freelist and
203
/// allocating linearly from page 1.
204
///
205
/// [`PageAllocator`]: crate::sql::pager::allocator::PageAllocator
206
pub fn save_database(db: &mut Database, path: &Path) -> Result<()> {
2✔
207
    save_database_with_mode(db, path, /*compact=*/ false)
2✔
208
}
209

210
/// Reclaims space by rewriting every live B-Tree contiguously from
211
/// page 1, with no freelist. Equivalent to `save_database` but ignores
212
/// the existing freelist and per-table preferred pools — every page is
213
/// allocated by extending the high-water mark — so the resulting file
214
/// is tightly packed and the freelist is empty.
215
///
216
/// Used by the SQL-level `VACUUM;` statement.
217
pub fn vacuum_database(db: &mut Database, path: &Path) -> Result<()> {
1✔
218
    save_database_with_mode(db, path, /*compact=*/ true)
1✔
219
}
220

221
/// Shared save core. `compact = false` is the normal save path (uses
222
/// the existing freelist + per-table preferred pools). `compact = true`
223
/// is the VACUUM path (empty freelist, empty preferred pools, linear
224
/// allocation from page 1).
225
fn save_database_with_mode(db: &mut Database, path: &Path, compact: bool) -> Result<()> {
2✔
226
    // Phase 7d.3 — rebuild any HNSW index that DELETE / UPDATE-on-vector
227
    // marked dirty. Done up front under the &mut Database borrow we
228
    // already hold, before the immutable iteration loops below need
229
    // their own borrow.
230
    rebuild_dirty_hnsw_indexes(db);
2✔
231
    // Phase 8b — same drill for FTS indexes flagged by DELETE / UPDATE.
232
    rebuild_dirty_fts_indexes(db);
2✔
233

234
    let same_path = db.source_path.as_deref() == Some(path);
2✔
235
    let mut pager = if same_path {
2✔
236
        match db.pager.take() {
2✔
237
            Some(p) => p,
2✔
238
            None if path.exists() => Pager::open(path)?,
4✔
239
            None => Pager::create(path)?,
2✔
240
        }
241
    } else if path.exists() {
3✔
242
        Pager::open(path)?
1✔
243
    } else {
244
        Pager::create(path)?
2✔
245
    };
246

247
    // Snapshot what was live BEFORE we reset staged. Used to compute the
248
    // newly-freed set after staging completes. Page 0 (the header) is
249
    // never on the freelist — it's always live.
250
    let old_header = pager.header();
2✔
251
    let old_live: std::collections::HashSet<u32> = (1..old_header.page_count).collect();
2✔
252

253
    // Read the previously-persisted freelist so its leaf pages can be
254
    // reused as preferred allocations and its trunk pages don't leak.
255
    let (old_free_leaves, old_free_trunks) = if compact || old_header.freelist_head == 0 {
6✔
256
        (Vec::new(), Vec::new())
4✔
257
    } else {
258
        crate::sql::pager::freelist::read_freelist(&pager, old_header.freelist_head)?
2✔
259
    };
260

261
    // Snapshot the previous rootpages of each table/index so we can
262
    // seed per-table preferred pools (the unchanged-table case stages
263
    // byte-identical pages → diff pager skips every write for it).
264
    let old_rootpages = if compact {
2✔
265
        HashMap::new()
2✔
266
    } else {
267
        read_old_rootpages(&pager, old_header.schema_root_page)?
4✔
268
    };
269

270
    // SQLR-1 — snapshot every prior B-Tree's page set NOW, before any
271
    // staging starts. `Pager::read_page` shadows on-disk bytes with the
272
    // current `staged` buffer, so if we deferred these walks until each
273
    // object's turn in the staging loop, a *new* index added in this
274
    // save would extend past the old high-water and overwrite the
275
    // pages of any later-staged object whose old root sits in that
276
    // range — including `sqlrite_master`, which is always staged last.
277
    // The follow-up walk would then read the wrong B-Tree's bytes and
278
    // either hand the allocator a bogus preferred pool or panic
279
    // dispatching cells (a table-cell decoder vs. an index leaf, the
280
    // shape of the original SQLR-1 panic). Walking up front pins each
281
    // map to the committed bytes that were on disk before this save
282
    // touched anything.
283
    let old_preferred_pages: HashMap<(String, String), Vec<u32>> = if compact {
2✔
284
        HashMap::new()
2✔
285
    } else {
286
        let mut map: HashMap<(String, String), Vec<u32>> = HashMap::new();
2✔
287
        for ((kind, name), &root) in &old_rootpages {
6✔
288
            // Tables can carry overflow chains; index/HNSW/FTS leaves
289
            // never overflow in the current encoding, so the cheaper
290
            // walk suffices for them.
291
            let follow = kind == "table";
4✔
292
            let pages = collect_pages_for_btree(&pager, root, follow)?;
2✔
293
            map.insert((kind.clone(), name.clone()), pages);
4✔
294
        }
295
        map
2✔
296
    };
297
    let old_master_pages: Vec<u32> = if compact || old_header.schema_root_page == 0 {
4✔
298
        Vec::new()
2✔
299
    } else {
300
        collect_pages_for_btree(
4✔
301
            &pager,
302
            old_header.schema_root_page,
2✔
303
            /*follow_overflow=*/ true,
304
        )?
305
    };
306

307
    pager.clear_staged();
2✔
308

309
    // Allocator: in normal mode, seed with the old freelist; in compact
310
    // mode, start empty so allocation extends linearly from page 1.
311
    use std::collections::VecDeque;
312
    let initial_freelist: VecDeque<u32> = if compact {
2✔
313
        VecDeque::new()
2✔
314
    } else {
315
        crate::sql::pager::freelist::freelist_to_deque(old_free_leaves.clone())
4✔
316
    };
317
    let mut alloc = crate::sql::pager::allocator::PageAllocator::new(initial_freelist, 1);
2✔
318

319
    // 1. Stage each user table's B-Tree, collecting master-row info.
320
    //    `kind` is "table" or "index" — master has one row per each.
321
    let mut master_rows: Vec<CatalogEntry> = Vec::new();
2✔
322

323
    let mut table_names: Vec<&String> = db.tables.keys().collect();
4✔
324
    table_names.sort();
4✔
325
    for name in table_names {
4✔
326
        if name == MASTER_TABLE_NAME {
4✔
327
            return Err(SQLRiteError::Internal(format!(
×
328
                "user table cannot be named '{MASTER_TABLE_NAME}' (reserved)"
329
            )));
330
        }
331
        if !compact {
2✔
332
            if let Some(prev) = old_preferred_pages.get(&("table".to_string(), name.to_string())) {
6✔
333
                alloc.set_preferred(prev.clone());
4✔
334
            }
335
        }
336
        let table = &db.tables[name];
4✔
337
        let rootpage = stage_table_btree(&mut pager, table, &mut alloc)?;
2✔
338
        alloc.finish_preferred();
2✔
339
        master_rows.push(CatalogEntry {
2✔
340
            kind: "table".into(),
2✔
341
            name: name.clone(),
2✔
342
            sql: table_to_create_sql(table),
2✔
343
            rootpage,
344
            last_rowid: table.last_rowid,
2✔
345
        });
346
    }
347

348
    // 2. Stage each secondary index's B-Tree. Indexes persist in a
349
    //    deterministic order: sorted by (owning_table, index_name).
350
    let mut index_entries: Vec<(&Table, &SecondaryIndex)> = Vec::new();
2✔
351
    for table in db.tables.values() {
4✔
352
        for idx in &table.secondary_indexes {
4✔
353
            index_entries.push((table, idx));
2✔
354
        }
355
    }
356
    index_entries
2✔
357
        .sort_by(|(ta, ia), (tb, ib)| ta.tb_name.cmp(&tb.tb_name).then(ia.name.cmp(&ib.name)));
4✔
358
    for (_table, idx) in index_entries {
4✔
359
        if !compact {
2✔
360
            if let Some(prev) =
4✔
361
                old_preferred_pages.get(&("index".to_string(), idx.name.to_string()))
362
            {
363
                alloc.set_preferred(prev.clone());
4✔
364
            }
365
        }
366
        let rootpage = stage_index_btree(&mut pager, idx, &mut alloc)?;
4✔
367
        alloc.finish_preferred();
2✔
368
        master_rows.push(CatalogEntry {
2✔
369
            kind: "index".into(),
2✔
370
            name: idx.name.clone(),
2✔
371
            sql: idx.synthesized_sql(),
2✔
372
            rootpage,
373
            last_rowid: 0,
374
        });
375
    }
376

377
    // 2b. Phase 7d.3: persist HNSW indexes as their own cell-encoded
378
    //     page trees, with the rootpage recorded in sqlrite_master.
379
    //     Reopen loads the graph back from cells (fast, exact match)
380
    //     instead of rebuilding from rows.
381
    //
382
    //     Dirty indexes (set by DELETE / UPDATE-on-vector-col) are
383
    //     rebuilt from current rows BEFORE staging, so the on-disk
384
    //     graph reflects the current row set.
385
    let mut hnsw_entries: Vec<(&Table, &crate::sql::db::table::HnswIndexEntry)> = Vec::new();
2✔
386
    for table in db.tables.values() {
4✔
387
        for entry in &table.hnsw_indexes {
4✔
388
            hnsw_entries.push((table, entry));
1✔
389
        }
390
    }
391
    hnsw_entries
2✔
392
        .sort_by(|(ta, ea), (tb, eb)| ta.tb_name.cmp(&tb.tb_name).then(ea.name.cmp(&eb.name)));
2✔
393
    for (table, entry) in hnsw_entries {
4✔
394
        if !compact {
1✔
395
            if let Some(prev) =
3✔
396
                old_preferred_pages.get(&("index".to_string(), entry.name.to_string()))
397
            {
398
                alloc.set_preferred(prev.clone());
×
399
            }
400
        }
401
        let rootpage = stage_hnsw_btree(&mut pager, &entry.index, &mut alloc)?;
2✔
402
        alloc.finish_preferred();
1✔
403
        master_rows.push(CatalogEntry {
1✔
404
            kind: "index".into(),
1✔
405
            name: entry.name.clone(),
1✔
406
            sql: format!(
2✔
407
                "CREATE INDEX {} ON {} USING hnsw ({})",
408
                entry.name, table.tb_name, entry.column_name
409
            ),
410
            rootpage,
411
            last_rowid: 0,
412
        });
413
    }
414

415
    // 2c. Phase 8c — persist FTS posting lists as their own
416
    //     cell-encoded page trees, with the rootpage recorded in
417
    //     sqlrite_master. Reopen loads the postings back from cells
418
    //     (fast, exact match) instead of re-tokenizing rows.
419
    //
420
    //     Dirty indexes (set by DELETE / UPDATE-on-text-col) are
421
    //     rebuilt from current rows BEFORE staging by
422
    //     `rebuild_dirty_fts_indexes`, so the on-disk tree reflects
423
    //     the current row set.
424
    let mut fts_entries: Vec<(&Table, &crate::sql::db::table::FtsIndexEntry)> = Vec::new();
2✔
425
    for table in db.tables.values() {
4✔
426
        for entry in &table.fts_indexes {
4✔
427
            fts_entries.push((table, entry));
1✔
428
        }
429
    }
430
    fts_entries
2✔
431
        .sort_by(|(ta, ea), (tb, eb)| ta.tb_name.cmp(&tb.tb_name).then(ea.name.cmp(&eb.name)));
2✔
432
    let any_fts = !fts_entries.is_empty();
2✔
433
    for (table, entry) in fts_entries {
4✔
434
        if !compact {
1✔
435
            if let Some(prev) =
3✔
436
                old_preferred_pages.get(&("index".to_string(), entry.name.to_string()))
437
            {
438
                alloc.set_preferred(prev.clone());
×
439
            }
440
        }
441
        let rootpage = stage_fts_btree(&mut pager, &entry.index, &mut alloc)?;
2✔
442
        alloc.finish_preferred();
1✔
443
        master_rows.push(CatalogEntry {
1✔
444
            kind: "index".into(),
1✔
445
            name: entry.name.clone(),
1✔
446
            sql: format!(
2✔
447
                "CREATE INDEX {} ON {} USING fts ({})",
448
                entry.name, table.tb_name, entry.column_name
449
            ),
450
            rootpage,
451
            last_rowid: 0,
452
        });
453
    }
454

455
    // 3. Build an in-memory sqlrite_master with one row per table or index,
456
    //    then stage it via the same tree-build path. Seed master's
457
    //    preferred pool with the previous master tree's pages so the
458
    //    catalog page numbers stay stable across saves whenever the
459
    //    catalog content didn't change.
460
    let mut master = build_empty_master_table();
2✔
461
    for (i, entry) in master_rows.into_iter().enumerate() {
8✔
462
        let rowid = (i as i64) + 1;
4✔
463
        master.restore_row(
2✔
464
            rowid,
465
            vec![
4✔
466
                Some(Value::Text(entry.kind)),
2✔
467
                Some(Value::Text(entry.name)),
2✔
468
                Some(Value::Text(entry.sql)),
2✔
469
                Some(Value::Integer(entry.rootpage as i64)),
2✔
470
                Some(Value::Integer(entry.last_rowid)),
2✔
471
            ],
472
        )?;
473
    }
474
    if !compact && !old_master_pages.is_empty() {
4✔
475
        // Use the page list snapshotted before any staging touched
476
        // disk; re-walking here would read whatever a new index
477
        // already restaged on top of master's old root (SQLR-1).
478
        alloc.set_preferred(old_master_pages.clone());
2✔
479
    }
480
    let master_root = stage_table_btree(&mut pager, &master, &mut alloc)?;
4✔
481
    alloc.finish_preferred();
2✔
482

483
    // 4. Compute newly-freed pages: the previously-live set minus what
484
    //    we just restaged. The previous freelist's trunk pages get
485
    //    re-encoded too — they're in `old_live`, weren't restaged, so
486
    //    the filter naturally moves them to the new freelist.
487
    //
488
    // In `compact` mode (VACUUM), we *discard* newly_freed instead of
489
    // routing it onto the new freelist. The whole point of VACUUM is
490
    // to let the file truncate to the new high-water mark, so any page
491
    // past it gets dropped at the next checkpoint.
492
    if !compact {
2✔
493
        let used = alloc.used().clone();
4✔
494
        let mut newly_freed: Vec<u32> = old_live
495
            .iter()
496
            .copied()
497
            .filter(|p| !used.contains(p))
6✔
498
            .collect();
499
        let _ = &old_free_trunks; // silenced — handled by the old_live filter
500
        alloc.add_to_freelist(newly_freed.drain(..));
4✔
501
    }
502

503
    // 5. Encode the new freelist into trunk pages. `stage_freelist`
504
    //    consumes some of the free pages AS the trunk pages themselves —
505
    //    a trunk is just a free page borrowed for metadata. Pages that
506
    //    were on the freelist but become trunks no longer need to be
507
    //    "extension" pages; the high-water mark from the staging loop
508
    //    above is already correct.
509
    let new_free_pages = alloc.drain_freelist();
2✔
510
    let new_freelist_head =
2✔
511
        crate::sql::pager::freelist::stage_freelist(&mut pager, new_free_pages)?;
512

513
    // 6. Pick the format version. v6 is on demand: only bumps when the
514
    //    new freelist is non-empty. FTS-bearing files keep their v5
515
    //    promotion; v6 is a strict superset (v6 readers handle v4/v5/v6).
516
    use crate::sql::pager::header::{FORMAT_VERSION_V5, FORMAT_VERSION_V6};
517
    let format_version = if new_freelist_head != 0 {
3✔
518
        FORMAT_VERSION_V6
1✔
519
    } else if any_fts {
4✔
520
        // Preserve a v6 file at v6 (don't downgrade) but otherwise
521
        // bump v4 → v5 for FTS like Phase 8c does.
522
        std::cmp::max(FORMAT_VERSION_V5, old_header.format_version)
2✔
523
    } else {
524
        // Preserve whatever the file already was.
525
        old_header.format_version
2✔
526
    };
527

528
    pager.commit(DbHeader {
2✔
529
        page_count: alloc.high_water(),
2✔
530
        schema_root_page: master_root,
531
        format_version,
2✔
532
        freelist_head: new_freelist_head,
533
    })?;
534

535
    if same_path {
4✔
536
        db.pager = Some(pager);
2✔
537
    }
538
    Ok(())
2✔
539
}
540

541
/// Build material for a single row in sqlrite_master.
struct CatalogEntry {
    kind: String, // "table" or "index" — written to the catalog's `type` column
    // Object name, written to the `name` column.
    name: String,
    // Synthesized CREATE SQL, written to the `sql` column.
    sql: String,
    // Root page of the freshly staged B-Tree (`rootpage` column).
    rootpage: u32,
    // Rowid high-water mark for tables; index rows always record 0.
    last_rowid: i64,
}
549

550
// -------------------------------------------------------------------------
551
// sqlrite_master — hardcoded catalog table schema
552

553
fn build_empty_master_table() -> Table {
2✔
554
    // Phase 3e: `type` is the first column, matching SQLite's convention.
555
    // It distinguishes `'table'` rows from `'index'` rows.
556
    let columns = vec![
4✔
557
        Column::new("type".into(), "text".into(), false, true, false),
4✔
558
        Column::new("name".into(), "text".into(), true, true, true),
4✔
559
        Column::new("sql".into(), "text".into(), false, true, false),
4✔
560
        Column::new("rootpage".into(), "integer".into(), false, true, false),
4✔
561
        Column::new("last_rowid".into(), "integer".into(), false, true, false),
4✔
562
    ];
563
    build_empty_table(MASTER_TABLE_NAME, columns, 0)
2✔
564
}
565

566
/// Reads a required Text column from a known-good catalog row.
567
fn take_text(table: &Table, col: &str, rowid: i64) -> Result<String> {
2✔
568
    match table.get_value(col, rowid) {
2✔
569
        Some(Value::Text(s)) => Ok(s),
2✔
570
        other => Err(SQLRiteError::Internal(format!(
×
571
            "sqlrite_master column '{col}' at rowid {rowid}: expected Text, got {other:?}"
572
        ))),
573
    }
574
}
575

576
/// Reads a required Integer column from a known-good catalog row.
577
fn take_integer(table: &Table, col: &str, rowid: i64) -> Result<i64> {
2✔
578
    match table.get_value(col, rowid) {
2✔
579
        Some(Value::Integer(v)) => Ok(v),
2✔
580
        other => Err(SQLRiteError::Internal(format!(
×
581
            "sqlrite_master column '{col}' at rowid {rowid}: expected Integer, got {other:?}"
582
        ))),
583
    }
584
}
585

586
// -------------------------------------------------------------------------
587
// CREATE-TABLE SQL synthesis and re-parsing
588

589
/// Synthesizes a CREATE TABLE SQL string that recreates the table's schema.
590
/// Deterministic: same schema → same SQL, so diffing commits stay stable.
591
fn table_to_create_sql(table: &Table) -> String {
2✔
592
    let mut parts = Vec::with_capacity(table.columns.len());
2✔
593
    for c in &table.columns {
4✔
594
        // Render the SQL type literally so the round-trip through
595
        // CREATE TABLE re-parsing recreates the same schema. Vector
596
        // carries its dimension inline.
597
        let ty: String = match &c.datatype {
2✔
598
            DataType::Integer => "INTEGER".to_string(),
4✔
599
            DataType::Text => "TEXT".to_string(),
4✔
600
            DataType::Real => "REAL".to_string(),
×
601
            DataType::Bool => "BOOLEAN".to_string(),
×
602
            DataType::Vector(dim) => format!("VECTOR({dim})"),
2✔
603
            DataType::Json => "JSON".to_string(),
2✔
604
            DataType::None | DataType::Invalid => "TEXT".to_string(),
×
605
        };
606
        let mut piece = format!("{} {}", c.column_name, ty);
4✔
607
        if c.is_pk {
2✔
608
            piece.push_str(" PRIMARY KEY");
4✔
609
        } else {
610
            if c.is_unique {
2✔
611
                piece.push_str(" UNIQUE");
2✔
612
            }
613
            if c.not_null {
2✔
614
                piece.push_str(" NOT NULL");
2✔
615
            }
616
        }
617
        if let Some(default) = &c.default {
3✔
618
            piece.push_str(" DEFAULT ");
1✔
619
            piece.push_str(&render_default_literal(default));
1✔
620
        }
621
        parts.push(piece);
2✔
622
    }
623
    format!("CREATE TABLE {} ({});", table.tb_name, parts.join(", "))
2✔
624
}
625

626
/// Renders a DEFAULT value back to SQL-literal form so the synthesized
627
/// CREATE TABLE round-trips through `parse_create_sql`. Text values get
628
/// single-quoted with single-quote doubling for escaping. Vector defaults
629
/// are not currently expressible at CREATE TABLE time, so we render them
630
/// as their bracket-array form (matches the INSERT literal grammar).
631
fn render_default_literal(value: &Value) -> String {
1✔
632
    match value {
1✔
633
        Value::Integer(i) => i.to_string(),
1✔
634
        Value::Real(f) => f.to_string(),
×
635
        Value::Bool(b) => {
×
636
            if *b {
×
637
                "TRUE".to_string()
×
638
            } else {
639
                "FALSE".to_string()
×
640
            }
641
        }
642
        Value::Text(s) => format!("'{}'", s.replace('\'', "''")),
1✔
643
        Value::Null => "NULL".to_string(),
×
644
        Value::Vector(_) => value.to_display_string(),
×
645
    }
646
}
647

648
/// Reverses `table_to_create_sql`: feeds the SQL back through `sqlparser`
649
/// and produces our internal column list. Returns `(table_name, columns)`.
650
fn parse_create_sql(sql: &str) -> Result<(String, Vec<Column>)> {
2✔
651
    let dialect = SQLiteDialect {};
2✔
652
    let mut ast = Parser::parse_sql(&dialect, sql).map_err(SQLRiteError::from)?;
2✔
653
    let stmt = ast.pop().ok_or_else(|| {
4✔
654
        SQLRiteError::Internal("sqlrite_master row held an empty SQL string".to_string())
×
655
    })?;
656
    let create = CreateQuery::new(&stmt)?;
4✔
657
    let columns = create
2✔
658
        .columns
659
        .into_iter()
660
        .map(|pc| {
4✔
661
            Column::with_default(
2✔
662
                pc.name,
2✔
663
                pc.datatype,
2✔
664
                pc.is_pk,
2✔
665
                pc.not_null,
2✔
666
                pc.is_unique,
2✔
667
                pc.default,
2✔
668
            )
669
        })
670
        .collect();
671
    Ok((create.table_name, columns))
2✔
672
}
673

674
// -------------------------------------------------------------------------
675
// In-memory table (re)construction
676

677
/// Builds an empty in-memory `Table` given the declared columns.
///
/// Creates the per-column storage maps (keyed by column name),
/// auto-creates UNIQUE/PK secondary indexes for INTEGER/TEXT columns,
/// and seeds `last_rowid`/`primary_key` bookkeeping. Row data is loaded
/// separately by the leaf-chain walk — this builds only the shell.
fn build_empty_table(name: &str, columns: Vec<Column>, last_rowid: i64) -> Table {
    let rows: Arc<Mutex<HashMap<String, Row>>> = Arc::new(Mutex::new(HashMap::new()));
    let mut secondary_indexes: Vec<SecondaryIndex> = Vec::new();
    {
        let mut map = rows.lock().expect("rows mutex poisoned");
        for col in &columns {
            // Mirror the dispatch in `Table::new` so the reconstructed
            // table has the same shape it'd have if it were built fresh
            // from SQL. Phase 7a adds the Vector arm — without it,
            // VECTOR columns silently restore as Row::None and every
            // restore_row hits a "storage None vs value Some(Vector(...))"
            // type mismatch.
            let row = match &col.datatype {
                DataType::Integer => Row::Integer(BTreeMap::new()),
                DataType::Text => Row::Text(BTreeMap::new()),
                DataType::Real => Row::Real(BTreeMap::new()),
                DataType::Bool => Row::Bool(BTreeMap::new()),
                DataType::Vector(_dim) => Row::Vector(BTreeMap::new()),
                // JSON columns reuse Text storage — see Table::new and
                // Phase 7e's scope-correction note.
                DataType::Json => Row::Text(BTreeMap::new()),
                DataType::None | DataType::Invalid => Row::None,
            };
            map.insert(col.column_name.clone(), row);

            // Auto-create UNIQUE/PK indexes so the restored table has the
            // same shape Table::new would have built from fresh SQL.
            // NOTE(review): an Err from SecondaryIndex::new is silently
            // skipped here — presumably it can't fail for the
            // Integer/Text datatypes the matches! guard admits; confirm.
            if (col.is_pk || col.is_unique)
                && matches!(col.datatype, DataType::Integer | DataType::Text)
            {
                if let Ok(idx) = SecondaryIndex::new(
                    SecondaryIndex::auto_name(name, &col.column_name),
                    name.to_string(),
                    col.column_name.clone(),
                    &col.datatype,
                    true,
                    IndexOrigin::Auto,
                ) {
                    secondary_indexes.push(idx);
                }
            }
        }
    }

    // Falls back to the "-1" sentinel when no column is declared PK.
    let primary_key = columns
        .iter()
        .find(|c| c.is_pk)
        .map(|c| c.column_name.clone())
        .unwrap_or_else(|| "-1".to_string());

    Table {
        tb_name: name.to_string(),
        columns,
        rows,
        secondary_indexes,
        // HNSW indexes (Phase 7d.2) are reconstructed on open by re-
        // executing each `CREATE INDEX … USING hnsw` SQL stored in
        // `sqlrite_master`. This builder produces the empty shell;
        // `replay_create_index_for_hnsw` (in this same module) walks the
        // sqlrite_master after every table is loaded and rebuilds the
        // graph from current row data. Persistence of the graph itself
        // (avoiding the on-open rebuild cost) is Phase 7d.3.
        hnsw_indexes: Vec::new(),
        // FTS indexes (Phase 8b) follow the same pattern — the
        // CREATE INDEX … USING fts SQL is the source of truth on open
        // and the in-memory posting list gets rebuilt from current
        // rows. Cell-encoded persistence of the postings is Phase 8c.
        fts_indexes: Vec::new(),
        last_rowid,
        primary_key,
    }
}
750

751
// -------------------------------------------------------------------------
752
// Leaf-chain read / write
753

754
/// Walks a table's B-Tree from `root_page`, following the leftmost-child
755
/// chain down to the first leaf, then iterating leaves via their sibling
756
/// `next_page` pointers. Every cell is decoded and replayed into `table`.
757
///
758
/// Open-path note: we eagerly materialize the entire table into `Table`'s
759
/// in-memory maps. Phase 5 will introduce a `Cursor` that hits the pager
760
/// on demand so queries can stream through the tree without a full upfront
761
/// load.
762
/// Re-parses `CREATE INDEX` SQL from sqlrite_master and restores the
763
/// index on its base table by walking the tree of index cells at
764
/// `rootpage`. The base table is expected to already be in `db.tables`.
765
fn attach_index(db: &mut Database, pager: &Pager, row: IndexCatalogRow) -> Result<()> {
2✔
766
    let (table_name, column_name, is_unique) = parse_create_index_sql(&row.sql)?;
4✔
767

768
    let table = db.get_table_mut(table_name.clone()).map_err(|_| {
4✔
769
        SQLRiteError::Internal(format!(
×
770
            "index '{}' references unknown table '{table_name}' (sqlrite_master out of sync?)",
771
            row.name
772
        ))
773
    })?;
774
    let datatype = table
6✔
775
        .columns
776
        .iter()
2✔
777
        .find(|c| c.column_name == column_name)
6✔
778
        .map(|c| clone_datatype(&c.datatype))
6✔
779
        .ok_or_else(|| {
2✔
780
            SQLRiteError::Internal(format!(
×
781
                "index '{}' references unknown column '{column_name}' on '{table_name}'",
782
                row.name
783
            ))
784
        })?;
785

786
    // An auto-index on this column may already exist (built by
787
    // build_empty_table for UNIQUE/PK columns). If the names match, reuse
788
    // the slot instead of adding a duplicate entry.
789
    let existing_slot = table
6✔
790
        .secondary_indexes
791
        .iter()
792
        .position(|i| i.name == row.name);
6✔
793
    let idx = match existing_slot {
2✔
794
        Some(i) => {
2✔
795
            // Drain any entries that may have been populated during table
796
            // restore_row calls — we're about to repopulate from the
797
            // persisted tree.
798
            table.secondary_indexes.remove(i)
4✔
799
        }
800
        None => SecondaryIndex::new(
2✔
801
            row.name.clone(),
1✔
802
            table_name.clone(),
2✔
803
            column_name.clone(),
1✔
804
            &datatype,
805
            is_unique,
806
            IndexOrigin::Explicit,
807
        )?,
808
    };
809
    let mut idx = idx;
2✔
810
    // Wipe any stale entries from the auto path so the load is idempotent.
811
    let is_unique_flag = idx.is_unique;
2✔
812
    let origin = idx.origin;
2✔
813
    idx = SecondaryIndex::new(
6✔
814
        idx.name,
2✔
815
        idx.table_name,
2✔
816
        idx.column_name,
2✔
817
        &datatype,
818
        is_unique_flag,
819
        origin,
820
    )?;
821

822
    // Populate from the index tree's cells.
823
    load_index_rows(pager, &mut idx, row.rootpage)?;
2✔
824

825
    table.secondary_indexes.push(idx);
2✔
826
    Ok(())
2✔
827
}
828

829
/// Walks the leaves of an index B-Tree rooted at `root_page` and inserts
830
/// every `(value, rowid)` pair into `idx`.
831
fn load_index_rows(pager: &Pager, idx: &mut SecondaryIndex, root_page: u32) -> Result<()> {
2✔
832
    if root_page == 0 {
2✔
833
        return Ok(());
×
834
    }
835
    let first_leaf = find_leftmost_leaf(pager, root_page)?;
2✔
836
    let mut current = first_leaf;
2✔
837
    while current != 0 {
2✔
838
        let page_buf = pager
2✔
839
            .read_page(current)
2✔
840
            .ok_or_else(|| SQLRiteError::Internal(format!("missing index leaf page {current}")))?;
2✔
841
        if page_buf[0] != PageType::TableLeaf as u8 {
2✔
842
            return Err(SQLRiteError::Internal(format!(
×
843
                "page {current} tagged {} but expected TableLeaf (index)",
844
                page_buf[0]
845
            )));
846
        }
847
        let next_leaf = u32::from_le_bytes(page_buf[1..5].try_into().unwrap());
2✔
848
        let payload: &[u8; PAYLOAD_PER_PAGE] = (&page_buf[PAGE_HEADER_SIZE..])
4✔
849
            .try_into()
2✔
850
            .map_err(|_| SQLRiteError::Internal("index leaf payload size".to_string()))?;
2✔
851
        let leaf = TablePage::from_bytes(payload);
2✔
852

853
        for slot in 0..leaf.slot_count() {
4✔
854
            // Slots on an index page hold KIND_INDEX cells; decode directly.
855
            let offset = leaf.slot_offset_raw(slot)?;
4✔
856
            let (ic, _) = IndexCell::decode(leaf.as_bytes(), offset)?;
2✔
857
            idx.insert(&ic.value, ic.rowid)?;
4✔
858
        }
859
        current = next_leaf;
2✔
860
    }
861
    Ok(())
2✔
862
}
863

864
/// Minimal recognizer for the synthesized-or-user `CREATE INDEX` SQL we
865
/// store in sqlrite_master. Returns `(table_name, column_name, is_unique)`.
866
///
867
/// Uses sqlparser so user-supplied SQL with extra whitespace, case, etc.
868
/// still works; the only shape we accept is single-column indexes.
869
fn parse_create_index_sql(sql: &str) -> Result<(String, String, bool)> {
2✔
870
    use sqlparser::ast::{CreateIndex, Expr, Statement};
871

872
    let dialect = SQLiteDialect {};
2✔
873
    let mut ast = Parser::parse_sql(&dialect, sql).map_err(SQLRiteError::from)?;
2✔
874
    let Some(Statement::CreateIndex(CreateIndex {
4✔
875
        table_name,
2✔
876
        columns,
2✔
877
        unique,
2✔
878
        ..
879
    })) = ast.pop()
6✔
880
    else {
881
        return Err(SQLRiteError::Internal(format!(
×
882
            "sqlrite_master index row's SQL isn't a CREATE INDEX: {sql}"
883
        )));
884
    };
885
    if columns.len() != 1 {
4✔
886
        return Err(SQLRiteError::NotImplemented(
×
887
            "multi-column indexes aren't supported yet".to_string(),
×
888
        ));
889
    }
890
    let col = match &columns[0].column.expr {
4✔
891
        Expr::Identifier(ident) => ident.value.clone(),
4✔
892
        Expr::CompoundIdentifier(parts) => {
×
893
            parts.last().map(|p| p.value.clone()).unwrap_or_default()
×
894
        }
895
        other => {
×
896
            return Err(SQLRiteError::Internal(format!(
×
897
                "unsupported indexed column expression: {other:?}"
898
            )));
899
        }
900
    };
901
    Ok((table_name.to_string(), col, unique))
4✔
902
}
903

904
/// True iff a CREATE INDEX SQL string uses `USING hnsw` (case-insensitive).
905
/// Used by the open path to route HNSW indexes to the graph-rebuild path
906
/// instead of the standard B-Tree cell-load. Pre-Phase-7d.2 indexes
907
/// don't have a USING clause, so they all return false and continue
908
/// taking the existing path.
909
fn create_index_sql_uses_hnsw(sql: &str) -> bool {
2✔
910
    use sqlparser::ast::{CreateIndex, IndexType, Statement};
911

912
    let dialect = SQLiteDialect {};
2✔
913
    let Ok(mut ast) = Parser::parse_sql(&dialect, sql) else {
4✔
914
        return false;
×
915
    };
916
    let Some(Statement::CreateIndex(CreateIndex { using, .. })) = ast.pop() else {
6✔
917
        return false;
×
918
    };
919
    matches!(using, Some(IndexType::Custom(ident)) if ident.value.eq_ignore_ascii_case("hnsw"))
3✔
920
}
921

922
/// Phase 8b — peeks at a CREATE INDEX SQL to detect `USING fts(...)`.
923
/// Mirrors [`create_index_sql_uses_hnsw`].
924
fn create_index_sql_uses_fts(sql: &str) -> bool {
2✔
925
    use sqlparser::ast::{CreateIndex, IndexType, Statement};
926

927
    let dialect = SQLiteDialect {};
2✔
928
    let Ok(mut ast) = Parser::parse_sql(&dialect, sql) else {
4✔
929
        return false;
×
930
    };
931
    let Some(Statement::CreateIndex(CreateIndex { using, .. })) = ast.pop() else {
6✔
932
        return false;
×
933
    };
934
    matches!(using, Some(IndexType::Custom(ident)) if ident.value.eq_ignore_ascii_case("fts"))
3✔
935
}
936

937
/// Phase 8c — loads (or rebuilds) an FTS index on database open. Two
/// paths mirror [`rebuild_hnsw_index`]:
///
///   - **rootpage != 0** (Phase 8c default): the posting list is
///     persisted as cell-encoded pages. Read every cell directly via
///     [`load_fts_postings`] and reconstruct the index — no
///     re-tokenization, exact bit-for-bit reproduction.
///
///   - **rootpage == 0** (compatibility): no on-disk postings, e.g.
///     for files saved by Phase 8b before persistence landed. Replay
///     the CREATE INDEX SQL through `execute_create_index`, which
///     walks the table's current rows and tokenizes them fresh.
fn rebuild_fts_index(db: &mut Database, pager: &Pager, row: &IndexCatalogRow) -> Result<()> {
    use crate::sql::db::table::FtsIndexEntry;
    use crate::sql::executor::execute_create_index;
    use crate::sql::fts::PostingList;
    use sqlparser::ast::Statement;

    // Validate that the catalog SQL parses to a CREATE INDEX before
    // touching any pages. The whole statement is bound (`stmt @ …`) so
    // the compatibility path below can re-execute it as-is.
    let dialect = SQLiteDialect {};
    let mut ast = Parser::parse_sql(&dialect, &row.sql).map_err(SQLRiteError::from)?;
    let Some(stmt @ Statement::CreateIndex(_)) = ast.pop() else {
        return Err(SQLRiteError::Internal(format!(
            "sqlrite_master FTS row's SQL isn't a CREATE INDEX: {}",
            row.sql
        )));
    };

    if row.rootpage == 0 {
        // Compatibility path — no persisted postings; replay rows.
        execute_create_index(&stmt, db)?;
        return Ok(());
    }

    // Persistence path — decode the posting cells, then re-attach the
    // index to the base table named in the CREATE INDEX.
    let (doc_lengths, postings) = load_fts_postings(pager, row.rootpage)?;
    let index = PostingList::from_persisted_postings(doc_lengths, postings);
    let (tbl_name, col_name) = parse_fts_create_index_sql(&row.sql)?;
    let table_mut = db.get_table_mut(tbl_name.clone()).map_err(|_| {
        SQLRiteError::Internal(format!(
            "FTS index '{}' references unknown table '{tbl_name}'",
            row.name
        ))
    })?;
    // needs_rebuild = false: the loaded postings already reflect the
    // persisted row state.
    table_mut.fts_indexes.push(FtsIndexEntry {
        name: row.name.clone(),
        column_name: col_name,
        index,
        needs_rebuild: false,
    });
    Ok(())
}
987

988
/// Pulls (table_name, column_name) out of a `CREATE INDEX … USING fts(col)`
989
/// SQL string. Same shape as `parse_hnsw_create_index_sql`.
990
fn parse_fts_create_index_sql(sql: &str) -> Result<(String, String)> {
1✔
991
    use sqlparser::ast::{CreateIndex, Expr, Statement};
992

993
    let dialect = SQLiteDialect {};
1✔
994
    let mut ast = Parser::parse_sql(&dialect, sql).map_err(SQLRiteError::from)?;
1✔
995
    let Some(Statement::CreateIndex(CreateIndex {
2✔
996
        table_name,
1✔
997
        columns,
1✔
998
        ..
999
    })) = ast.pop()
3✔
1000
    else {
1001
        return Err(SQLRiteError::Internal(format!(
×
1002
            "sqlrite_master FTS row's SQL isn't a CREATE INDEX: {sql}"
1003
        )));
1004
    };
1005
    if columns.len() != 1 {
2✔
1006
        return Err(SQLRiteError::NotImplemented(
×
1007
            "multi-column FTS indexes aren't supported yet".to_string(),
×
1008
        ));
1009
    }
1010
    let col = match &columns[0].column.expr {
2✔
1011
        Expr::Identifier(ident) => ident.value.clone(),
2✔
1012
        Expr::CompoundIdentifier(parts) => {
×
1013
            parts.last().map(|p| p.value.clone()).unwrap_or_default()
×
1014
        }
1015
        other => {
×
1016
            return Err(SQLRiteError::Internal(format!(
×
1017
                "FTS CREATE INDEX has unexpected column expr: {other:?}"
1018
            )));
1019
        }
1020
    };
1021
    Ok((table_name.to_string(), col))
2✔
1022
}
1023

1024
/// Loads (or rebuilds) an HNSW index on database open. Two paths:
///
///   - **rootpage != 0** (Phase 7d.3 default): the graph is persisted
///     as cell-encoded pages. Read every node directly via
///     `load_hnsw_nodes` and reconstruct the index — fast, zero
///     algorithm runs, exact bit-for-bit reproduction of what was saved.
///
///   - **rootpage == 0** (compatibility): no on-disk graph, e.g. for
///     files saved by Phase 7d.2 before persistence landed. Replay the
///     CREATE INDEX SQL through `execute_create_index`, which walks the
///     table's current rows and populates a fresh graph. Slower but
///     correctness-equivalent on the first save with the new code.
fn rebuild_hnsw_index(db: &mut Database, pager: &Pager, row: &IndexCatalogRow) -> Result<()> {
    use crate::sql::db::table::HnswIndexEntry;
    use crate::sql::executor::execute_create_index;
    use crate::sql::hnsw::{DistanceMetric, HnswIndex};
    use sqlparser::ast::Statement;

    // Validate that the catalog SQL parses to a CREATE INDEX before
    // touching any pages. The whole statement is bound (`stmt @ …`) so
    // the compatibility path below can re-execute it as-is.
    let dialect = SQLiteDialect {};
    let mut ast = Parser::parse_sql(&dialect, &row.sql).map_err(SQLRiteError::from)?;
    let Some(stmt @ Statement::CreateIndex(_)) = ast.pop() else {
        return Err(SQLRiteError::Internal(format!(
            "sqlrite_master HNSW row's SQL isn't a CREATE INDEX: {}",
            row.sql
        )));
    };

    if row.rootpage == 0 {
        // Compatibility path — no persisted graph; walk current rows.
        execute_create_index(&stmt, db)?;
        return Ok(());
    }

    // Persistence path — read the cell tree, deserialize.
    // NOTE(review): DistanceMetric::L2 and the 0xC0FFEE seed are assumed
    // to match the values used when the graph was built (see
    // rebuild_dirty_hnsw_indexes) — confirm against HnswIndex::new call
    // sites if either ever becomes configurable.
    let nodes = load_hnsw_nodes(pager, row.rootpage)?;
    let index = HnswIndex::from_persisted_nodes(DistanceMetric::L2, 0xC0FFEE, nodes);

    // Parse the CREATE INDEX to know which table + column to attach to
    // — same shape as the row-walk path; we just don't execute it.
    let (tbl_name, col_name) = parse_hnsw_create_index_sql(&row.sql)?;
    let table_mut = db.get_table_mut(tbl_name.clone()).map_err(|_| {
        SQLRiteError::Internal(format!(
            "HNSW index '{}' references unknown table '{tbl_name}'",
            row.name
        ))
    })?;
    // needs_rebuild = false: the loaded graph already reflects the
    // persisted row state.
    table_mut.hnsw_indexes.push(HnswIndexEntry {
        name: row.name.clone(),
        column_name: col_name,
        index,
        needs_rebuild: false,
    });
    Ok(())
}
1078

1079
/// Phase 7d.3 — Phase-7d.3-side helper: walk every leaf in the HNSW
1080
/// page tree at `root_page` and decode each cell as a node. Returns
1081
/// the (node_id, layers) tuples in slot-order (already ascending by
1082
/// node_id since they were staged that way). The caller hands them to
1083
/// `HnswIndex::from_persisted_nodes`.
1084
fn load_hnsw_nodes(pager: &Pager, root_page: u32) -> Result<Vec<(i64, Vec<Vec<i64>>)>> {
1✔
1085
    use crate::sql::pager::hnsw_cell::HnswNodeCell;
1086

1087
    let mut nodes: Vec<(i64, Vec<Vec<i64>>)> = Vec::new();
1✔
1088
    let first_leaf = find_leftmost_leaf(pager, root_page)?;
2✔
1089
    let mut current = first_leaf;
1✔
1090
    while current != 0 {
1✔
1091
        let page_buf = pager
1✔
1092
            .read_page(current)
1✔
1093
            .ok_or_else(|| SQLRiteError::Internal(format!("missing HNSW leaf page {current}")))?;
1✔
1094
        if page_buf[0] != PageType::TableLeaf as u8 {
1✔
1095
            return Err(SQLRiteError::Internal(format!(
×
1096
                "page {current} tagged {} but expected TableLeaf (HNSW)",
1097
                page_buf[0]
×
1098
            )));
1099
        }
1100
        let next_leaf = u32::from_le_bytes(page_buf[1..5].try_into().unwrap());
2✔
1101
        let payload: &[u8; PAYLOAD_PER_PAGE] = (&page_buf[PAGE_HEADER_SIZE..])
2✔
1102
            .try_into()
1✔
1103
            .map_err(|_| SQLRiteError::Internal("HNSW leaf payload size".to_string()))?;
1✔
1104
        let leaf = TablePage::from_bytes(payload);
1✔
1105
        for slot in 0..leaf.slot_count() {
3✔
1106
            let offset = leaf.slot_offset_raw(slot)?;
2✔
1107
            let (cell, _) = HnswNodeCell::decode(leaf.as_bytes(), offset)?;
1✔
1108
            nodes.push((cell.node_id, cell.layers));
1✔
1109
        }
1110
        current = next_leaf;
1✔
1111
    }
1112
    Ok(nodes)
1✔
1113
}
1114

1115
/// Pulls (table_name, column_name) out of a `CREATE INDEX … USING hnsw (col)`
1116
/// SQL string. Used by the persistence path on open to know where to
1117
/// attach the loaded graph. Same shape as `parse_create_index_sql` for
1118
/// regular indexes — only the assertion differs (we don't care about
1119
/// UNIQUE for HNSW).
1120
fn parse_hnsw_create_index_sql(sql: &str) -> Result<(String, String)> {
1✔
1121
    use sqlparser::ast::{CreateIndex, Expr, Statement};
1122

1123
    let dialect = SQLiteDialect {};
1✔
1124
    let mut ast = Parser::parse_sql(&dialect, sql).map_err(SQLRiteError::from)?;
1✔
1125
    let Some(Statement::CreateIndex(CreateIndex {
2✔
1126
        table_name,
1✔
1127
        columns,
1✔
1128
        ..
1129
    })) = ast.pop()
3✔
1130
    else {
1131
        return Err(SQLRiteError::Internal(format!(
×
1132
            "sqlrite_master HNSW row's SQL isn't a CREATE INDEX: {sql}"
1133
        )));
1134
    };
1135
    if columns.len() != 1 {
2✔
1136
        return Err(SQLRiteError::NotImplemented(
×
1137
            "multi-column HNSW indexes aren't supported yet".to_string(),
×
1138
        ));
1139
    }
1140
    let col = match &columns[0].column.expr {
2✔
1141
        Expr::Identifier(ident) => ident.value.clone(),
2✔
1142
        Expr::CompoundIdentifier(parts) => {
×
1143
            parts.last().map(|p| p.value.clone()).unwrap_or_default()
×
1144
        }
1145
        other => {
×
1146
            return Err(SQLRiteError::Internal(format!(
×
1147
                "unsupported HNSW indexed column expression: {other:?}"
1148
            )));
1149
        }
1150
    };
1151
    Ok((table_name.to_string(), col))
2✔
1152
}
1153

1154
/// Phase 7d.3 — rebuilds in-place any HnswIndexEntry whose
1155
/// `needs_rebuild` flag is set (DELETE / UPDATE-on-vector marked it).
1156
/// Walks the table's current Vec<f32> column storage and runs the
1157
/// HNSW algorithm fresh. Called at the top of `save_database` before
1158
/// any immutable borrows of `db` start.
1159
///
1160
/// Cost: O(N · ef_construction · log N) per dirty index. Fine for
1161
/// small tables, expensive for ≥100k-row tables — matches the
1162
/// trade-off SQLite makes for FTS5: dirtying-and-rebuilding is the
1163
/// MVP, more sophisticated incremental delete strategies (soft-delete
1164
/// + tombstones, neighbor reconnection) are future polish.
1165
fn rebuild_dirty_hnsw_indexes(db: &mut Database) {
2✔
1166
    use crate::sql::hnsw::{DistanceMetric, HnswIndex};
1167

1168
    for table in db.tables.values_mut() {
5✔
1169
        // Snapshot which (index_name, column) pairs need rebuilding,
1170
        // before we go grabbing column data — keeps the borrow
1171
        // structure simple.
1172
        let dirty: Vec<(String, String)> = table
4✔
1173
            .hnsw_indexes
1174
            .iter()
1175
            .filter(|e| e.needs_rebuild)
4✔
1176
            .map(|e| (e.name.clone(), e.column_name.clone()))
4✔
1177
            .collect();
1178
        if dirty.is_empty() {
4✔
1179
            continue;
1180
        }
1181

1182
        for (idx_name, col_name) in dirty {
3✔
1183
            // Snapshot every (rowid, vec) for this column.
1184
            let mut vectors: Vec<(i64, Vec<f32>)> = Vec::new();
1✔
1185
            {
1186
                let row_data = table.rows.lock().expect("rows mutex poisoned");
2✔
1187
                if let Some(Row::Vector(map)) = row_data.get(&col_name) {
3✔
1188
                    for (id, v) in map.iter() {
1✔
1189
                        vectors.push((*id, v.clone()));
1✔
1190
                    }
1191
                }
1192
            }
1193
            // Pre-build a HashMap for the get_vec closure so we don't
1194
            // pay O(N) lookup per insert call.
1195
            let snapshot: std::collections::HashMap<i64, Vec<f32>> =
1✔
1196
                vectors.iter().cloned().collect();
1197

1198
            let mut new_idx = HnswIndex::new(DistanceMetric::L2, 0xC0FFEE);
2✔
1199
            // Sort by id so the rebuild is deterministic across runs.
1200
            vectors.sort_by_key(|(id, _)| *id);
4✔
1201
            for (id, v) in &vectors {
1✔
1202
                new_idx.insert(*id, v, |q| snapshot.get(&q).cloned().unwrap_or_default());
4✔
1203
            }
1204

1205
            // Replace the entry's index + clear the dirty flag.
1206
            if let Some(entry) = table.hnsw_indexes.iter_mut().find(|e| e.name == idx_name) {
4✔
1207
                entry.index = new_idx;
1✔
1208
                entry.needs_rebuild = false;
1✔
1209
            }
1210
        }
1211
    }
1212
}
1213

1214
/// Phase 8b — rebuild every FTS index a DELETE / UPDATE-on-text-col
1215
/// marked dirty. Mirrors [`rebuild_dirty_hnsw_indexes`]; runs at save
1216
/// time under `&mut Database`. Cheap on a clean DB (the `dirty` snapshot
1217
/// is empty so the per-table loop short-circuits).
1218
fn rebuild_dirty_fts_indexes(db: &mut Database) {
2✔
1219
    use crate::sql::fts::PostingList;
1220

1221
    for table in db.tables.values_mut() {
5✔
1222
        let dirty: Vec<(String, String)> = table
4✔
1223
            .fts_indexes
1224
            .iter()
1225
            .filter(|e| e.needs_rebuild)
4✔
1226
            .map(|e| (e.name.clone(), e.column_name.clone()))
4✔
1227
            .collect();
1228
        if dirty.is_empty() {
4✔
1229
            continue;
1230
        }
1231

1232
        for (idx_name, col_name) in dirty {
3✔
1233
            // Snapshot every (rowid, text) pair for this column under
1234
            // the row mutex, then drop the lock before re-tokenizing.
1235
            let mut docs: Vec<(i64, String)> = Vec::new();
1✔
1236
            {
1237
                let row_data = table.rows.lock().expect("rows mutex poisoned");
2✔
1238
                if let Some(Row::Text(map)) = row_data.get(&col_name) {
3✔
1239
                    for (id, v) in map.iter() {
1✔
1240
                        // "Null" sentinel is the parser's
1241
                        // null-marker for TEXT cells; skip those —
1242
                        // they'd round-trip as the literal string
1243
                        // "Null" otherwise. Aligns with insert_row's
1244
                        // typed_value gate.
1245
                        if v != "Null" {
1✔
1246
                            docs.push((*id, v.clone()));
1✔
1247
                        }
1248
                    }
1249
                }
1250
            }
1251

1252
            let mut new_idx = PostingList::new();
1✔
1253
            // Sort by id so the rebuild is deterministic across runs
1254
            // (the BTreeMap inside PostingList is order-stable, but
1255
            // doc-length aggregation order doesn't matter — sorting
1256
            // here is purely for reproducibility on inspection).
1257
            docs.sort_by_key(|(id, _)| *id);
4✔
1258
            for (id, text) in &docs {
1✔
1259
                new_idx.insert(*id, text);
2✔
1260
            }
1261

1262
            if let Some(entry) = table.fts_indexes.iter_mut().find(|e| e.name == idx_name) {
4✔
1263
                entry.index = new_idx;
1✔
1264
                entry.needs_rebuild = false;
1✔
1265
            }
1266
        }
1267
    }
1268
}
1269

1270
/// Cheap clone helper — `DataType` doesn't derive `Clone` elsewhere.
1271
fn clone_datatype(dt: &DataType) -> DataType {
2✔
1272
    match dt {
2✔
1273
        DataType::Integer => DataType::Integer,
2✔
1274
        DataType::Text => DataType::Text,
1✔
1275
        DataType::Real => DataType::Real,
×
1276
        DataType::Bool => DataType::Bool,
×
1277
        DataType::Vector(dim) => DataType::Vector(*dim),
×
1278
        DataType::Json => DataType::Json,
×
1279
        DataType::None => DataType::None,
×
1280
        DataType::Invalid => DataType::Invalid,
×
1281
    }
1282
}
1283

1284
/// Stages an index's B-Tree at `start_page`. Each leaf cell is a
1285
/// `KIND_INDEX` entry carrying `(original_rowid, value)`. Returns
1286
/// `(root_page, next_free_page)`.
1287
///
1288
/// The tree's shape matches a regular table's — leaves chained via
1289
/// `next_page`, optional interior layer above. `Cell::peek_rowid` works
1290
/// uniformly for index cells (same prefix as local cells), so the
1291
/// existing slot directory and binary search carry over.
1292
fn stage_index_btree(
2✔
1293
    pager: &mut Pager,
1294
    idx: &SecondaryIndex,
1295
    alloc: &mut crate::sql::pager::allocator::PageAllocator,
1296
) -> Result<u32> {
1297
    // Build the leaves.
1298
    let leaves = stage_index_leaves(pager, idx, alloc)?;
2✔
1299
    if leaves.len() == 1 {
4✔
1300
        return Ok(leaves[0].0);
4✔
1301
    }
1302
    let mut level: Vec<(u32, i64)> = leaves;
1✔
1303
    while level.len() > 1 {
4✔
1304
        level = stage_interior_level(pager, &level, alloc)?;
2✔
1305
    }
1306
    Ok(level[0].0)
2✔
1307
}
1308

1309
/// Packs the index's (value, rowid) entries into a sibling-chained run
1310
/// of `TableLeaf` pages. Iteration order matches `SecondaryIndex::iter_entries`
1311
/// (ascending value; rowids in insertion order within a value), which is
1312
/// also ascending by the "cell rowid" carried in each IndexCell (the
1313
/// original row's rowid) — so Cell::peek_rowid + the slot directory's
1314
/// rowid ordering stays consistent.
1315
fn stage_index_leaves(
2✔
1316
    pager: &mut Pager,
1317
    idx: &SecondaryIndex,
1318
    alloc: &mut crate::sql::pager::allocator::PageAllocator,
1319
) -> Result<Vec<(u32, i64)>> {
1320
    let mut leaves: Vec<(u32, i64)> = Vec::new();
2✔
1321
    let mut current_leaf = TablePage::empty();
4✔
1322
    let mut current_leaf_page = alloc.allocate();
4✔
1323
    let mut current_max_rowid: Option<i64> = None;
2✔
1324

1325
    // Sort the entries by original rowid so the in-page slot directory,
1326
    // which binary-searches by rowid, stays valid. (iter_entries orders by
1327
    // value; we reorder here for B-Tree correctness.)
1328
    let mut entries: Vec<(Value, i64)> = idx.iter_entries().collect();
2✔
1329
    entries.sort_by_key(|(_, r)| *r);
6✔
1330

1331
    for (value, rowid) in entries {
4✔
1332
        let cell = IndexCell::new(rowid, value);
2✔
1333
        let entry_bytes = cell.encode()?;
4✔
1334

1335
        if !current_leaf.would_fit(entry_bytes.len()) {
4✔
1336
            let next_leaf_page_num = alloc.allocate();
2✔
1337
            emit_leaf(pager, current_leaf_page, &current_leaf, next_leaf_page_num);
1✔
1338
            leaves.push((current_leaf_page, current_max_rowid.unwrap_or(i64::MIN)));
1✔
1339
            current_leaf = TablePage::empty();
1✔
1340
            current_leaf_page = next_leaf_page_num;
1✔
1341

1342
            if !current_leaf.would_fit(entry_bytes.len()) {
1✔
1343
                return Err(SQLRiteError::Internal(format!(
×
1344
                    "index entry of {} bytes exceeds empty-page capacity {}",
1345
                    entry_bytes.len(),
×
1346
                    current_leaf.free_space()
×
1347
                )));
1348
            }
1349
        }
1350
        current_leaf.insert_entry(rowid, &entry_bytes)?;
4✔
1351
        current_max_rowid = Some(rowid);
2✔
1352
    }
1353

1354
    emit_leaf(pager, current_leaf_page, &current_leaf, 0);
2✔
1355
    leaves.push((current_leaf_page, current_max_rowid.unwrap_or(i64::MIN)));
2✔
1356
    Ok(leaves)
2✔
1357
}
1358

1359
/// Phase 7d.3 — stages an HNSW index's page tree at `start_page`.
1360
/// Each leaf cell is a `KIND_HNSW` entry carrying one node's
1361
/// (node_id, layers). Returns `(root_page, next_free_page)`.
1362
///
1363
/// Tree shape is identical to `stage_index_btree` — chained leaves +
1364
/// optional interior layers. The slot directory binary-searches by
1365
/// node_id (which is the cell's "rowid" in `Cell::peek_rowid` terms),
1366
/// so reads can locate any node in O(log N) once 7d.4-or-later
1367
/// optimizes the load path to lazy-fetch instead of read-all.
1368
/// Today, `load_hnsw_nodes` reads the entire tree on open.
1369
fn stage_hnsw_btree(
1✔
1370
    pager: &mut Pager,
1371
    idx: &crate::sql::hnsw::HnswIndex,
1372
    alloc: &mut crate::sql::pager::allocator::PageAllocator,
1373
) -> Result<u32> {
1374
    let leaves = stage_hnsw_leaves(pager, idx, alloc)?;
1✔
1375
    if leaves.len() == 1 {
2✔
1376
        return Ok(leaves[0].0);
2✔
1377
    }
1378
    let mut level: Vec<(u32, i64)> = leaves;
×
1379
    while level.len() > 1 {
×
1380
        level = stage_interior_level(pager, &level, alloc)?;
×
1381
    }
1382
    Ok(level[0].0)
×
1383
}
1384

1385
/// Phase 8c — stage one FTS index as a `TableLeaf`-shaped B-Tree.
1386
/// Mirrors `stage_hnsw_btree` (sibling-chained leaves, optional interior
1387
/// levels). Returns `(root_page, next_free_page)`. Each leaf is filled
1388
/// with `KIND_FTS_POSTING` cells: one sidecar cell holding the
1389
/// doc-lengths map, then one cell per term in lexicographic order.
1390
fn stage_fts_btree(
1✔
1391
    pager: &mut Pager,
1392
    idx: &crate::sql::fts::PostingList,
1393
    alloc: &mut crate::sql::pager::allocator::PageAllocator,
1394
) -> Result<u32> {
1395
    let leaves = stage_fts_leaves(pager, idx, alloc)?;
1✔
1396
    if leaves.len() == 1 {
2✔
1397
        return Ok(leaves[0].0);
2✔
1398
    }
1399
    let mut level: Vec<(u32, i64)> = leaves;
1✔
1400
    while level.len() > 1 {
4✔
1401
        level = stage_interior_level(pager, &level, alloc)?;
2✔
1402
    }
1403
    Ok(level[0].0)
2✔
1404
}
1405

1406
/// Packs FTS posting cells into a sibling-chained run of `TableLeaf`
/// pages. Cell layout: a single doc-lengths sidecar at `cell_id = 1`,
/// followed by one cell per term in lexicographic order with
/// `cell_id = 2..=N + 1`. Sequential ids keep the slot directory's
/// rowid ordering valid (the `cell_id` field is what `peek_rowid`
/// returns).
///
/// Returns each emitted leaf's `(page_number, max_cell_id)` for the
/// interior-building pass above. All page numbers come from `alloc`.
fn stage_fts_leaves(
    pager: &mut Pager,
    idx: &crate::sql::fts::PostingList,
    alloc: &mut crate::sql::pager::allocator::PageAllocator,
) -> Result<Vec<(u32, i64)>> {
    use crate::sql::pager::fts_cell::FtsPostingCell;

    let mut leaves: Vec<(u32, i64)> = Vec::new();
    let mut current_leaf = TablePage::empty();
    let mut current_leaf_page = alloc.allocate();
    let mut current_max_rowid: Option<i64> = None;

    // Build the cell sequence: sidecar first, then per-term cells. The
    // sidecar always exists (even on an empty index) so reload sees a
    // canonical "this index was persisted" marker in slot 0.
    let mut cell_id: i64 = 1;
    let mut cells: Vec<FtsPostingCell> = Vec::new();
    cells.push(FtsPostingCell::doc_lengths(
        cell_id,
        idx.serialize_doc_lengths(),
    ));
    for (term, entries) in idx.serialize_postings() {
        cell_id += 1;
        cells.push(FtsPostingCell::posting(cell_id, term, entries));
    }

    for cell in cells {
        let entry_bytes = cell.encode()?;

        // Leaf full: seal it (its sibling pointer targets the page the
        // allocator hands out next) and start a fresh leaf there.
        if !current_leaf.would_fit(entry_bytes.len()) {
            let next_leaf_page_num = alloc.allocate();
            emit_leaf(pager, current_leaf_page, &current_leaf, next_leaf_page_num);
            leaves.push((current_leaf_page, current_max_rowid.unwrap_or(i64::MIN)));
            current_leaf = TablePage::empty();
            current_leaf_page = next_leaf_page_num;

            if !current_leaf.would_fit(entry_bytes.len()) {
                // A single posting cell exceeds page capacity. Phase
                // 8c MVP doesn't chain via overflow cells (the plan
                // notes this as a stretch goal); surface a clear
                // error so users know which term tripped it.
                return Err(SQLRiteError::Internal(format!(
                    "FTS posting cell {} of {} bytes exceeds empty-page capacity {} \
                     (term too long or too many postings; overflow chaining is Phase 8.1)",
                    cell.cell_id,
                    entry_bytes.len(),
                    current_leaf.free_space()
                )));
            }
        }
        current_leaf.insert_entry(cell.cell_id, &entry_bytes)?;
        current_max_rowid = Some(cell.cell_id);
    }

    // Final leaf: sibling next_page = 0 terminates the chain.
    emit_leaf(pager, current_leaf_page, &current_leaf, 0);
    leaves.push((current_leaf_page, current_max_rowid.unwrap_or(i64::MIN)));
    Ok(leaves)
}
1470

1471
/// (rowid, value) pairs as decoded from a single FTS cell. The `u32`
/// value is a term frequency for a posting cell, or a document length
/// for the doc-lengths sidecar cell.
type FtsEntries = Vec<(i64, u32)>;
/// (term, posting list) pairs as decoded from non-sidecar FTS cells.
type FtsPostings = Vec<(String, FtsEntries)>;
1476

1477
/// Phase 8c — read every cell of an FTS index from `root_page` back
/// into the `(doc_lengths, postings)` shape `PostingList::from_persisted_postings`
/// expects. Mirrors `load_hnsw_nodes`: leftmost-leaf descent, walk the
/// sibling chain, decode each slot.
///
/// Errors on a missing or mis-tagged page, on duplicate doc-lengths
/// sidecar cells, and on a tree with no sidecar at all (which would
/// mean the persisted index is corrupt or truncated).
fn load_fts_postings(pager: &Pager, root_page: u32) -> Result<(FtsEntries, FtsPostings)> {
    use crate::sql::pager::fts_cell::FtsPostingCell;

    let mut doc_lengths: Vec<(i64, u32)> = Vec::new();
    let mut postings: Vec<(String, Vec<(i64, u32)>)> = Vec::new();
    // Exactly one doc-lengths sidecar must exist; track that invariant.
    let mut saw_sidecar = false;

    let first_leaf = find_leftmost_leaf(pager, root_page)?;
    let mut current = first_leaf;
    // next_leaf == 0 marks the end of the sibling chain.
    while current != 0 {
        let page_buf = pager
            .read_page(current)
            .ok_or_else(|| SQLRiteError::Internal(format!("missing FTS leaf page {current}")))?;
        if page_buf[0] != PageType::TableLeaf as u8 {
            return Err(SQLRiteError::Internal(format!(
                "page {current} tagged {} but expected TableLeaf (FTS)",
                page_buf[0]
            )));
        }
        // Per-page header: byte 0 = type tag, bytes 1..5 = next sibling.
        let next_leaf = u32::from_le_bytes(page_buf[1..5].try_into().unwrap());
        let payload: &[u8; PAYLOAD_PER_PAGE] = (&page_buf[PAGE_HEADER_SIZE..])
            .try_into()
            .map_err(|_| SQLRiteError::Internal("FTS leaf payload size".to_string()))?;
        let leaf = TablePage::from_bytes(payload);
        for slot in 0..leaf.slot_count() {
            let offset = leaf.slot_offset_raw(slot)?;
            let (cell, _) = FtsPostingCell::decode(leaf.as_bytes(), offset)?;
            if cell.is_doc_lengths() {
                if saw_sidecar {
                    return Err(SQLRiteError::Internal(
                        "FTS index has more than one doc-lengths sidecar cell".to_string(),
                    ));
                }
                saw_sidecar = true;
                doc_lengths = cell.entries;
            } else {
                postings.push((cell.term, cell.entries));
            }
        }
        current = next_leaf;
    }

    if !saw_sidecar {
        return Err(SQLRiteError::Internal(
            "FTS index missing doc-lengths sidecar cell — corrupt or truncated tree".to_string(),
        ));
    }
    Ok((doc_lengths, postings))
}
1530

1531
/// Packs HNSW nodes into a sibling-chained run of `TableLeaf` pages.
/// `serialize_nodes` already returns nodes in ascending node_id order,
/// so the slot directory's rowid ordering stays valid.
///
/// Returns each emitted leaf's `(page_number, max_node_id)` for the
/// interior-building pass above. All page numbers come from `alloc`.
fn stage_hnsw_leaves(
    pager: &mut Pager,
    idx: &crate::sql::hnsw::HnswIndex,
    alloc: &mut crate::sql::pager::allocator::PageAllocator,
) -> Result<Vec<(u32, i64)>> {
    use crate::sql::pager::hnsw_cell::HnswNodeCell;

    let mut leaves: Vec<(u32, i64)> = Vec::new();
    let mut current_leaf = TablePage::empty();
    let mut current_leaf_page = alloc.allocate();
    let mut current_max_rowid: Option<i64> = None;

    let serialized = idx.serialize_nodes();

    // Empty index → emit a single empty leaf page so the rootpage
    // pointer in sqlrite_master stays nonzero (== "graph is persisted,
    // it just happens to be empty"). load_hnsw_nodes is fine with an
    // empty leaf — slot_count() returns 0.
    for (node_id, layers) in serialized {
        let cell = HnswNodeCell::new(node_id, layers);
        let entry_bytes = cell.encode()?;

        // Leaf full: seal it with a sibling pointer to the next
        // allocated page and continue packing there.
        if !current_leaf.would_fit(entry_bytes.len()) {
            let next_leaf_page_num = alloc.allocate();
            emit_leaf(pager, current_leaf_page, &current_leaf, next_leaf_page_num);
            leaves.push((current_leaf_page, current_max_rowid.unwrap_or(i64::MIN)));
            current_leaf = TablePage::empty();
            current_leaf_page = next_leaf_page_num;

            if !current_leaf.would_fit(entry_bytes.len()) {
                // Even an empty page can't hold this node — no overflow
                // chaining for HNSW cells, so fail loudly.
                return Err(SQLRiteError::Internal(format!(
                    "HNSW node {node_id} cell of {} bytes exceeds empty-page capacity {}",
                    entry_bytes.len(),
                    current_leaf.free_space()
                )));
            }
        }
        current_leaf.insert_entry(node_id, &entry_bytes)?;
        current_max_rowid = Some(node_id);
    }

    // Final leaf: sibling next_page = 0 terminates the chain.
    emit_leaf(pager, current_leaf_page, &current_leaf, 0);
    leaves.push((current_leaf_page, current_max_rowid.unwrap_or(i64::MIN)));
    Ok(leaves)
}
1579

1580
/// Reads every row reachable from `root_page` back into `table`.
/// Descends to the leftmost leaf, then walks the leaf sibling chain;
/// each slot holds either a local cell or an `OverflowRef` whose full
/// body is reassembled from its overflow chain before decoding.
fn load_table_rows(pager: &Pager, table: &mut Table, root_page: u32) -> Result<()> {
    let first_leaf = find_leftmost_leaf(pager, root_page)?;
    let mut current = first_leaf;
    // next_leaf == 0 marks the end of the sibling chain.
    while current != 0 {
        let page_buf = pager
            .read_page(current)
            .ok_or_else(|| SQLRiteError::Internal(format!("missing leaf page {current}")))?;
        if page_buf[0] != PageType::TableLeaf as u8 {
            return Err(SQLRiteError::Internal(format!(
                "page {current} tagged {} but expected TableLeaf",
                page_buf[0]
            )));
        }
        // Per-page header: byte 0 = type tag, bytes 1..5 = next sibling.
        let next_leaf = u32::from_le_bytes(page_buf[1..5].try_into().unwrap());
        let payload: &[u8; PAYLOAD_PER_PAGE] = (&page_buf[PAGE_HEADER_SIZE..])
            .try_into()
            .map_err(|_| SQLRiteError::Internal("leaf payload slice size".to_string()))?;
        let leaf = TablePage::from_bytes(payload);

        for slot in 0..leaf.slot_count() {
            let entry = leaf.entry_at(slot)?;
            let cell = match entry {
                PagedEntry::Local(c) => c,
                PagedEntry::Overflow(r) => {
                    // Spilled cell: stitch the body back together from
                    // the overflow chain, then decode it like a local.
                    let body_bytes =
                        read_overflow_chain(pager, r.first_overflow_page, r.total_body_len)?;
                    let (c, _) = Cell::decode(&body_bytes, 0)?;
                    c
                }
            };
            table.restore_row(cell.rowid, cell.values)?;
        }
        current = next_leaf;
    }
    Ok(())
}
1616

1617
/// Walks every page reachable from `root_page` and returns their page
/// numbers. Includes `root_page`, every interior page, every leaf, and
/// — when `follow_overflow` is true — every overflow page chained off
/// table-leaf cells. Used by `save_database` to seed each table's
/// per-table preferred pool and to compute the newly-freed set.
///
/// `follow_overflow = true` for table B-Trees (cells may carry
/// `OverflowRef`s pointing at chained overflow pages); `false` for
/// secondary-index, HNSW, and FTS B-Trees, which never overflow in the
/// current encoding.
fn collect_pages_for_btree(
    pager: &Pager,
    root_page: u32,
    follow_overflow: bool,
) -> Result<Vec<u32>> {
    // rootpage 0 means "nothing persisted" — no pages to collect.
    if root_page == 0 {
        return Ok(Vec::new());
    }
    let mut pages: Vec<u32> = Vec::new();
    // Iterative DFS over the tree — no recursion, no depth limit.
    let mut stack: Vec<u32> = vec![root_page];

    while let Some(p) = stack.pop() {
        let buf = pager.read_page(p).ok_or_else(|| {
            SQLRiteError::Internal(format!(
                "collect_pages: missing page {p} (rooted at {root_page})"
            ))
        })?;
        pages.push(p);
        match buf[0] {
            t if t == PageType::InteriorNode as u8 => {
                let payload: &[u8; PAYLOAD_PER_PAGE] =
                    (&buf[PAGE_HEADER_SIZE..]).try_into().map_err(|_| {
                        SQLRiteError::Internal("interior payload slice size".to_string())
                    })?;
                let interior = InteriorPage::from_bytes(payload);
                // Push every divider's child + the rightmost child.
                for slot in 0..interior.slot_count() {
                    let cell = interior.cell_at(slot)?;
                    stack.push(cell.child_page);
                }
                stack.push(interior.rightmost_child());
            }
            t if t == PageType::TableLeaf as u8 => {
                if follow_overflow {
                    let payload: &[u8; PAYLOAD_PER_PAGE] =
                        (&buf[PAGE_HEADER_SIZE..]).try_into().map_err(|_| {
                            SQLRiteError::Internal("leaf payload slice size".to_string())
                        })?;
                    let leaf = TablePage::from_bytes(payload);
                    for slot in 0..leaf.slot_count() {
                        match leaf.entry_at(slot)? {
                            PagedEntry::Local(_) => {}
                            PagedEntry::Overflow(r) => {
                                // Follow the overflow chain to its end
                                // (next pointer 0), validating each
                                // page's tag along the way.
                                let mut cur = r.first_overflow_page;
                                while cur != 0 {
                                    pages.push(cur);
                                    let ob = pager.read_page(cur).ok_or_else(|| {
                                        SQLRiteError::Internal(format!(
                                            "collect_pages: missing overflow page {cur}"
                                        ))
                                    })?;
                                    if ob[0] != PageType::Overflow as u8 {
                                        return Err(SQLRiteError::Internal(format!(
                                            "collect_pages: page {cur} expected Overflow, got tag {}",
                                            ob[0]
                                        )));
                                    }
                                    cur = u32::from_le_bytes(ob[1..5].try_into().unwrap());
                                }
                            }
                        }
                    }
                }
            }
            other => {
                return Err(SQLRiteError::Internal(format!(
                    "collect_pages: unexpected page type {other} at page {p}"
                )));
            }
        }
    }
    Ok(pages)
}
1700

1701
/// Reads the previously-persisted `sqlrite_master` and returns a map from
1702
/// `(kind, name)` to that object's rootpage. Used by `save_database` to
1703
/// seed each table/index's per-table preferred pool with the pages it
1704
/// occupied last time round.
1705
///
1706
/// `kind` is `"table"` or `"index"` (the catalog already disambiguates
1707
/// the three index families via the SQL string, but for page-collection
1708
/// purposes a "table" tree must follow overflow refs while an "index"
1709
/// tree never does — that's the only distinction we need here).
1710
fn read_old_rootpages(pager: &Pager, schema_root: u32) -> Result<HashMap<(String, String), u32>> {
2✔
1711
    let mut out: HashMap<(String, String), u32> = HashMap::new();
2✔
1712
    if schema_root == 0 {
2✔
1713
        return Ok(out);
×
1714
    }
1715
    let mut master = build_empty_master_table();
2✔
1716
    load_table_rows(pager, &mut master, schema_root)?;
4✔
1717
    for rowid in master.rowids() {
6✔
1718
        let kind = take_text(&master, "type", rowid)?;
4✔
1719
        let name = take_text(&master, "name", rowid)?;
4✔
1720
        let rootpage = take_integer(&master, "rootpage", rowid)? as u32;
4✔
1721
        out.insert((kind, name), rootpage);
2✔
1722
    }
1723
    Ok(out)
2✔
1724
}
1725

1726
/// Descends from `root_page` through `InteriorNode` pages, always taking
1727
/// the leftmost child, until a `TableLeaf` is reached. Returns that leaf's
1728
/// page number. A root that's already a leaf is returned as-is.
1729
fn find_leftmost_leaf(pager: &Pager, root_page: u32) -> Result<u32> {
2✔
1730
    let mut current = root_page;
2✔
1731
    loop {
1732
        let page_buf = pager.read_page(current).ok_or_else(|| {
2✔
1733
            SQLRiteError::Internal(format!("missing page {current} during tree descent"))
×
1734
        })?;
1735
        match page_buf[0] {
1736
            t if t == PageType::TableLeaf as u8 => return Ok(current),
4✔
1737
            t if t == PageType::InteriorNode as u8 => {
2✔
1738
                let payload: &[u8; PAYLOAD_PER_PAGE] =
1✔
1739
                    (&page_buf[PAGE_HEADER_SIZE..]).try_into().map_err(|_| {
1740
                        SQLRiteError::Internal("interior payload slice size".to_string())
×
1741
                    })?;
1742
                let interior = InteriorPage::from_bytes(payload);
1✔
1743
                current = interior.leftmost_child()?;
2✔
1744
            }
1745
            other => {
×
1746
                return Err(SQLRiteError::Internal(format!(
×
1747
                    "unexpected page type {other} during tree descent at page {current}"
1748
                )));
1749
            }
1750
        }
1751
    }
1752
}
1753

1754
/// Stages a table's B-Tree, drawing every page number from `alloc`.
1755
/// Returns the root page (the topmost interior page, or the single leaf
1756
/// when the table fits in one page).
1757
///
1758
/// Builds bottom-up: pack rows into `TableLeaf` pages chained via
1759
/// `next_page`, then if more than one leaf, recursively wrap them in
1760
/// `InteriorNode` levels until one root remains.
1761
///
1762
/// Deterministic: same rows + same allocator handouts → byte-identical
1763
/// pages at the same numbers, so the diff pager skips unchanged tables.
1764
fn stage_table_btree(
2✔
1765
    pager: &mut Pager,
1766
    table: &Table,
1767
    alloc: &mut crate::sql::pager::allocator::PageAllocator,
1768
) -> Result<u32> {
1769
    let leaves = stage_leaves(pager, table, alloc)?;
2✔
1770
    if leaves.len() == 1 {
4✔
1771
        return Ok(leaves[0].0);
4✔
1772
    }
1773
    let mut level: Vec<(u32, i64)> = leaves;
1✔
1774
    while level.len() > 1 {
4✔
1775
        level = stage_interior_level(pager, &level, alloc)?;
2✔
1776
    }
1777
    Ok(level[0].0)
2✔
1778
}
1779

1780
/// Packs the table's rows into a sibling-linked chain of `TableLeaf` pages.
/// Returns each leaf's `(page_number, max_rowid)` for use by the next
/// interior level. Allocates leaf and overflow pages from `alloc`.
///
/// An empty table still emits one empty leaf, so the table always has a
/// nonzero rootpage; its max_rowid defaults to `i64::MIN`.
fn stage_leaves(
    pager: &mut Pager,
    table: &Table,
    alloc: &mut crate::sql::pager::allocator::PageAllocator,
) -> Result<Vec<(u32, i64)>> {
    let mut leaves: Vec<(u32, i64)> = Vec::new();
    let mut current_leaf = TablePage::empty();
    let mut current_leaf_page = alloc.allocate();
    let mut current_max_rowid: Option<i64> = None;

    for rowid in table.rowids() {
        // Either the inline cell bytes or an OverflowRef if the row spilled.
        let entry_bytes = build_row_entry(pager, table, rowid, alloc)?;

        if !current_leaf.would_fit(entry_bytes.len()) {
            // The new leaf goes at whatever the allocator hands out
            // next. Commit the current leaf with that as its sibling
            // pointer.
            let next_leaf_page_num = alloc.allocate();
            emit_leaf(pager, current_leaf_page, &current_leaf, next_leaf_page_num);
            leaves.push((current_leaf_page, current_max_rowid.unwrap_or(i64::MIN)));
            current_leaf = TablePage::empty();
            current_leaf_page = next_leaf_page_num;
            // current_max_rowid is reassigned by the insert below; no need
            // to zero it out here.

            if !current_leaf.would_fit(entry_bytes.len()) {
                // Even an empty page can't hold this entry — should be
                // impossible once build_row_entry has applied the
                // overflow threshold, so treat it as an internal error.
                return Err(SQLRiteError::Internal(format!(
                    "entry of {} bytes exceeds empty-page capacity {}",
                    entry_bytes.len(),
                    current_leaf.free_space()
                )));
            }
        }
        current_leaf.insert_entry(rowid, &entry_bytes)?;
        current_max_rowid = Some(rowid);
    }

    // Final leaf: sibling next_page = 0 (end of chain).
    emit_leaf(pager, current_leaf_page, &current_leaf, 0);
    leaves.push((current_leaf_page, current_max_rowid.unwrap_or(i64::MIN)));
    Ok(leaves)
}
1825

1826
/// Encodes a single row's on-leaf entry — either the local cell bytes, or
1827
/// an `OverflowRef` pointing at a freshly-allocated overflow chain if the
1828
/// encoded cell exceeded the inline threshold. Allocates any overflow
1829
/// pages from `alloc`.
1830
fn build_row_entry(
2✔
1831
    pager: &mut Pager,
1832
    table: &Table,
1833
    rowid: i64,
1834
    alloc: &mut crate::sql::pager::allocator::PageAllocator,
1835
) -> Result<Vec<u8>> {
1836
    let values = table.extract_row(rowid);
2✔
1837
    let local_cell = Cell::new(rowid, values);
2✔
1838
    let local_bytes = local_cell.encode()?;
4✔
1839
    if local_bytes.len() > OVERFLOW_THRESHOLD {
7✔
1840
        let overflow_start = write_overflow_chain(pager, &local_bytes, alloc)?;
2✔
1841
        Ok(OverflowRef {
2✔
1842
            rowid,
1843
            total_body_len: local_bytes.len() as u64,
1✔
1844
            first_overflow_page: overflow_start,
1845
        }
1846
        .encode())
1✔
1847
    } else {
1848
        Ok(local_bytes)
2✔
1849
    }
1850
}
1851

1852
/// Builds one level of `InteriorNode` pages above the given children.
/// Each interior packs as many dividers as will fit; the last child
/// assigned to an interior becomes its `rightmost_child`. Returns the
/// emitted interior pages as `(page_number, max_rowid_in_subtree)`.
fn stage_interior_level(
    pager: &mut Pager,
    children: &[(u32, i64)],
    alloc: &mut crate::sql::pager::allocator::PageAllocator,
) -> Result<Vec<(u32, i64)>> {
    let mut next_level: Vec<(u32, i64)> = Vec::new();
    // Index of the next child not yet assigned to an interior page.
    let mut idx = 0usize;

    while idx < children.len() {
        let interior_page_num = alloc.allocate();

        // Seed the interior with the first unassigned child as its
        // rightmost. As we add more children, the previous rightmost
        // graduates to being a divider and the new arrival takes over
        // as rightmost.
        let (mut rightmost_child_page, mut rightmost_child_max) = children[idx];
        idx += 1;
        let mut interior = InteriorPage::empty(rightmost_child_page);

        while idx < children.len() {
            // Trial-encode the divider the current rightmost would
            // become; if it doesn't fit, this interior is done and the
            // remaining children start a new one.
            let new_divider_cell = InteriorCell {
                divider_rowid: rightmost_child_max,
                child_page: rightmost_child_page,
            };
            let new_divider_bytes = new_divider_cell.encode();
            if !interior.would_fit(new_divider_bytes.len()) {
                break;
            }
            interior.insert_divider(rightmost_child_max, rightmost_child_page)?;
            let (next_child_page, next_child_max) = children[idx];
            interior.set_rightmost_child(next_child_page);
            rightmost_child_page = next_child_page;
            rightmost_child_max = next_child_max;
            idx += 1;
        }

        emit_interior(pager, interior_page_num, &interior);
        // The interior's subtree max is its rightmost child's max.
        next_level.push((interior_page_num, rightmost_child_max));
    }

    Ok(next_level)
}
1898

1899
/// Wraps a `TablePage` in the 7-byte page header and hands it to the pager.
1900
fn emit_leaf(pager: &mut Pager, page_num: u32, leaf: &TablePage, next_leaf: u32) {
2✔
1901
    let mut buf = [0u8; PAGE_SIZE];
2✔
1902
    buf[0] = PageType::TableLeaf as u8;
2✔
1903
    buf[1..5].copy_from_slice(&next_leaf.to_le_bytes());
2✔
1904
    // For leaf pages the legacy `payload_len` field isn't used — the slot
1905
    // directory self-describes. Zero it by convention.
1906
    buf[5..7].copy_from_slice(&0u16.to_le_bytes());
2✔
1907
    buf[PAGE_HEADER_SIZE..].copy_from_slice(leaf.as_bytes());
2✔
1908
    pager.stage_page(page_num, buf);
2✔
1909
}
1910

1911
/// Wraps an `InteriorPage` in the 7-byte page header. Interior pages
1912
/// don't use `next_page` (there's no sibling chain between interiors);
1913
/// `payload_len` is also unused (the slot directory self-describes).
1914
fn emit_interior(pager: &mut Pager, page_num: u32, interior: &InteriorPage) {
1✔
1915
    let mut buf = [0u8; PAGE_SIZE];
1✔
1916
    buf[0] = PageType::InteriorNode as u8;
1✔
1917
    buf[1..5].copy_from_slice(&0u32.to_le_bytes());
1✔
1918
    buf[5..7].copy_from_slice(&0u16.to_le_bytes());
1✔
1919
    buf[PAGE_HEADER_SIZE..].copy_from_slice(interior.as_bytes());
1✔
1920
    pager.stage_page(page_num, buf);
1✔
1921
}
1922

1923
#[cfg(test)]
1924
mod tests {
1925
    use super::*;
1926
    use crate::sql::pager::freelist::MIN_PAGES_FOR_AUTO_VACUUM;
1927
    use crate::sql::process_command;
1928

1929
    fn seed_db() -> Database {
1✔
1930
        let mut db = Database::new("test".to_string());
1✔
1931
        process_command(
1932
            "CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT NOT NULL UNIQUE, age INTEGER);",
1933
            &mut db,
1934
        )
1935
        .unwrap();
1936
        process_command(
1937
            "INSERT INTO users (name, age) VALUES ('alice', 30);",
1938
            &mut db,
1939
        )
1940
        .unwrap();
1941
        process_command("INSERT INTO users (name, age) VALUES ('bob', 25);", &mut db).unwrap();
1✔
1942
        process_command(
1943
            "CREATE TABLE notes (id INTEGER PRIMARY KEY, body TEXT);",
1944
            &mut db,
1945
        )
1946
        .unwrap();
1947
        process_command("INSERT INTO notes (body) VALUES ('hello');", &mut db).unwrap();
1✔
1948
        db
1✔
1949
    }
1950

1951
    fn tmp_path(name: &str) -> std::path::PathBuf {
1✔
1952
        let mut p = std::env::temp_dir();
1✔
1953
        let pid = std::process::id();
2✔
1954
        let nanos = std::time::SystemTime::now()
2✔
1955
            .duration_since(std::time::UNIX_EPOCH)
1✔
1956
            .map(|d| d.as_nanos())
3✔
1957
            .unwrap_or(0);
1958
        p.push(format!("sqlrite-{pid}-{nanos}-{name}.sqlrite"));
1✔
1959
        p
1✔
1960
    }
1961

1962
    /// Phase 4c: every .sqlrite has a `-wal` sidecar now. Delete both so
1963
    /// `/tmp` doesn't accumulate orphan WALs across test runs.
1964
    fn cleanup(path: &std::path::Path) {
1✔
1965
        let _ = std::fs::remove_file(path);
1✔
1966
        let mut wal = path.as_os_str().to_owned();
1✔
1967
        wal.push("-wal");
1✔
1968
        let _ = std::fs::remove_file(std::path::PathBuf::from(wal));
1✔
1969
    }
1970

1971
    #[test]
    fn round_trip_preserves_schema_and_data() {
        let path = tmp_path("roundtrip");
        let mut db = seed_db();
        save_database(&mut db, &path).expect("save");

        // Reopen from disk and check both user tables came back.
        let loaded = open_database(&path, "test".to_string()).expect("open");
        assert_eq!(loaded.tables.len(), 2);

        let users = loaded.get_table("users".to_string()).expect("users table");
        assert_eq!(users.columns.len(), 3);
        let user_rowids = users.rowids();
        assert_eq!(user_rowids.len(), 2);
        // Gather the `name` column values; rowid order is not guaranteed,
        // so check membership rather than position.
        let mut names: Vec<String> = Vec::new();
        for rowid in &user_rowids {
            if let Some(Value::Text(s)) = users.get_value("name", *rowid) {
                names.push(s);
            }
        }
        for expected in ["alice", "bob"] {
            assert!(names.iter().any(|n| n == expected));
        }

        let notes = loaded.get_table("notes".to_string()).expect("notes table");
        assert_eq!(notes.rowids().len(), 1);

        cleanup(&path);
    }
1999

2000
    // -----------------------------------------------------------------
2001
    // Phase 7a — VECTOR(N) save / reopen round-trip
2002
    // -----------------------------------------------------------------
2003

2004
    #[test]
    fn round_trip_preserves_vector_column() {
        let path = tmp_path("vec_roundtrip");

        // Build, populate, save.
        {
            let mut db = Database::new("test".to_string());
            process_command(
                "CREATE TABLE docs (id INTEGER PRIMARY KEY, embedding VECTOR(3));",
                &mut db,
            )
            .unwrap();
            process_command(
                "INSERT INTO docs (embedding) VALUES ([0.1, 0.2, 0.3]);",
                &mut db,
            )
            .unwrap();
            process_command(
                "INSERT INTO docs (embedding) VALUES ([1.5, -2.0, 3.5]);",
                &mut db,
            )
            .unwrap();
            save_database(&mut db, &path).expect("save");
        } // db drops → its exclusive lock releases before reopen.

        // Reopen and verify schema + data both round-tripped.
        let loaded = open_database(&path, "test".to_string()).expect("open");
        let docs = loaded.get_table("docs".to_string()).expect("docs table");

        // Schema preserved: column is still VECTOR(3).
        let embedding_col = docs
            .columns
            .iter()
            .find(|c| c.column_name == "embedding")
            .expect("embedding column");
        assert!(
            matches!(embedding_col.datatype, DataType::Vector(3)),
            "expected DataType::Vector(3) after round-trip, got {:?}",
            embedding_col.datatype
        );

        // Data preserved: both vectors still readable bit-for-bit.
        let mut rows: Vec<Vec<f32>> = docs
            .rowids()
            .iter()
            .filter_map(|r| match docs.get_value("embedding", *r) {
                Some(Value::Vector(v)) => Some(v),
                _ => None,
            })
            .collect();
        // Sort by first component so the assertions below don't depend
        // on rowid iteration order.
        rows.sort_by(|a, b| a[0].partial_cmp(&b[0]).unwrap());
        assert_eq!(rows.len(), 2);
        assert_eq!(rows[0], vec![0.1f32, 0.2, 0.3]);
        assert_eq!(rows[1], vec![1.5f32, -2.0, 3.5]);

        cleanup(&path);
    }
2061

2062
    #[test]
    fn round_trip_preserves_json_column() {
        // Phase 7e — JSON columns are stored as Text under the hood with
        // INSERT-time validation. Save + reopen should preserve the
        // schema (DataType::Json) and the underlying text bytes; a
        // post-reopen json_extract should still resolve paths correctly.
        let path = tmp_path("json_roundtrip");

        // Inner scope: the writer DB must drop (releasing its file lock)
        // before the reopen below.
        {
            let mut db = Database::new("test".to_string());
            process_command(
                "CREATE TABLE docs (id INTEGER PRIMARY KEY, payload JSON);",
                &mut db,
            )
            .unwrap();
            process_command(
                r#"INSERT INTO docs (payload) VALUES ('{"name": "alice", "tags": ["rust","sql"]}');"#,
                &mut db,
            )
            .unwrap();
            save_database(&mut db, &path).expect("save");
        }

        let mut loaded = open_database(&path, "test".to_string()).expect("open");
        let docs = loaded.get_table("docs".to_string()).expect("docs");

        // Schema: column declared as JSON, restored with the same type.
        let payload_col = docs
            .columns
            .iter()
            .find(|c| c.column_name == "payload")
            .unwrap();
        assert!(
            matches!(payload_col.datatype, DataType::Json),
            "expected DataType::Json, got {:?}",
            payload_col.datatype
        );

        // json_extract works against the reopened data — exercises the
        // full Text-storage + serde_json::from_str path post-reopen.
        let resp = process_command(
            r#"SELECT id FROM docs WHERE json_extract(payload, '$.name') = 'alice';"#,
            &mut loaded,
        )
        .expect("select via json_extract after reopen");
        assert!(resp.contains("1 row returned"), "got: {resp}");

        cleanup(&path);
    }
2111

2112
    #[test]
    fn round_trip_rebuilds_hnsw_index_from_create_sql() {
        // Phase 7d.3: HNSW indexes now persist their graph as cell-encoded
        // pages. After save+reopen the index entry reattaches with the
        // same column + same node count, loaded directly from disk
        // instead of re-walking rows.
        let path = tmp_path("hnsw_roundtrip");

        // Build, populate, index, save.
        {
            let mut db = Database::new("test".to_string());
            process_command(
                "CREATE TABLE docs (id INTEGER PRIMARY KEY, e VECTOR(2));",
                &mut db,
            )
            .unwrap();
            for v in &[
                "[1.0, 0.0]",
                "[2.0, 0.0]",
                "[0.0, 3.0]",
                "[1.0, 4.0]",
                "[10.0, 10.0]",
            ] {
                process_command(&format!("INSERT INTO docs (e) VALUES ({v});"), &mut db).unwrap();
            }
            process_command("CREATE INDEX ix_e ON docs USING hnsw (e);", &mut db).unwrap();
            save_database(&mut db, &path).expect("save");
        } // db drops → exclusive lock releases.

        // Reopen and verify the index reattached, with the same name +
        // column + populated graph.
        let mut loaded = open_database(&path, "test".to_string()).expect("open");
        // Inner scope bounds the immutable `get_table` borrow so `loaded`
        // can be mutably borrowed by process_command afterwards.
        {
            let table = loaded.get_table("docs".to_string()).expect("docs");
            assert_eq!(table.hnsw_indexes.len(), 1, "HNSW index should reattach");
            let entry = &table.hnsw_indexes[0];
            assert_eq!(entry.name, "ix_e");
            assert_eq!(entry.column_name, "e");
            assert_eq!(entry.index.len(), 5, "loaded graph should hold all 5 rows");
            assert!(
                !entry.needs_rebuild,
                "fresh load should not be marked dirty"
            );
        }

        // Quick functional check: KNN query through the loaded index
        // returns results.
        let resp = process_command(
            "SELECT id FROM docs ORDER BY vec_distance_l2(e, [1.0, 0.0]) ASC LIMIT 3;",
            &mut loaded,
        )
        .unwrap();
        assert!(resp.contains("3 rows returned"), "got: {resp}");

        cleanup(&path);
    }
2168

2169
    #[test]
    fn round_trip_rebuilds_fts_index_from_create_sql() {
        // Phase 8c: FTS indexes now persist their posting lists as
        // cell-encoded pages. After save+reopen the index entry
        // reattaches with the same column + same posting count, loaded
        // directly from disk (no re-tokenization).
        let path = tmp_path("fts_roundtrip");

        {
            let mut db = Database::new("test".to_string());
            process_command(
                "CREATE TABLE docs (id INTEGER PRIMARY KEY, body TEXT);",
                &mut db,
            )
            .unwrap();
            for body in &[
                "rust embedded database",
                "rust web framework",
                "go embedded systems",
                "python web framework",
                "rust rust embedded power",
            ] {
                process_command(
                    &format!("INSERT INTO docs (body) VALUES ('{body}');"),
                    &mut db,
                )
                .unwrap();
            }
            process_command("CREATE INDEX ix_body ON docs USING fts (body);", &mut db).unwrap();
            save_database(&mut db, &path).expect("save");
        } // db drops → exclusive lock releases.

        let mut loaded = open_database(&path, "test".to_string()).expect("open");
        // Scope the immutable table borrow so `loaded` can be mutably
        // borrowed by process_command below.
        {
            let table = loaded.get_table("docs".to_string()).expect("docs");
            assert_eq!(table.fts_indexes.len(), 1, "FTS index should reattach");
            let entry = &table.fts_indexes[0];
            assert_eq!(entry.name, "ix_body");
            assert_eq!(entry.column_name, "body");
            assert_eq!(
                entry.index.len(),
                5,
                "rebuilt posting list should hold all 5 rows"
            );
            assert!(!entry.needs_rebuild);
        }

        // Functional smoke: an FTS query through the reloaded index
        // returns the expected hit count ('rust' appears in 3 bodies).
        let resp = process_command(
            "SELECT id FROM docs WHERE fts_match(body, 'rust');",
            &mut loaded,
        )
        .unwrap();
        assert!(resp.contains("3 rows returned"), "got: {resp}");

        cleanup(&path);
    }
2227

2228
    #[test]
    fn delete_then_save_then_reopen_excludes_deleted_node_from_fts() {
        // Phase 8b — DELETE marks the FTS index dirty; save rebuilds it
        // from current rows; reopen replays the CREATE INDEX SQL against
        // the post-delete row set. The deleted rowid must not surface
        // in `fts_match` results post-reopen.
        let path = tmp_path("fts_delete_rebuild");
        let mut db = Database::new("test".to_string());
        process_command(
            "CREATE TABLE docs (id INTEGER PRIMARY KEY, body TEXT);",
            &mut db,
        )
        .unwrap();
        for body in &[
            "rust embedded",
            "rust framework",
            "go embedded",
            "python web",
        ] {
            process_command(
                &format!("INSERT INTO docs (body) VALUES ('{body}');"),
                &mut db,
            )
            .unwrap();
        }
        process_command("CREATE INDEX ix_body ON docs USING fts (body);", &mut db).unwrap();

        // Delete row 1 ('rust embedded'); save (rebuild fires); reopen.
        process_command("DELETE FROM docs WHERE id = 1;", &mut db).unwrap();
        save_database(&mut db, &path).expect("save");
        // Explicit drop releases the exclusive file lock before reopen.
        drop(db);

        let mut loaded = open_database(&path, "test".to_string()).expect("open");
        let resp = process_command(
            "SELECT id FROM docs WHERE fts_match(body, 'rust');",
            &mut loaded,
        )
        .unwrap();
        // Pre-delete: 2 rows ('rust embedded', 'rust framework') had
        // 'rust'. Post-delete: only id=2 remains.
        assert!(resp.contains("1 row returned"), "got: {resp}");

        cleanup(&path);
    }
2272

2273
    #[test]
    fn fts_roundtrip_uses_persistence_path_not_replay() {
        // Phase 8c — assert the reload didn't go through the
        // rootpage=0 replay shortcut. We do this by reading the
        // sqlrite_master row for the FTS index and confirming its
        // rootpage field is non-zero.
        let path = tmp_path("fts_persistence_path");

        {
            let mut db = Database::new("test".to_string());
            process_command(
                "CREATE TABLE docs (id INTEGER PRIMARY KEY, body TEXT);",
                &mut db,
            )
            .unwrap();
            process_command(
                "INSERT INTO docs (body) VALUES ('rust embedded database');",
                &mut db,
            )
            .unwrap();
            process_command("CREATE INDEX ix_body ON docs USING fts (body);", &mut db).unwrap();
            save_database(&mut db, &path).expect("save");
        }

        // Read raw sqlrite_master and locate the FTS index row.
        let pager = Pager::open(&path).expect("open pager");
        let mut master = build_empty_master_table();
        load_table_rows(&pager, &mut master, pager.header().schema_root_page).unwrap();
        let rootpage = master
            .rowids()
            .into_iter()
            .find_map(|rowid| {
                let name = take_text(&master, "name", rowid).unwrap();
                (name == "ix_body")
                    .then(|| take_integer(&master, "rootpage", rowid).unwrap() as u32)
            })
            .expect("ix_body row in sqlrite_master");
        assert!(
            rootpage != 0,
            "Phase 8c FTS save should set rootpage != 0; got {rootpage}"
        );

        cleanup(&path);
    }
2317

2318
    #[test]
    fn save_without_fts_keeps_format_v4() {
        // Phase 8c on-demand bump — a database with zero FTS indexes
        // continues writing the v4 header. Existing v4 users must not
        // see their files silently promoted to v5 by an upgrade.
        use crate::sql::pager::header::FORMAT_VERSION_V4;

        let path = tmp_path("fts_no_bump");
        let mut db = Database::new("test".to_string());
        for stmt in [
            "CREATE TABLE t (id INTEGER PRIMARY KEY, n INTEGER);",
            "INSERT INTO t (n) VALUES (1);",
        ] {
            process_command(stmt, &mut db).unwrap();
        }
        save_database(&mut db, &path).unwrap();
        drop(db);

        let pager = Pager::open(&path).expect("open");
        assert_eq!(
            pager.header().format_version,
            FORMAT_VERSION_V4,
            "no-FTS save should keep v4"
        );
        cleanup(&path);
    }
2344

2345
    #[test]
    fn save_with_fts_bumps_to_v5() {
        // Phase 8c on-demand bump — first FTS-bearing save promotes
        // the file to v5. v5 readers handle both v4 and v5; v4
        // readers correctly refuse a v5 file.
        use crate::sql::pager::header::FORMAT_VERSION_V5;

        let path = tmp_path("fts_bump_v5");
        let mut db = Database::new("test".to_string());
        for stmt in [
            "CREATE TABLE docs (id INTEGER PRIMARY KEY, body TEXT);",
            "INSERT INTO docs (body) VALUES ('hello');",
            "CREATE INDEX ix_body ON docs USING fts (body);",
        ] {
            process_command(stmt, &mut db).unwrap();
        }
        save_database(&mut db, &path).unwrap();
        drop(db);

        let pager = Pager::open(&path).expect("open");
        assert_eq!(
            pager.header().format_version,
            FORMAT_VERSION_V5,
            "FTS save should promote to v5"
        );
        cleanup(&path);
    }
2372

2373
    #[test]
    fn fts_persistence_handles_empty_and_zero_token_docs() {
        // Phase 8c — sidecar cell carries doc-lengths for every doc
        // including any with zero tokens (so total_docs is honest
        // post-reopen). Empty index also round-trips: a CREATE INDEX
        // on an empty table emits a single empty leaf with just the
        // (empty) sidecar.
        let path = tmp_path("fts_edges");

        // Inner scope: writer drops (releasing its lock) before reopen.
        {
            let mut db = Database::new("test".to_string());
            process_command(
                "CREATE TABLE docs (id INTEGER PRIMARY KEY, body TEXT);",
                &mut db,
            )
            .unwrap();
            // Index created while the table is still empty.
            process_command("CREATE INDEX ix_body ON docs USING fts (body);", &mut db).unwrap();
            // Mix: real text, then a row that tokenizes to zero tokens
            // (only punctuation), then real again.
            process_command("INSERT INTO docs (body) VALUES ('rust embedded');", &mut db).unwrap();
            process_command("INSERT INTO docs (body) VALUES ('!!!---???');", &mut db).unwrap();
            process_command("INSERT INTO docs (body) VALUES ('go embedded');", &mut db).unwrap();
            save_database(&mut db, &path).unwrap();
        }

        let loaded = open_database(&path, "test".to_string()).expect("open");
        let table = loaded.get_table("docs".to_string()).unwrap();
        let entry = &table.fts_indexes[0];
        // All three rows present — including the zero-token row,
        // which is critical for total_docs honesty in BM25.
        assert_eq!(entry.index.len(), 3);
        // 'embedded' appears in 2 rows after reload.
        let res = entry
            .index
            .query("embedded", &crate::sql::fts::Bm25Params::default());
        assert_eq!(res.len(), 2);

        cleanup(&path);
    }
2412

2413
    #[test]
    fn fts_persistence_round_trips_large_corpus() {
        // Phase 8c — exercise multi-leaf staging. ~500 docs with
        // single-token bodies generates enough cells to overflow a
        // single 4 KiB leaf (each posting cell averages ~8 bytes).
        //
        // Fix: dropped the `expected_terms` BTreeSet — it was populated
        // on every insert but never read, so it was pure dead weight.
        let path = tmp_path("fts_large_corpus");

        {
            let mut db = Database::new("test".to_string());
            process_command(
                "CREATE TABLE docs (id INTEGER PRIMARY KEY, body TEXT);",
                &mut db,
            )
            .unwrap();
            process_command("CREATE INDEX ix_body ON docs USING fts (body);", &mut db).unwrap();
            // 500 docs, each one a unique term — drives unique-term
            // count up so multiple leaves are required.
            for i in 0..500 {
                let term = format!("term{i:04}");
                process_command(
                    &format!("INSERT INTO docs (body) VALUES ('{term}');"),
                    &mut db,
                )
                .unwrap();
            }
            save_database(&mut db, &path).unwrap();
        } // db drops → exclusive lock releases before reopen.

        let loaded = open_database(&path, "test".to_string()).expect("open");
        let table = loaded.get_table("docs".to_string()).unwrap();
        let entry = &table.fts_indexes[0];
        assert_eq!(entry.index.len(), 500);

        // Spot-check a handful of terms come back with their original
        // single-row posting list.
        for &i in &[0_i64, 137, 248, 391, 499] {
            let term = format!("term{i:04}");
            let res = entry
                .index
                .query(&term, &crate::sql::fts::Bm25Params::default());
            assert_eq!(res.len(), 1, "term {term} should match exactly 1 row");
            // PrimaryKey rowids start at 1; doc i was inserted at
            // rowid i+1.
            assert_eq!(res[0].0, i + 1);
        }

        cleanup(&path);
    }
2464

2465
    #[test]
    fn delete_then_save_then_reopen_excludes_deleted_node_from_hnsw() {
        // Phase 7d.3 — DELETE marks HNSW dirty; save rebuilds it from
        // current rows + serializes; reopen loads the post-delete graph.
        // After all that, the deleted rowid must NOT come back from a
        // KNN query.
        let path = tmp_path("hnsw_delete_rebuild");
        let mut db = Database::new("test".to_string());
        process_command(
            "CREATE TABLE docs (id INTEGER PRIMARY KEY, e VECTOR(2));",
            &mut db,
        )
        .unwrap();
        for v in &["[1.0, 0.0]", "[2.0, 0.0]", "[3.0, 0.0]", "[4.0, 0.0]"] {
            process_command(&format!("INSERT INTO docs (e) VALUES ({v});"), &mut db).unwrap();
        }
        process_command("CREATE INDEX ix_e ON docs USING hnsw (e);", &mut db).unwrap();

        // Delete row 1 (the closest match to [0.5, 0.0]).
        process_command("DELETE FROM docs WHERE id = 1;", &mut db).unwrap();
        // Confirm it marked dirty.
        let dirty_before_save = db.tables["docs"].hnsw_indexes[0].needs_rebuild;
        assert!(dirty_before_save, "DELETE should mark dirty");

        save_database(&mut db, &path).expect("save");
        // Confirm save cleared the dirty flag.
        let dirty_after_save = db.tables["docs"].hnsw_indexes[0].needs_rebuild;
        assert!(!dirty_after_save, "save should clear dirty");
        // Explicit drop releases the exclusive file lock before reopen.
        drop(db);

        // Reopen, query for the closest match. Row 1 is gone; row 2
        // (id=2, vector [2.0, 0.0]) should now be the nearest.
        let loaded = open_database(&path, "test".to_string()).expect("open");
        let docs = loaded.get_table("docs".to_string()).expect("docs");

        // Row 1 must not appear in any storage anymore.
        assert!(
            !docs.rowids().contains(&1),
            "deleted row 1 should not be in row storage"
        );
        assert_eq!(docs.rowids().len(), 3, "should have 3 surviving rows");

        // The HNSW index must also have shed the deleted node.
        assert_eq!(
            docs.hnsw_indexes[0].index.len(),
            3,
            "HNSW graph should have shed the deleted node"
        );

        cleanup(&path);
    }
2516

2517
    #[test]
    fn round_trip_survives_writes_after_load() {
        let path = tmp_path("after_load");
        save_database(&mut seed_db(), &path).unwrap();

        // Load, append a third user, save again. The inner scope drops
        // the DB (and its exclusive lock) before the final reopen.
        {
            let mut db = open_database(&path, "test".to_string()).unwrap();
            process_command(
                "INSERT INTO users (name, age) VALUES ('carol', 40);",
                &mut db,
            )
            .unwrap();
            save_database(&mut db, &path).unwrap();
        }

        let reopened = open_database(&path, "test".to_string()).unwrap();
        let users = reopened.get_table("users".to_string()).unwrap();
        assert_eq!(users.rowids().len(), 3);

        cleanup(&path);
    }
2538

2539
    #[test]
    fn open_rejects_garbage_file() {
        // A file that is not a paged sqlrite database must fail cleanly.
        let path = tmp_path("bad");
        std::fs::write(&path, b"not a sqlrite database, just bytes").unwrap();
        assert!(open_database(&path, "x".to_string()).is_err());
        cleanup(&path);
    }
2547

2548
    #[test]
    fn many_small_rows_spread_across_leaves() {
        // 200 rows of ~200 bytes each — enough payload to force the
        // table across several 4 KiB leaves.
        //
        // Fix: the 200-byte filler string is loop-invariant; it was
        // re-allocated via `"x".repeat(200)` on every iteration. Hoisted.
        let path = tmp_path("many_rows");
        let mut db = Database::new("big".to_string());
        process_command(
            "CREATE TABLE things (id INTEGER PRIMARY KEY, data TEXT);",
            &mut db,
        )
        .unwrap();
        let filler = "x".repeat(200);
        for i in 0..200 {
            let q = format!("INSERT INTO things (data) VALUES ('row-{i}-{filler}');");
            process_command(&q, &mut db).unwrap();
        }
        save_database(&mut db, &path).unwrap();
        let loaded = open_database(&path, "big".to_string()).unwrap();
        let things = loaded.get_table("things".to_string()).unwrap();
        assert_eq!(things.rowids().len(), 200);
        cleanup(&path);
    }
2568

2569
    #[test]
    fn huge_row_goes_through_overflow() {
        // A 10 KB body cannot fit inline in a 4 KiB page, so the cell
        // must travel through the overflow chain and come back intact.
        let path = tmp_path("overflow_row");
        let mut db = Database::new("big".to_string());
        process_command(
            "CREATE TABLE docs (id INTEGER PRIMARY KEY, body TEXT);",
            &mut db,
        )
        .unwrap();
        let body = "A".repeat(10_000);
        process_command(
            &format!("INSERT INTO docs (body) VALUES ('{body}');"),
            &mut db,
        )
        .unwrap();
        save_database(&mut db, &path).unwrap();

        let loaded = open_database(&path, "big".to_string()).unwrap();
        let docs = loaded.get_table("docs".to_string()).unwrap();
        let rowids = docs.rowids();
        assert_eq!(rowids.len(), 1);
        match docs.get_value("body", rowids[0]) {
            Some(Value::Text(s)) => assert_eq!(s.len(), 10_000),
            other => panic!("expected Text, got {other:?}"),
        }
        cleanup(&path);
    }
2597

2598
    #[test]
    fn create_sql_synthesis_round_trips() {
        // Build a table via CREATE, then verify table_to_create_sql +
        // parse_create_sql reproduce an equivalent column list.
        let mut db = Database::new("x".to_string());
        process_command(
            "CREATE TABLE t (id INTEGER PRIMARY KEY, tag TEXT UNIQUE, note TEXT NOT NULL);",
            &mut db,
        )
        .unwrap();
        let table = db.get_table("t".to_string()).unwrap();
        let synthesized = table_to_create_sql(table);
        let (name, cols) = parse_create_sql(&synthesized).unwrap();
        assert_eq!(name, "t");
        assert_eq!(cols.len(), 3);
        // Each constraint flag survives the synthesis round-trip.
        assert!(cols[0].is_pk);
        assert!(cols[1].is_unique);
        assert!(cols[2].not_null);
    }
2617

2618
    #[test]
    fn sqlrite_master_is_not_exposed_as_a_user_table() {
        // After open, the public db.tables map should not list the master.
        let path = tmp_path("no_master");
        save_database(&mut seed_db(), &path).unwrap();
        let reopened = open_database(&path, "x".to_string()).unwrap();
        assert!(!reopened.tables.contains_key(MASTER_TABLE_NAME));
        cleanup(&path);
    }
2627

2628
    #[test]
    fn multi_leaf_table_produces_an_interior_root() {
        // 200 fat rows force the table into multiple leaves, which means
        // save_database must build at least one InteriorNode above them.
        // The test verifies the round-trip works and confirms the root is
        // indeed an interior page (not a leaf) by reading the page type
        // directly out of the open pager.
        let path = tmp_path("multi_leaf_interior");
        let mut db = Database::new("big".to_string());
        process_command(
            "CREATE TABLE things (id INTEGER PRIMARY KEY, data TEXT);",
            &mut db,
        )
        .unwrap();
        // The filler is loop-invariant — build it once instead of
        // re-allocating a 200-byte String on every iteration.
        let body = "x".repeat(200);
        for i in 0..200 {
            let q = format!("INSERT INTO things (data) VALUES ('row-{i}-{body}');");
            process_command(&q, &mut db).unwrap();
        }
        save_database(&mut db, &path).unwrap();

        // Confirm the round-trip preserved all 200 rows.
        let loaded = open_database(&path, "big".to_string()).unwrap();
        let things = loaded.get_table("things".to_string()).unwrap();
        assert_eq!(things.rowids().len(), 200);

        // Peek at `things`'s root page via the pager attached to the
        // loaded DB and check it's an InteriorNode, not a leaf.
        let pager = loaded
            .pager
            .as_ref()
            .expect("loaded DB should have a pager");
        // sqlrite_master's row for `things` holds its root page. Easiest
        // way to find it: walk the leaf chain by using find_leftmost_leaf
        // and then hop one level up. Simpler: read the master, scan for
        // the "things" row, look up rootpage.
        let mut master = build_empty_master_table();
        load_table_rows(pager, &mut master, pager.header().schema_root_page).unwrap();
        let things_root = master
            .rowids()
            .into_iter()
            .find_map(|r| match master.get_value("name", r) {
                Some(Value::Text(s)) if s == "things" => match master.get_value("rootpage", r) {
                    Some(Value::Integer(p)) => Some(p as u32),
                    _ => None,
                },
                _ => None,
            })
            .expect("things should appear in sqlrite_master");
        // Byte 0 of a page buffer is its PageType tag.
        let root_buf = pager.read_page(things_root).unwrap();
        assert_eq!(
            root_buf[0],
            PageType::InteriorNode as u8,
            "expected a multi-leaf table to have an interior root, got tag {}",
            root_buf[0]
        );

        cleanup(&path);
    }
2687

2688
    #[test]
    fn explicit_index_persists_across_save_and_open() {
        // A user-created (non-unique) index must come back intact — name,
        // column, uniqueness flag, and entries — after a save/open cycle.
        let path = tmp_path("idx_persist");
        let mut db = Database::new("idx".to_string());
        process_command(
            "CREATE TABLE users (id INTEGER PRIMARY KEY, tag TEXT);",
            &mut db,
        )
        .unwrap();
        // Rowids 1, 3, 5 get 'even' and rowids 2, 4 get 'odd' — the labels
        // are intentionally inverted relative to the counter's parity; only
        // the resulting counts (3 vs. 2) matter below.
        for row in 1..=5 {
            let label = if row % 2 != 0 { "even" } else { "odd" };
            process_command(
                &format!("INSERT INTO users (tag) VALUES ('{label}');"),
                &mut db,
            )
            .unwrap();
        }
        process_command("CREATE INDEX users_tag_idx ON users (tag);", &mut db).unwrap();
        save_database(&mut db, &path).unwrap();

        let reopened = open_database(&path, "idx".to_string()).unwrap();
        let users = reopened.get_table("users".to_string()).unwrap();
        let restored = users
            .index_by_name("users_tag_idx")
            .expect("explicit index should survive save/open");
        assert_eq!(restored.column_name, "tag");
        assert!(!restored.is_unique);
        // Three rows carry 'even', two carry 'odd'.
        assert_eq!(restored.lookup(&Value::Text("even".into())).len(), 3);
        assert_eq!(restored.lookup(&Value::Text("odd".into())).len(), 2);

        cleanup(&path);
    }
2724

2725
    #[test]
    fn auto_indexes_for_unique_columns_survive_save_open() {
        // Every UNIQUE column auto-creates an index; the load path must
        // rebuild it — entries included — from the persisted file.
        let path = tmp_path("auto_idx_persist");
        let mut db = Database::new("a".to_string());
        process_command(
            "CREATE TABLE users (id INTEGER PRIMARY KEY, email TEXT NOT NULL UNIQUE);",
            &mut db,
        )
        .unwrap();
        for email in ["a@x", "b@x"] {
            process_command(
                &format!("INSERT INTO users (email) VALUES ('{email}');"),
                &mut db,
            )
            .unwrap();
        }
        save_database(&mut db, &path).unwrap();

        let reopened = open_database(&path, "a".to_string()).unwrap();
        let users = reopened.get_table("users".to_string()).unwrap();
        let auto_name = SecondaryIndex::auto_name("users", "email");
        let idx = users
            .index_by_name(&auto_name)
            .expect("auto index should be restored");
        assert!(idx.is_unique);
        // Each distinct email resolves to exactly one rowid.
        for email in ["a@x", "b@x"] {
            assert_eq!(idx.lookup(&Value::Text(email.into())).len(), 1);
        }

        cleanup(&path);
    }
2752

2753
    /// SQLR-1 — `CREATE INDEX` on a wide table must round-trip when the
    /// index B-tree grows past one leaf and needs an interior level.
    /// Before the fix, the post-DDL auto-save panicked with
    /// `Internal("unknown paged-entry kind tag 0x4 …")` because a
    /// table-cell decoder was being run against an index leaf
    /// (`KIND_INDEX = 0x04`).
    ///
    /// 5 000 rows mirror the original repro from the issue and exceed
    /// every leaf-fanout cliff for the small `(rowid, value)` cells in
    /// a TEXT-keyed secondary index.
    #[test]
    fn secondary_index_with_interior_level_round_trips() {
        let path = tmp_path("sqlr1_wide_index");
        let mut db = Database::new("idx".to_string());
        // source_path wires up auto-save so DDL statements persist as they run.
        db.source_path = Some(path.clone());

        process_command(
            "CREATE TABLE bloat (id INTEGER PRIMARY KEY, payload TEXT);",
            &mut db,
        )
        .unwrap();
        // BEGIN/COMMIT collapses 5 000 inserts into one save (matches
        // `auto_vacuum_setup` and the issue's repro shape).
        process_command("BEGIN;", &mut db).unwrap();
        for i in 0..5000 {
            process_command(
                &format!("INSERT INTO bloat (payload) VALUES ('p-{i:08}');"),
                &mut db,
            )
            .unwrap();
        }
        process_command("COMMIT;", &mut db).unwrap();

        // The DDL that used to panic.
        process_command("CREATE INDEX idx_p ON bloat (payload);", &mut db).unwrap();

        // Reopen and verify lookups, plus that the index tree actually
        // grew an interior layer (otherwise this test wouldn't cover the
        // regression).
        drop(db);
        let loaded = open_database(&path, "idx".to_string()).unwrap();
        let bloat = loaded.get_table("bloat".to_string()).unwrap();
        let idx = bloat
            .index_by_name("idx_p")
            .expect("idx_p should survive close/reopen");
        assert!(!idx.is_unique);

        // Spot-check the keyspace: first, middle, last value each map
        // back to exactly the row that carried them.
        for &(probe_i, expected_rowid) in &[(0i64, 1i64), (2500, 2501), (4999, 5000)] {
            let value = Value::Text(format!("p-{probe_i:08}"));
            let hits = idx.lookup(&value);
            assert_eq!(
                hits,
                vec![expected_rowid],
                "lookup({value:?}) should yield rowid {expected_rowid}",
            );
        }

        // Confirm the index tree is multi-level (the regression's
        // necessary condition) — root must be an `InteriorNode` and
        // `find_leftmost_leaf` must reach a `TableLeaf` through it.
        let pager = loaded.pager.as_ref().unwrap();
        // The catalog row for idx_p (type == "index") carries its root page.
        let mut master = build_empty_master_table();
        load_table_rows(pager, &mut master, pager.header().schema_root_page).unwrap();
        let idx_root = master
            .rowids()
            .into_iter()
            .find_map(
                |r| match (master.get_value("name", r), master.get_value("type", r)) {
                    (Some(Value::Text(name)), Some(Value::Text(kind)))
                        if name == "idx_p" && kind == "index" =>
                    {
                        match master.get_value("rootpage", r) {
                            Some(Value::Integer(p)) => Some(p as u32),
                            _ => None,
                        }
                    }
                    _ => None,
                },
            )
            .expect("idx_p should appear in sqlrite_master");
        // Byte 0 of a page buffer is its PageType tag.
        let root_buf = pager.read_page(idx_root).unwrap();
        assert_eq!(
            root_buf[0],
            PageType::InteriorNode as u8,
            "5 000-entry index must have an interior root — without one this test wouldn't cover SQLR-1",
        );
        let leaf = find_leftmost_leaf(pager, idx_root).unwrap();
        let leaf_buf = pager.read_page(leaf).unwrap();
        assert_eq!(leaf_buf[0], PageType::TableLeaf as u8);

        cleanup(&path);
    }
2847

2848
    /// SQLR-1 follow-on — the page-recycling path between two large
    /// versions of the same index name must not corrupt cell decoding.
    /// `DROP INDEX` returns its pages to the freelist; the next
    /// `CREATE INDEX` is free to reuse them. If the allocator hands an
    /// old index leaf to a *table* without zeroing it, an upstream
    /// table walk would see KIND_INDEX cells and panic.
    #[test]
    fn drop_then_recreate_wide_index_does_not_panic() {
        let path = tmp_path("sqlr1_drop_recreate");
        let mut db = Database::new("idx".to_string());
        // source_path wires up auto-save so DDL statements persist as they run.
        db.source_path = Some(path.clone());

        process_command(
            "CREATE TABLE bloat (id INTEGER PRIMARY KEY, payload TEXT);",
            &mut db,
        )
        .unwrap();
        // BEGIN/COMMIT batches the 5 000 inserts into a single save,
        // mirroring the shape of the SQLR-1 repro above.
        process_command("BEGIN;", &mut db).unwrap();
        for i in 0..5000 {
            process_command(
                &format!("INSERT INTO bloat (payload) VALUES ('p-{i:08}');"),
                &mut db,
            )
            .unwrap();
        }
        process_command("COMMIT;", &mut db).unwrap();

        process_command("CREATE INDEX idx_p ON bloat (payload);", &mut db).unwrap();
        process_command("DROP INDEX idx_p;", &mut db).unwrap();
        // Recreate from scratch — exercises the recycle path.
        process_command("CREATE INDEX idx_p ON bloat (payload);", &mut db).unwrap();

        drop(db);
        let loaded = open_database(&path, "idx".to_string()).unwrap();
        let bloat = loaded.get_table("bloat".to_string()).unwrap();
        let idx = bloat
            .index_by_name("idx_p")
            .expect("idx_p should survive drop+recreate+reopen");
        // Payloads are 'p-00000000' … 'p-00004999' on rowids 1 … 5000,
        // so 'p-00002500' belongs to rowid 2501.
        assert_eq!(
            idx.lookup(&Value::Text("p-00002500".into())),
            vec![2501],
            "post-recycle lookup must still resolve correctly",
        );

        cleanup(&path);
    }
2894

2895
    #[test]
    fn deep_tree_round_trips() {
        // Force a 3-level tree by bypassing process_command (which prints
        // the full table on every INSERT, making large bulk loads O(N^2)
        // in I/O). We build the Table directly via restore_row.
        use crate::sql::db::table::Column as TableColumn;

        let path = tmp_path("deep_tree");
        let mut db = Database::new("deep".to_string());
        let columns = vec![
            TableColumn::new("id".into(), "integer".into(), true, true, true),
            TableColumn::new("s".into(), "text".into(), false, true, false),
        ];
        let mut table = build_empty_table("t", columns, 0);
        // ~900-byte rows → ~4 rows per leaf. 6000 rows → ~1500 leaves,
        // which with interior fanout ~400 needs 2 interior levels (3-level
        // tree total, counting leaves).
        //
        // The filler is loop-invariant — build it once instead of
        // re-allocating a 900-byte String on each of the 6 000 iterations.
        let body = "q".repeat(900);
        for i in 1..=6_000i64 {
            table
                .restore_row(
                    i,
                    vec![
                        Some(Value::Integer(i)),
                        Some(Value::Text(format!("r-{i}-{body}"))),
                    ],
                )
                .unwrap();
        }
        db.tables.insert("t".to_string(), table);
        save_database(&mut db, &path).unwrap();

        let loaded = open_database(&path, "deep".to_string()).unwrap();
        let t = loaded.get_table("t".to_string()).unwrap();
        assert_eq!(t.rowids().len(), 6_000);

        // Confirm the tree actually grew past 2 levels — i.e., the root's
        // leftmost child is itself an interior page, not a leaf.
        let pager = loaded.pager.as_ref().unwrap();
        let mut master = build_empty_master_table();
        load_table_rows(pager, &mut master, pager.header().schema_root_page).unwrap();
        let t_root = master
            .rowids()
            .into_iter()
            .find_map(|r| match master.get_value("name", r) {
                Some(Value::Text(s)) if s == "t" => match master.get_value("rootpage", r) {
                    Some(Value::Integer(p)) => Some(p as u32),
                    _ => None,
                },
                _ => None,
            })
            .expect("t in sqlrite_master");
        let root_buf = pager.read_page(t_root).unwrap();
        assert_eq!(root_buf[0], PageType::InteriorNode as u8);
        let root_payload: &[u8; PAYLOAD_PER_PAGE] =
            (&root_buf[PAGE_HEADER_SIZE..]).try_into().unwrap();
        let root_interior = InteriorPage::from_bytes(root_payload);
        let child = root_interior.leftmost_child().unwrap();
        let child_buf = pager.read_page(child).unwrap();
        assert_eq!(
            child_buf[0],
            PageType::InteriorNode as u8,
            "expected 3-level tree: root's leftmost child should also be InteriorNode",
        );

        cleanup(&path);
    }
2962

2963
    #[test]
    fn alter_rename_table_survives_save_and_reopen() {
        // RENAME TO must persist: after a reopen the old name is gone, the
        // new one is present with its rows, and the auto-indexes followed.
        let path = tmp_path("alter_rename_table_roundtrip");
        let mut db = seed_db();
        save_database(&mut db, &path).expect("save");

        process_command("ALTER TABLE users RENAME TO members;", &mut db).expect("rename");
        save_database(&mut db, &path).expect("save after rename");

        let reopened = open_database(&path, "t".to_string()).expect("reopen");
        assert!(!reopened.contains_table("users".to_string()));
        assert!(reopened.contains_table("members".to_string()));

        let members = reopened.get_table("members".to_string()).unwrap();
        assert_eq!(members.rowids().len(), 2, "rows should survive");
        // Auto-indexes must have been renamed along with the table.
        for auto_idx in [
            "sqlrite_autoindex_members_id",
            "sqlrite_autoindex_members_name",
        ] {
            assert!(
                members.index_by_name(auto_idx).is_some(),
                "{auto_idx} should exist after the rename"
            );
        }

        cleanup(&path);
    }
2991

2992
    #[test]
    fn alter_rename_column_survives_save_and_reopen() {
        // RENAME COLUMN must persist: the new name is queryable after a
        // reopen, the old one is gone, and row data is intact.
        let path = tmp_path("alter_rename_col_roundtrip");
        let mut db = seed_db();
        save_database(&mut db, &path).expect("save");

        process_command(
            "ALTER TABLE users RENAME COLUMN name TO full_name;",
            &mut db,
        )
        .expect("rename column");
        save_database(&mut db, &path).expect("save after rename");

        let reopened = open_database(&path, "t".to_string()).expect("reopen");
        let users = reopened.get_table("users".to_string()).unwrap();
        assert!(users.contains_column("full_name".to_string()));
        assert!(!users.contains_column("name".to_string()));

        // A known row value must still be reachable through the new name.
        let alice = Some(Value::Text("alice".to_string()));
        let alice_rowid = users
            .rowids()
            .into_iter()
            .find(|&r| users.get_value("full_name", r) == alice)
            .expect("alice row should be findable under renamed column");
        assert_eq!(users.get_value("full_name", alice_rowid), alice);

        cleanup(&path);
    }
3022

3023
    #[test]
    fn alter_add_column_with_default_survives_save_and_reopen() {
        // ADD COLUMN ... DEFAULT must backfill existing rows AND keep the
        // DEFAULT clause on the column metadata after a reopen.
        let path = tmp_path("alter_add_default_roundtrip");
        let mut db = seed_db();
        save_database(&mut db, &path).expect("save");

        process_command(
            "ALTER TABLE users ADD COLUMN status TEXT DEFAULT 'active';",
            &mut db,
        )
        .expect("add column");
        save_database(&mut db, &path).expect("save after add");

        let reopened = open_database(&path, "t".to_string()).expect("reopen");
        let users = reopened.get_table("users".to_string()).unwrap();
        assert!(users.contains_column("status".to_string()));

        let expected = Some(Value::Text("active".to_string()));
        // Every pre-existing row must have been backfilled with the default.
        for rowid in users.rowids() {
            assert_eq!(
                users.get_value("status", rowid),
                expected,
                "backfilled default should round-trip for rowid {rowid}"
            );
        }
        // The DEFAULT clause itself should still be on the column metadata
        // so a subsequent INSERT picks it up.
        let status_col = users
            .columns
            .iter()
            .find(|c| c.column_name == "status")
            .unwrap();
        assert_eq!(status_col.default, expected);

        cleanup(&path);
    }
3057

3058
    #[test]
    fn alter_drop_column_survives_save_and_reopen() {
        // DROP COLUMN must persist across save/reopen: the dropped column
        // disappears while its siblings remain.
        let path = tmp_path("alter_drop_col_roundtrip");
        let mut db = seed_db();
        save_database(&mut db, &path).expect("save");

        process_command("ALTER TABLE users DROP COLUMN age;", &mut db).expect("drop column");
        save_database(&mut db, &path).expect("save after drop");

        let reopened = open_database(&path, "t".to_string()).expect("reopen");
        let users = reopened.get_table("users".to_string()).unwrap();
        assert!(
            !users.contains_column("age".to_string()),
            "dropped column must stay gone"
        );
        assert!(
            users.contains_column("name".to_string()),
            "untouched column must survive"
        );

        cleanup(&path);
    }
3074

3075
    #[test]
    fn drop_table_survives_save_and_reopen() {
        // DROP TABLE must persist: a reopen shows the dropped table gone
        // and its sibling untouched.
        let path = tmp_path("drop_table_roundtrip");
        let mut db = seed_db();
        save_database(&mut db, &path).expect("save");

        // Sanity check: both seeded tables made it to disk first.
        {
            let first_open = open_database(&path, "t".to_string()).expect("open");
            assert!(first_open.contains_table("users".to_string()));
            assert!(first_open.contains_table("notes".to_string()));
        }

        process_command("DROP TABLE users;", &mut db).expect("drop users");
        save_database(&mut db, &path).expect("save after drop");

        let reopened = open_database(&path, "t".to_string()).expect("reopen");
        assert!(
            !reopened.contains_table("users".to_string()),
            "dropped table should not resurface on reopen"
        );
        assert!(
            reopened.contains_table("notes".to_string()),
            "untouched table should survive"
        );

        cleanup(&path);
    }
3103

3104
    #[test]
    fn drop_index_survives_save_and_reopen() {
        // DROP INDEX must persist, while unrelated auto-indexes stay put.
        let path = tmp_path("drop_index_roundtrip");
        let mut db = Database::new("t".to_string());
        process_command(
            "CREATE TABLE notes (id INTEGER PRIMARY KEY, body TEXT);",
            &mut db,
        )
        .unwrap();
        process_command("CREATE INDEX notes_body_idx ON notes (body);", &mut db).unwrap();
        save_database(&mut db, &path).expect("save");

        process_command("DROP INDEX notes_body_idx;", &mut db).unwrap();
        save_database(&mut db, &path).expect("save after drop");

        let reopened = open_database(&path, "t".to_string()).expect("reopen");
        let notes = reopened.get_table("notes".to_string()).unwrap();
        assert!(
            notes.index_by_name("notes_body_idx").is_none(),
            "dropped index should not resurface on reopen"
        );
        // The PK's auto-index must not have been dropped alongside.
        assert!(notes.index_by_name("sqlrite_autoindex_notes_id").is_some());

        cleanup(&path);
    }
3130

3131
    #[test]
    fn default_clause_survives_save_and_reopen() {
        // DEFAULT clauses must round-trip through the catalog, both as
        // column metadata and as live behavior for later INSERTs.
        let path = tmp_path("default_roundtrip");
        let mut db = Database::new("t".to_string());

        process_command(
            "CREATE TABLE users (id INTEGER PRIMARY KEY, status TEXT DEFAULT 'active', score INTEGER DEFAULT 0);",
            &mut db,
        )
        .unwrap();
        save_database(&mut db, &path).expect("save");

        let mut reopened = open_database(&path, "t".to_string()).expect("open");

        // Metadata check: both DEFAULT clauses survived the reload.
        let users = reopened.get_table("users".to_string()).expect("users table");
        for (col_name, expected_default) in [
            ("status", Value::Text("active".to_string())),
            ("score", Value::Integer(0)),
        ] {
            let col = users
                .columns
                .iter()
                .find(|c| c.column_name == col_name)
                .unwrap_or_else(|| panic!("{col_name} column missing"));
            assert_eq!(
                col.default,
                Some(expected_default),
                "DEFAULT on {col_name} should round-trip"
            );
        }

        // Behavior check: an INSERT omitting both columns picks the
        // defaults up from the reloaded schema.
        process_command("INSERT INTO users (id) VALUES (1);", &mut reopened).unwrap();
        let users = reopened.get_table("users".to_string()).unwrap();
        assert_eq!(
            users.get_value("status", 1),
            Some(Value::Text("active".to_string()))
        );
        assert_eq!(users.get_value("score", 1), Some(Value::Integer(0)));

        cleanup(&path);
    }
3180

3181
    // ---------------------------------------------------------------------
3182
    // SQLR-6 — free-list + VACUUM tests
3183
    // ---------------------------------------------------------------------
3184

3185
    /// Drop a table; subsequent CREATE TABLE should reuse the freed pages
    /// rather than extending the file. The page_count after drop+create
    /// should be at most what it was after the original two tables —
    /// proving the new table landed on freelist pages.
    #[test]
    fn drop_table_freelist_persists_pages_for_reuse() {
        let path = tmp_path("freelist_reuse");
        let mut db = seed_db();
        // source_path wires up auto-save so DDL statements persist as they run.
        db.source_path = Some(path.clone());
        save_database(&mut db, &path).expect("save");
        let pages_two_tables = db.pager.as_ref().unwrap().header().page_count;

        // Drop one table; its pages go on the freelist.
        process_command("DROP TABLE users;", &mut db).expect("drop users");
        let pages_after_drop = db.pager.as_ref().unwrap().header().page_count;
        assert_eq!(
            pages_after_drop, pages_two_tables,
            "page_count should not shrink on drop — the freed pages persist on the freelist"
        );
        let head_after_drop = db.pager.as_ref().unwrap().header().freelist_head;
        assert!(
            head_after_drop != 0,
            "freelist_head must be non-zero after drop"
        );

        // Re-create a similar-shaped table; should reuse freelist pages.
        process_command(
            "CREATE TABLE accounts (id INTEGER PRIMARY KEY, label TEXT NOT NULL UNIQUE);",
            &mut db,
        )
        .expect("create accounts");
        process_command("INSERT INTO accounts (label) VALUES ('a');", &mut db).unwrap();
        process_command("INSERT INTO accounts (label) VALUES ('b');", &mut db).unwrap();
        let pages_after_create = db.pager.as_ref().unwrap().header().page_count;
        // NOTE(review): the "+ 2" slack presumably covers a couple of pages the
        // new table legitimately needs beyond the recycled ones — confirm
        // against the allocator before tightening.
        assert!(
            pages_after_create <= pages_two_tables + 2,
            "creating a similar-sized table after a drop should mostly draw from the \
             freelist, not extend the file (got {pages_after_create} > {pages_two_tables} + 2)"
        );

        cleanup(&path);
    }
3227

3228
    /// `VACUUM;` after a drop must shrink the file and clear the freelist.
    #[test]
    fn drop_then_vacuum_shrinks_file() {
        let path = tmp_path("vacuum_shrinks");
        let mut db = seed_db();
        // source_path wires up auto-save so DDL statements persist as they run.
        db.source_path = Some(path.clone());
        // Add a few more rows to make the dropped table bigger.
        for i in 0..20 {
            process_command(
                &format!("INSERT INTO users (name, age) VALUES ('user{i}', {i});"),
                &mut db,
            )
            .unwrap();
        }
        save_database(&mut db, &path).expect("save");

        process_command("DROP TABLE users;", &mut db).expect("drop");
        // Snapshot the on-disk size and header fields before vacuuming so we
        // can assert on the deltas afterwards.
        let size_before_vacuum = std::fs::metadata(&path).unwrap().len();
        let pages_before_vacuum = db.pager.as_ref().unwrap().header().page_count;
        let head_before = db.pager.as_ref().unwrap().header().freelist_head;
        assert!(head_before != 0, "drop should populate the freelist");

        // VACUUM (via process_command) checkpoints internally so the
        // file actually shrinks on disk before we observe its size.
        process_command("VACUUM;", &mut db).expect("vacuum");

        let size_after = std::fs::metadata(&path).unwrap().len();
        let pages_after = db.pager.as_ref().unwrap().header().page_count;
        let head_after = db.pager.as_ref().unwrap().header().freelist_head;
        assert!(
            pages_after < pages_before_vacuum,
            "VACUUM must reduce page_count: was {pages_before_vacuum}, now {pages_after}"
        );
        assert_eq!(head_after, 0, "VACUUM must clear the freelist");
        assert!(
            size_after < size_before_vacuum,
            "VACUUM must shrink the file on disk: was {size_before_vacuum} bytes, now {size_after}"
        );

        cleanup(&path);
    }
3269

3270
    /// VACUUM on a non-empty multi-table DB must not lose any rows.
    #[test]
    fn vacuum_round_trips_data() {
        let path = tmp_path("vacuum_round_trip");
        let mut db = seed_db();
        db.source_path = Some(path.clone());
        save_database(&mut db, &path).expect("save");
        process_command("VACUUM;", &mut db).expect("vacuum");

        // Drop the in-memory handle and reload from disk so the check
        // covers what VACUUM actually wrote, not cached state.
        drop(db);
        let reopened = open_database(&path, "t".to_string()).expect("reopen after vacuum");
        for table in ["users", "notes"] {
            assert!(
                reopened.contains_table(table.to_string()),
                "{table} must survive VACUUM"
            );
        }
        // seed_db inserts exactly two users.
        let users = reopened.get_table("users".to_string()).unwrap();
        assert_eq!(users.rowids().len(), 2);

        cleanup(&path);
    }
3290

3291
    /// Format version is bumped to v6 only after a save that creates a
    /// non-empty freelist. VACUUM clears the freelist but doesn't
    /// downgrade — v6 is a strict superset, so once at v6 we stay.
    #[test]
    fn freelist_format_version_promotion() {
        use crate::sql::pager::header::{FORMAT_VERSION_BASELINE, FORMAT_VERSION_V6};
        let path = tmp_path("v6_promotion");
        let mut db = seed_db();
        db.source_path = Some(path.clone());

        // Small helper: the format version currently recorded in the header.
        let version_of = |db: &Database| db.pager.as_ref().unwrap().header().format_version;

        save_database(&mut db, &path).expect("save");
        assert_eq!(
            version_of(&db),
            FORMAT_VERSION_BASELINE,
            "fresh DB without drops should stay at the baseline version"
        );

        process_command("DROP TABLE users;", &mut db).expect("drop");
        assert_eq!(
            version_of(&db),
            FORMAT_VERSION_V6,
            "first save with a non-empty freelist must promote to V6"
        );

        process_command("VACUUM;", &mut db).expect("vacuum");
        assert_eq!(
            version_of(&db),
            FORMAT_VERSION_V6,
            "VACUUM must not downgrade — V6 is a strict superset"
        );

        cleanup(&path);
    }
3323

3324
    /// Freelist persists across reopen: drop, save, close, reopen,
    /// confirm the next CREATE TABLE re-uses pages from the persisted
    /// freelist (rather than extending the file).
    #[test]
    fn freelist_round_trip_through_reopen() {
        let path = tmp_path("freelist_reopen");

        // First session: save both seed tables, drop one so its pages
        // land on the freelist, and record the page count at drop time.
        // The Database is dropped at the end of the block, closing the file.
        let pages_two_tables = {
            let mut db = seed_db();
            db.source_path = Some(path.clone());
            save_database(&mut db, &path).expect("save");
            let count = db.pager.as_ref().unwrap().header().page_count;
            process_command("DROP TABLE users;", &mut db).expect("drop");
            assert!(
                db.pager.as_ref().unwrap().header().freelist_head != 0,
                "drop must populate the freelist"
            );
            count
        };

        // Second session: the freelist must come back from disk.
        let mut db = open_database(&path, "t".to_string()).expect("reopen");
        assert!(
            db.pager.as_ref().unwrap().header().freelist_head != 0,
            "freelist_head must survive close/reopen"
        );

        // A fresh table plus one row should be satisfied from the
        // persisted freelist rather than growing the file.
        process_command(
            "CREATE TABLE accounts (id INTEGER PRIMARY KEY, label TEXT NOT NULL UNIQUE);",
            &mut db,
        )
        .expect("create accounts");
        process_command("INSERT INTO accounts (label) VALUES ('reopened');", &mut db).unwrap();

        let pages_after_create = db.pager.as_ref().unwrap().header().page_count;
        assert!(
            pages_after_create <= pages_two_tables + 2,
            "post-reopen create should reuse freelist (got {pages_after_create} > \
             {pages_two_tables} + 2 — file extended instead of reusing)"
        );

        cleanup(&path);
    }
3363

3364
    /// VACUUM inside an explicit transaction must error before touching the
    /// disk. `BEGIN; VACUUM;` is the documented rejection path.
    #[test]
    fn vacuum_inside_transaction_is_rejected() {
        let path = tmp_path("vacuum_txn");
        let mut db = seed_db();
        db.source_path = Some(path.clone());
        save_database(&mut db, &path).expect("save");

        process_command("BEGIN;", &mut db).expect("begin");
        let rejection = process_command("VACUUM;", &mut db).unwrap_err();
        let rendered = format!("{rejection}");
        assert!(
            rendered.contains("VACUUM cannot run inside a transaction"),
            "expected in-transaction rejection, got: {rendered}"
        );

        // Roll back to leave the DB in a clean state.
        process_command("ROLLBACK;", &mut db).unwrap();
        cleanup(&path);
    }
3383

3384
    /// VACUUM on an in-memory database is a documented no-op.
    #[test]
    fn vacuum_on_in_memory_database_is_noop() {
        let mut db = Database::new("mem".to_string());
        process_command("CREATE TABLE t (id INTEGER PRIMARY KEY);", &mut db).unwrap();

        let out = process_command("VACUUM;", &mut db).expect("vacuum no-op");
        // Case-fold once; the message wording only needs to signal "did nothing".
        let lowered = out.to_lowercase();
        assert!(
            lowered.contains("no-op") || lowered.contains("in-memory"),
            "expected no-op message for in-memory VACUUM, got: {out}"
        );
    }
3395

3396
    /// Untouched tables shouldn't write any pages on the save that
    /// follows a DROP of an unrelated table. Confirms the per-table
    /// preferred pool keeps page numbers stable so the diff pager skips
    /// every byte-identical leaf.
    #[test]
    fn unchanged_table_pages_skip_diff_after_unrelated_drop() {
        // Need three tables so dropping one in the middle still leaves
        // an "unrelated" alphabetical neighbour. Layout pre-drop (sorted):
        //   accounts, notes, users
        // Drop `notes`. `accounts` and `users` should keep their pages.
        let path = tmp_path("diff_after_drop");
        let mut db = Database::new("t".to_string());
        db.source_path = Some(path.clone());
        process_command(
            "CREATE TABLE accounts (id INTEGER PRIMARY KEY, label TEXT);",
            &mut db,
        )
        .unwrap();
        process_command(
            "CREATE TABLE notes (id INTEGER PRIMARY KEY, body TEXT);",
            &mut db,
        )
        .unwrap();
        process_command(
            "CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT);",
            &mut db,
        )
        .unwrap();
        // A handful of rows per table so each one owns real leaf content.
        for i in 0..5 {
            process_command(
                &format!("INSERT INTO accounts (label) VALUES ('a{i}');"),
                &mut db,
            )
            .unwrap();
            process_command(
                &format!("INSERT INTO notes (body) VALUES ('n{i}');"),
                &mut db,
            )
            .unwrap();
            process_command(
                &format!("INSERT INTO users (name) VALUES ('u{i}');"),
                &mut db,
            )
            .unwrap();
        }
        save_database(&mut db, &path).expect("baseline save");

        // Capture page bytes for `accounts` and `users` so we can
        // verify they don't change. Read the catalog once and look up
        // both roots in the returned map (the original scanned the
        // schema chain twice for no benefit).
        let pager = db.pager.as_ref().unwrap();
        let roots = read_old_rootpages(pager, pager.header().schema_root_page).unwrap();
        let acc_root = roots
            .get(&("table".to_string(), "accounts".to_string()))
            .copied()
            .unwrap();
        let users_root = roots
            .get(&("table".to_string(), "users".to_string()))
            .copied()
            .unwrap();
        let acc_bytes_before: Vec<u8> = pager.read_page(acc_root).unwrap().to_vec();
        let users_bytes_before: Vec<u8> = pager.read_page(users_root).unwrap().to_vec();

        // Drop the middle table.
        process_command("DROP TABLE notes;", &mut db).expect("drop notes");

        // Re-borrow: `process_command` took `&mut db`, invalidating the
        // earlier pager reference.
        let pager = db.pager.as_ref().unwrap();
        // `accounts` and `users` should still live at the same pages
        // with byte-identical content.
        let acc_after = pager.read_page(acc_root).unwrap();
        let users_after = pager.read_page(users_root).unwrap();
        assert_eq!(
            &acc_after[..],
            &acc_bytes_before[..],
            "accounts root page must not be rewritten when an unrelated table is dropped"
        );
        assert_eq!(
            &users_after[..],
            &users_bytes_before[..],
            "users root page must not be rewritten when an unrelated table is dropped"
        );

        cleanup(&path);
    }
3480

3481
    // ---- SQLR-10: auto-VACUUM trigger after page-releasing DDL ----
3482

3483
    /// Builds a file-backed DB with one small "keep" table and one
3484
    /// large "bloat" table, sized so the post-drop freelist will
3485
    /// comfortably cross the default 25% threshold and the
3486
    /// `MIN_PAGES_FOR_AUTO_VACUUM` floor (16 pages). Used by the
3487
    /// auto-VACUUM happy-path tests.
3488
    fn auto_vacuum_setup(path: &std::path::Path) -> Database {
1✔
3489
        let mut db = Database::new("av".to_string());
1✔
3490
        db.source_path = Some(path.to_path_buf());
2✔
3491
        process_command(
3492
            "CREATE TABLE keep (id INTEGER PRIMARY KEY, n INTEGER);",
3493
            &mut db,
3494
        )
3495
        .unwrap();
3496
        process_command("INSERT INTO keep (n) VALUES (1);", &mut db).unwrap();
1✔
3497
        process_command(
3498
            "CREATE TABLE bloat (id INTEGER PRIMARY KEY, payload TEXT);",
3499
            &mut db,
3500
        )
3501
        .unwrap();
3502
        // Wrap the bulk insert in a transaction so we pay one save at
3503
        // COMMIT instead of 5000 round-trips through auto-save.
3504
        process_command("BEGIN;", &mut db).unwrap();
1✔
3505
        for i in 0..5000 {
1✔
3506
            process_command(
3507
                &format!("INSERT INTO bloat (payload) VALUES ('p-{i:08}');"),
2✔
3508
                &mut db,
3509
            )
3510
            .unwrap();
3511
        }
3512
        process_command("COMMIT;", &mut db).unwrap();
1✔
3513
        db
1✔
3514
    }
3515

3516
    /// Default threshold (0.25) is engaged for fresh `Database`s and
    /// fires when a `DROP TABLE` orphans enough pages — file shrinks
    /// without anyone calling `VACUUM;`.
    #[test]
    fn auto_vacuum_default_threshold_triggers_on_drop_table() {
        let path = tmp_path("av_default_drop_table");
        let mut db = auto_vacuum_setup(&path);
        // Sanity: setup respects the shipped default.
        assert_eq!(db.auto_vacuum_threshold(), Some(0.25));

        // Checkpoint before measuring `size_before` so the bloat actually
        // lives in the main file and not just the WAL — otherwise
        // `size_before` is the bare 2-page header and any post-vacuum
        // checkpoint will look like the file *grew*.
        // NOTE(review): the checkpoint result is deliberately discarded —
        // presumably a failed checkpoint only skews the size comparison
        // rather than correctness; confirm that's the intent.
        if let Some(p) = db.pager.as_mut() {
            let _ = p.checkpoint();
        }
        let pages_before = db.pager.as_ref().unwrap().header().page_count;
        let size_before = std::fs::metadata(&path).unwrap().len();
        assert!(
            pages_before >= MIN_PAGES_FOR_AUTO_VACUUM,
            "setup should produce >= MIN_PAGES_FOR_AUTO_VACUUM ({MIN_PAGES_FOR_AUTO_VACUUM}) \
             pages so the floor doesn't suppress the trigger; got {pages_before}"
        );

        // Drop the bloat table — freelist should pass 25% of page_count
        // and the auto-VACUUM hook should compact in place. Note: no
        // explicit `VACUUM;` statement is issued.
        process_command("DROP TABLE bloat;", &mut db).expect("drop");

        // Read header state before the second checkpoint so we're
        // asserting on what the drop itself produced.
        let pages_after = db.pager.as_ref().unwrap().header().page_count;
        let head_after = db.pager.as_ref().unwrap().header().freelist_head;
        // Second checkpoint so the post-vacuum file shrinks on disk
        // (auto-VACUUM stages the compact through WAL just like manual
        // VACUUM does).
        if let Some(p) = db.pager.as_mut() {
            let _ = p.checkpoint();
        }
        let size_after = std::fs::metadata(&path).unwrap().len();

        assert!(
            pages_after < pages_before,
            "auto-VACUUM must reduce page_count: was {pages_before}, now {pages_after}"
        );
        assert_eq!(head_after, 0, "auto-VACUUM must clear the freelist");
        assert!(
            size_after < size_before,
            "auto-VACUUM must shrink the file on disk: was {size_before}, now {size_after}"
        );

        cleanup(&path);
    }
3568

3569
    /// Setting the threshold to `None` disables the trigger entirely:
    /// the same workload that shrinks under the default leaves the file
    /// at its high-water mark.
    #[test]
    fn auto_vacuum_disabled_keeps_file_at_hwm() {
        let path = tmp_path("av_disabled");
        let mut db = auto_vacuum_setup(&path);

        // Turn the trigger off and confirm the setter took effect.
        db.set_auto_vacuum_threshold(None).expect("disable");
        assert_eq!(db.auto_vacuum_threshold(), None);

        let pages_before = db.pager.as_ref().unwrap().header().page_count;

        process_command("DROP TABLE bloat;", &mut db).expect("drop");

        // The drop must neither compact nor skip freelist bookkeeping.
        let pages_after = db.pager.as_ref().unwrap().header().page_count;
        assert_eq!(
            pages_after, pages_before,
            "with auto-VACUUM disabled, drop must keep page_count at the HWM"
        );
        let head_after = db.pager.as_ref().unwrap().header().freelist_head;
        assert!(
            head_after != 0,
            "drop must still populate the freelist (manual VACUUM would be needed to reclaim)"
        );

        cleanup(&path);
    }
3596

3597
    /// `DROP INDEX` is the second of three page-releasing DDL paths
    /// covered by SQLR-10. We bloat the freelist via a separate
    /// `DROP TABLE` first (with auto-VACUUM disabled so it doesn't
    /// compact early), then re-arm the trigger and drop a small index
    /// — the cumulative freelist crosses 25% on the index drop and
    /// auto-VACUUM fires.
    ///
    /// The detour around bloat is necessary because building a
    /// secondary index on a 5000-row column would need multi-level
    /// interior nodes, and the cell-decoder's interior-page support
    /// is a separate work item from SQLR-10.
    #[test]
    fn auto_vacuum_triggers_on_drop_index() {
        let path = tmp_path("av_drop_index");
        let mut db = auto_vacuum_setup(&path);

        // Phase 1: drop the bloat table with auto-VACUUM disabled so
        // its pages land on the freelist without being reclaimed.
        db.set_auto_vacuum_threshold(None).expect("disable");
        process_command("DROP TABLE bloat;", &mut db).expect("drop bloat");
        // Snapshot taken *before* the index churn: the final assertion
        // compares against this, not against the setup-time page count.
        let pages_after_bloat_drop = db.pager.as_ref().unwrap().header().page_count;
        let head_after_bloat_drop = db.pager.as_ref().unwrap().header().freelist_head;
        assert!(
            head_after_bloat_drop != 0,
            "bloat drop must populate the freelist (else later index drop won't trip the threshold)"
        );

        // Phase 2: a small index on the surviving `keep` table. The
        // index reuses one page from the freelist (which is fine —
        // freelist still holds plenty more).
        process_command("CREATE INDEX idx_keep_n ON keep (n);", &mut db).expect("create idx");

        // Phase 3: re-arm the trigger and drop the index. The freelist
        // is already heavily populated from phase 1; this drop just
        // adds the index page on top, keeping the ratio well above
        // 25%, so auto-VACUUM should fire.
        db.set_auto_vacuum_threshold(Some(0.25)).expect("re-arm");
        process_command("DROP INDEX idx_keep_n;", &mut db).expect("drop index");

        let pages_after = db.pager.as_ref().unwrap().header().page_count;
        let head_after = db.pager.as_ref().unwrap().header().freelist_head;
        assert!(
            pages_after < pages_after_bloat_drop,
            "DROP INDEX should fire auto-VACUUM and reduce page_count: \
             was {pages_after_bloat_drop}, now {pages_after}"
        );
        assert_eq!(
            head_after, 0,
            "auto-VACUUM after DROP INDEX must clear the freelist"
        );

        cleanup(&path);
    }
3650

3651
    /// `ALTER TABLE … DROP COLUMN` releases pages too — the third path
    /// the SQLR-10 trigger covers.
    #[test]
    fn auto_vacuum_triggers_on_alter_drop_column() {
        let path = tmp_path("av_alter_drop_col");
        let mut db = auto_vacuum_setup(&path);

        let pages_before = db.pager.as_ref().unwrap().header().page_count;

        // Dropping the wide `payload` column rewrites every `bloat` row
        // without it, so the old leaf pages get freed.
        process_command("ALTER TABLE bloat DROP COLUMN payload;", &mut db).expect("alter drop");

        let pages_after = db.pager.as_ref().unwrap().header().page_count;
        assert!(
            pages_after < pages_before,
            "ALTER TABLE DROP COLUMN should fire auto-VACUUM and reduce page_count: \
             was {pages_before}, now {pages_after}"
        );
        // The compact must also have drained the freelist.
        assert_eq!(db.pager.as_ref().unwrap().header().freelist_head, 0);

        cleanup(&path);
    }
3673

3674
    /// A high threshold (0.99) suppresses the trigger when the freelist
    /// ratio is well below it — the file stays at HWM.
    #[test]
    fn auto_vacuum_skips_below_threshold() {
        let path = tmp_path("av_below_threshold");
        let mut db = auto_vacuum_setup(&path);
        db.set_auto_vacuum_threshold(Some(0.99)).expect("set");

        let pages_before = db.pager.as_ref().unwrap().header().page_count;
        process_command("DROP TABLE bloat;", &mut db).expect("drop");
        let pages_after = db.pager.as_ref().unwrap().header().page_count;

        // No compaction below the threshold; freelist bookkeeping still runs.
        assert_eq!(
            pages_after, pages_before,
            "freelist ratio after a single drop is far below 0.99 — \
             page_count must stay at the HWM"
        );
        assert!(
            db.pager.as_ref().unwrap().header().freelist_head != 0,
            "drop must still populate the freelist"
        );

        cleanup(&path);
    }
3699

3700
    /// Inside an explicit transaction, the page-releasing DDL doesn't
    /// flush to disk yet — the freelist isn't accurate, so the trigger
    /// must skip. The compact would also publish in-flight work out of
    /// band, which is exactly what the manual `VACUUM;` rejection
    /// inside a txn already prevents.
    #[test]
    fn auto_vacuum_skips_inside_transaction() {
        let path = tmp_path("av_in_txn");
        let mut db = auto_vacuum_setup(&path);
        let pages_before = db.pager.as_ref().unwrap().header().page_count;

        process_command("BEGIN;", &mut db).expect("begin");
        process_command("DROP TABLE bloat;", &mut db).expect("drop in txn");

        // Mid-transaction: no save has occurred, so the on-disk
        // freelist_head must be unchanged and page_count must not have
        // shifted from a sneaky compact.
        let pages_mid = db.pager.as_ref().unwrap().header().page_count;
        assert_eq!(
            pages_mid, pages_before,
            "auto-VACUUM must not fire mid-transaction"
        );

        // Undo the in-flight drop so teardown sees a clean database.
        process_command("ROLLBACK;", &mut db).expect("rollback");
        cleanup(&path);
    }
3725

3726
    /// Tiny databases (under `MIN_PAGES_FOR_AUTO_VACUUM`) skip the
    /// trigger even if the ratio would otherwise qualify — the cost of
    /// rewriting a 64 KiB file isn't worth the few bytes reclaimed.
    #[test]
    fn auto_vacuum_skips_under_min_pages_floor() {
        let path = tmp_path("av_under_floor");
        // seed_db is intentionally small: just users + notes, ~5 pages.
        let mut db = seed_db();
        db.source_path = Some(path.clone());
        save_database(&mut db, &path).expect("save");

        let page_count = |db: &Database| db.pager.as_ref().unwrap().header().page_count;

        // Confirm we're below the floor so the test is meaningful.
        let pages_before = page_count(&db);
        assert!(
            pages_before < MIN_PAGES_FOR_AUTO_VACUUM,
            "test setup is too large: floor would not apply (got {pages_before} pages, \
             floor is {MIN_PAGES_FOR_AUTO_VACUUM})"
        );

        process_command("DROP TABLE users;", &mut db).expect("drop");

        let pages_after = page_count(&db);
        assert_eq!(
            pages_after, pages_before,
            "below MIN_PAGES_FOR_AUTO_VACUUM, drop must not trigger compaction"
        );
        assert!(
            db.pager.as_ref().unwrap().header().freelist_head != 0,
            "drop must still populate the freelist normally"
        );

        cleanup(&path);
    }
3757

3758
    /// Setter rejects NaN, infinities, and values outside `0.0..=1.0`
    /// rather than silently saturating.
    #[test]
    fn set_auto_vacuum_threshold_rejects_out_of_range() {
        let mut db = Database::new("t".to_string());

        // Every invalid value must yield the typed range error.
        let invalid: [f32; 5] = [-0.01, 1.01, f32::NAN, f32::INFINITY, f32::NEG_INFINITY];
        for bad in invalid {
            let err = db.set_auto_vacuum_threshold(Some(bad)).unwrap_err();
            assert!(
                format!("{err}").contains("auto_vacuum_threshold"),
                "expected a typed range error for {bad}, got: {err}"
            );
        }

        // The default survives the rejected sets unchanged.
        assert_eq!(db.auto_vacuum_threshold(), Some(0.25));

        // And valid values land: both inclusive endpoints plus disable.
        for good in [Some(0.0_f32), Some(1.0), None] {
            db.set_auto_vacuum_threshold(good).unwrap();
            assert_eq!(db.auto_vacuum_threshold(), good);
        }
    }
3780

3781
    /// VACUUM modifiers (FULL, REINDEX, table targets, …) are rejected
    /// with NotImplemented — only bare `VACUUM;` is supported.
    #[test]
    fn vacuum_modifiers_are_rejected() {
        let path = tmp_path("vacuum_modifiers");
        let mut db = seed_db();
        db.source_path = Some(path.clone());
        save_database(&mut db, &path).expect("save");

        // One representative of each modifier shape: keyword and table target.
        for stmt in ["VACUUM FULL;", "VACUUM users;"] {
            let err = process_command(stmt, &mut db).unwrap_err();
            let rendered = format!("{err}");
            assert!(
                rendered.contains("VACUUM modifiers"),
                "expected modifier rejection for `{stmt}`, got: {rendered}"
            );
        }

        cleanup(&path);
    }
3798
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc