• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

joaoh82 / rust_sqlite / 26391649718

25 May 2026 08:39AM UTC coverage: 68.845%. Remained the same
26391649718

push

github

joaoh82
Merge branch 'main' of https://github.com/joaoh82/rust_sqlite

101 of 108 new or added lines in 5 files covered. (93.52%)

376 existing lines in 5 files now uncovered.

11190 of 16254 relevant lines covered (68.84%)

1.24 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

90.58
/src/sql/pager/mod.rs
1
//! On-disk persistence for a `Database`, using fixed-size paged files.
2
//!
3
//! The file is a sequence of 4 KiB pages. Page 0 holds the header
4
//! (magic, version, page count, schema-root pointer). Every other page carries
5
//! a small per-page header (type tag + next-page pointer + payload length)
6
//! followed by a payload of up to 4089 bytes.
7
//!
8
//! **Storage strategy (format version 2, Phase 3c.5).**
9
//!
10
//! - Each `Table`'s rows live as **cells** in a chain of `TableLeaf` pages.
11
//!   Cell layout and slot directory are in `cell.rs` / `table_page.rs`;
12
//!   cells that exceed the inline threshold spill into an overflow chain
13
//!   via `overflow.rs`.
14
//! - The schema catalog is itself a regular table named `sqlrite_master`,
15
//!   with one row per table or index:
16
//!       `(type TEXT NOT NULL, name TEXT PRIMARY KEY, sql TEXT NOT NULL,
17
//!         rootpage INTEGER NOT NULL, last_rowid INTEGER NOT NULL)`
18
//!   This is the SQLite-style approach: the schema of `sqlrite_master`
19
//!   itself is hardcoded into the engine so the open path can bootstrap.
20
//! - Page 0's `schema_root_page` field points at the first leaf of
21
//!   `sqlrite_master`.
22
//!
23
//! **Format version.** Version 2 is not compatible with files produced by
24
//! earlier commits. Opening a v1 file returns a clean error — users on
25
//! old files have to regenerate them from CREATE/INSERT, as there's no
26
//! production data to migrate yet.
27

28
// Data-layer modules. Not every helper in these modules is used by save/open
29
// yet — some exist for tests, some for future maintenance operations.
30
// Module-level #[allow(dead_code)] keeps the build quiet without dotting
31
// the modules with per-item attributes.
32
#[allow(dead_code)]
33
pub mod allocator;
34
#[allow(dead_code)]
35
pub mod cell;
36
pub mod file;
37
#[allow(dead_code)]
38
pub mod freelist;
39
#[allow(dead_code)]
40
pub mod fts_cell;
41
pub mod header;
42
#[allow(dead_code)]
43
pub mod hnsw_cell;
44
#[allow(dead_code)]
45
pub mod index_cell;
46
#[allow(dead_code)]
47
pub mod interior_page;
48
pub mod overflow;
49
pub mod page;
50
pub mod pager;
51
#[allow(dead_code)]
52
pub mod table_page;
53
#[allow(dead_code)]
54
pub mod varint;
55
#[allow(dead_code)]
56
pub mod wal;
57

58
use std::collections::{BTreeMap, HashMap};
59
use std::path::Path;
60
use std::sync::{Arc, Mutex};
61

62
use crate::sql::dialect::SqlriteDialect;
63
use sqlparser::parser::Parser;
64

65
use crate::error::{Result, SQLRiteError};
66
use crate::sql::db::database::Database;
67
use crate::sql::db::secondary_index::{IndexOrigin, SecondaryIndex};
68
use crate::sql::db::table::{Column, DataType, Row, Table, Value};
69
use crate::sql::hnsw::DistanceMetric;
70
use crate::sql::pager::cell::Cell;
71
use crate::sql::pager::header::DbHeader;
72
use crate::sql::pager::index_cell::IndexCell;
73
use crate::sql::pager::interior_page::{InteriorCell, InteriorPage};
74
use crate::sql::pager::overflow::{
75
    OVERFLOW_THRESHOLD, OverflowRef, PagedEntry, read_overflow_chain, write_overflow_chain,
76
};
77
use crate::sql::pager::page::{PAGE_HEADER_SIZE, PAGE_SIZE, PAYLOAD_PER_PAGE, PageType};
78
use crate::sql::pager::pager::Pager;
79
use crate::sql::pager::table_page::TablePage;
80
use crate::sql::parser::create::CreateQuery;
81

82
// Re-export so callers can spell `sql::pager::AccessMode` without
83
// reaching into the `pager::pager::pager` submodule path.
84
pub use crate::sql::pager::pager::AccessMode;
85

86
/// Name of the internal catalog table. Reserved — user CREATEs of this
87
/// name must be rejected upstream.
88
pub const MASTER_TABLE_NAME: &str = "sqlrite_master";
89

90
/// Opens a database file in read-write mode. Shorthand for
91
/// [`open_database_with_mode`] with [`AccessMode::ReadWrite`].
92
pub fn open_database(path: &Path, db_name: String) -> Result<Database> {
2✔
93
    open_database_with_mode(path, db_name, AccessMode::ReadWrite)
2✔
94
}
95

96
/// Opens a database file in read-only mode. Acquires a shared OS-level
97
/// advisory lock, so other read-only openers coexist but any writer is
98
/// excluded. Attempts to mutate the returned `Database` (e.g. an
99
/// `INSERT`, or a `save_database` call against it) bottom out in a
100
/// `cannot commit: database is opened read-only` error from the Pager.
101
pub fn open_database_read_only(path: &Path, db_name: String) -> Result<Database> {
1✔
102
    open_database_with_mode(path, db_name, AccessMode::ReadOnly)
1✔
103
}
104

105
/// Opens a database file and reconstructs the in-memory `Database`,
106
/// leaving the long-lived `Pager` attached for subsequent auto-save
107
/// (read-write) or consistent-snapshot reads (read-only).
108
pub fn open_database_with_mode(path: &Path, db_name: String, mode: AccessMode) -> Result<Database> {
2✔
109
    let pager = Pager::open_with_mode(path, mode)?;
5✔
110

111
    // 1. Load sqlrite_master from the tree at header.schema_root_page.
112
    let mut master = build_empty_master_table();
2✔
113
    load_table_rows(&pager, &mut master, pager.header().schema_root_page)?;
4✔
114

115
    // 2. Two passes over master rows: first build every user table, then
116
    //    attach secondary indexes. Indexes need their base table to exist
117
    //    before we can populate them. Auto-indexes are created at table
118
    //    build time so we only have to load explicit indexes from disk
119
    //    (but we also reload the auto-index CONTENT because Table::new
120
    //    built it empty).
121
    let mut db = Database::new(db_name);
2✔
122
    let mut index_rows: Vec<IndexCatalogRow> = Vec::new();
2✔
123

124
    for rowid in master.rowids() {
6✔
125
        let ty = take_text(&master, "type", rowid)?;
4✔
126
        let name = take_text(&master, "name", rowid)?;
4✔
127
        let sql = take_text(&master, "sql", rowid)?;
4✔
128
        let rootpage = take_integer(&master, "rootpage", rowid)? as u32;
4✔
129
        let last_rowid = take_integer(&master, "last_rowid", rowid)?;
2✔
130

131
        match ty.as_str() {
2✔
132
            "table" => {
2✔
133
                let (parsed_name, columns) = parse_create_sql(&sql)?;
4✔
134
                if parsed_name != name {
4✔
135
                    return Err(SQLRiteError::Internal(format!(
×
136
                        "sqlrite_master row '{name}' carries SQL for '{parsed_name}' — corrupt catalog?"
137
                    )));
138
                }
139
                let mut table = build_empty_table(&name, columns, last_rowid);
4✔
140
                if rootpage != 0 {
2✔
141
                    load_table_rows(&pager, &mut table, rootpage)?;
4✔
142
                }
143
                if last_rowid > table.last_rowid {
2✔
144
                    table.last_rowid = last_rowid;
×
145
                }
146
                db.tables.insert(name, table);
4✔
147
            }
148
            "index" => {
4✔
149
                index_rows.push(IndexCatalogRow {
4✔
150
                    name,
2✔
151
                    sql,
2✔
152
                    rootpage,
153
                });
154
            }
155
            other => {
×
156
                return Err(SQLRiteError::Internal(format!(
×
157
                    "sqlrite_master row '{name}' has unknown type '{other}'"
158
                )));
159
            }
160
        }
161
    }
162

163
    // Second pass: attach each index to its table. HNSW indexes
164
    // (Phase 7d.2) take a different code path because their persisted
165
    // form is just the CREATE INDEX SQL — the graph itself isn't
166
    // persisted yet (Phase 7d.3). Detect HNSW via the SQL's USING clause
167
    // and route to a graph-rebuild instead of the B-Tree-cell load.
168
    //
169
    // Phase 8b — same shape for FTS indexes. The posting lists aren't
170
    // persisted yet (Phase 8c), so we replay the CREATE INDEX SQL on
171
    // open and let `execute_create_index` walk current rows.
172
    for row in index_rows {
6✔
173
        if create_index_sql_uses_hnsw(&row.sql) {
4✔
174
            rebuild_hnsw_index(&mut db, &pager, &row)?;
2✔
175
        } else if create_index_sql_uses_fts(&row.sql) {
4✔
176
            rebuild_fts_index(&mut db, &pager, &row)?;
2✔
177
        } else {
178
            attach_index(&mut db, &pager, row)?;
4✔
179
        }
180
    }
181

182
    // Phase 11.9 — replay any MVCC commit batches recovered from
183
    // the WAL into the freshly-built `MvStore`, and seed the
184
    // `MvccClock` past the highest persisted timestamp. Without
185
    // this step the in-memory MVCC state would always start blank
186
    // on reopen — fine for legacy single-session workloads, but a
187
    // correctness gap once `BEGIN CONCURRENT` is in play (a
188
    // second process could hand out a `begin_ts` below an
189
    // already-committed version's `end`, breaking the visibility
190
    // rule).
191
    //
192
    // The clock seed is the larger of (header.clock_high_water,
193
    // max(commit_ts among replayed batches)) so a crash between
194
    // commits and the next checkpoint — where the header's
195
    // high-water lags reality — still produces a clock that
196
    // doesn't regress.
197
    replay_mvcc_into_db(&mut db, &pager)?;
2✔
198

199
    db.source_path = Some(path.to_path_buf());
2✔
200
    db.pager = Some(pager);
2✔
201
    Ok(db)
2✔
202
}
203

204
/// Phase 11.9 — drains every MVCC commit batch the Pager recovered
205
/// from the WAL into `db.mv_store`, and advances `db.mvcc_clock`
206
/// to at least the highest observed timestamp.
207
///
208
/// Batches are replayed in WAL order, which matches commit order
209
/// (the WAL appends sequentially). Each record's `commit_ts`
210
/// becomes the version's `begin`, with the previous latest
211
/// version's `end` capped at the same timestamp — identical to
212
/// the live-commit path's `MvStore::push_committed`.
213
fn replay_mvcc_into_db(db: &mut Database, pager: &Pager) -> Result<()> {
2✔
214
    use crate::mvcc::RowVersion;
215

216
    let mut clock_seed = pager.clock_high_water();
2✔
217
    for batch in pager.recovered_mvcc_commits() {
4✔
218
        if batch.commit_ts > clock_seed {
2✔
219
            clock_seed = batch.commit_ts;
1✔
220
        }
221
        for rec in &batch.records {
3✔
222
            let version = RowVersion::committed(batch.commit_ts, rec.payload.clone());
1✔
223
            db.mv_store
2✔
224
                .push_committed(rec.row.clone(), version)
2✔
225
                .map_err(|e| {
1✔
226
                    SQLRiteError::Internal(format!(
×
227
                        "WAL MVCC replay: push_committed failed for {}/{}: {e}",
228
                        rec.row.table, rec.row.rowid,
229
                    ))
230
                })?;
231
        }
232
    }
233
    if clock_seed > 0 {
2✔
234
        db.mvcc_clock.observe(clock_seed);
1✔
235
    }
236
    Ok(())
2✔
237
}
238

239
/// Catalog row for a secondary index — deferred until after every table is
/// loaded so the index's base table exists by the time we populate it.
#[derive(Debug)]
struct IndexCatalogRow {
    /// Index name, from the catalog's `name` column.
    name: String,
    /// The `CREATE INDEX` statement, from the catalog's `sql` column.
    sql: String,
    /// Root page of the index's page tree, from the `rootpage` column.
    rootpage: u32,
}
246

247
/// Persists `db` to disk. Diff-pager skips writing pages whose bytes
248
/// haven't changed; the [`PageAllocator`] preserves per-table page
249
/// numbers across saves so unchanged tables produce zero dirty frames.
250
///
251
/// Pages that were live before this save but aren't restaged this round
252
/// (e.g., the leaves of a dropped table) move onto a persisted free
253
/// list rooted at `header.freelist_head`; subsequent saves draw from
254
/// the freelist before extending the file. `VACUUM` (see
255
/// [`vacuum_database`]) compacts the file by ignoring the freelist and
256
/// allocating linearly from page 1.
257
///
258
/// [`PageAllocator`]: crate::sql::pager::allocator::PageAllocator
259
pub fn save_database(db: &mut Database, path: &Path) -> Result<()> {
2✔
260
    save_database_with_mode(db, path, /*compact=*/ false)
2✔
261
}
262

263
/// Reclaims space by rewriting every live B-Tree contiguously from
264
/// page 1, with no freelist. Equivalent to `save_database` but ignores
265
/// the existing freelist and per-table preferred pools — every page is
266
/// allocated by extending the high-water mark — so the resulting file
267
/// is tightly packed and the freelist is empty.
268
///
269
/// Used by the SQL-level `VACUUM;` statement.
270
pub fn vacuum_database(db: &mut Database, path: &Path) -> Result<()> {
1✔
271
    save_database_with_mode(db, path, /*compact=*/ true)
1✔
272
}
273

274
/// Shared save core. `compact = false` is the normal save path (uses
275
/// the existing freelist + per-table preferred pools). `compact = true`
276
/// is the VACUUM path (empty freelist, empty preferred pools, linear
277
/// allocation from page 1).
278
fn save_database_with_mode(db: &mut Database, path: &Path, compact: bool) -> Result<()> {
2✔
279
    // Phase 7d.3 — rebuild any HNSW index that DELETE / UPDATE-on-vector
280
    // marked dirty. Done up front under the &mut Database borrow we
281
    // already hold, before the immutable iteration loops below need
282
    // their own borrow.
283
    rebuild_dirty_hnsw_indexes(db)?;
2✔
284
    // Phase 8b — same drill for FTS indexes flagged by DELETE / UPDATE.
285
    rebuild_dirty_fts_indexes(db);
2✔
286

287
    let same_path = db.source_path.as_deref() == Some(path);
2✔
288
    let mut pager = if same_path {
2✔
289
        match db.pager.take() {
2✔
290
            Some(p) => p,
2✔
291
            None if path.exists() => Pager::open(path)?,
4✔
292
            None => Pager::create(path)?,
2✔
293
        }
294
    } else if path.exists() {
3✔
295
        Pager::open(path)?
1✔
296
    } else {
297
        Pager::create(path)?
2✔
298
    };
299

300
    // Snapshot what was live BEFORE we reset staged. Used to compute the
301
    // newly-freed set after staging completes. Page 0 (the header) is
302
    // never on the freelist — it's always live.
303
    let old_header = pager.header();
2✔
304
    let old_live: std::collections::HashSet<u32> = (1..old_header.page_count).collect();
2✔
305

306
    // Read the previously-persisted freelist so its leaf pages can be
307
    // reused as preferred allocations and its trunk pages don't leak.
308
    let (old_free_leaves, old_free_trunks) = if compact || old_header.freelist_head == 0 {
6✔
309
        (Vec::new(), Vec::new())
4✔
310
    } else {
311
        crate::sql::pager::freelist::read_freelist(&pager, old_header.freelist_head)?
2✔
312
    };
313

314
    // Snapshot the previous rootpages of each table/index so we can
315
    // seed per-table preferred pools (the unchanged-table case stages
316
    // byte-identical pages → diff pager skips every write for it).
317
    let old_rootpages = if compact {
2✔
318
        HashMap::new()
2✔
319
    } else {
320
        read_old_rootpages(&pager, old_header.schema_root_page)?
4✔
321
    };
322

323
    // SQLR-1 — snapshot every prior B-Tree's page set NOW, before any
324
    // staging starts. `Pager::read_page` shadows on-disk bytes with the
325
    // current `staged` buffer, so if we deferred these walks until each
326
    // object's turn in the staging loop, a *new* index added in this
327
    // save would extend past the old high-water and overwrite the
328
    // pages of any later-staged object whose old root sits in that
329
    // range — including `sqlrite_master`, which is always staged last.
330
    // The follow-up walk would then read the wrong B-Tree's bytes and
331
    // either hand the allocator a bogus preferred pool or panic
332
    // dispatching cells (a table-cell decoder vs. an index leaf, the
333
    // shape of the original SQLR-1 panic). Walking up front pins each
334
    // map to the committed bytes that were on disk before this save
335
    // touched anything.
336
    let old_preferred_pages: HashMap<(String, String), Vec<u32>> = if compact {
2✔
337
        HashMap::new()
2✔
338
    } else {
339
        let mut map: HashMap<(String, String), Vec<u32>> = HashMap::new();
2✔
340
        for ((kind, name), &root) in &old_rootpages {
6✔
341
            // Tables can carry overflow chains; index/HNSW/FTS leaves
342
            // never overflow in the current encoding, so the cheaper
343
            // walk suffices for them.
344
            let follow = kind == "table";
4✔
345
            let pages = collect_pages_for_btree(&pager, root, follow)?;
2✔
346
            map.insert((kind.clone(), name.clone()), pages);
4✔
347
        }
348
        map
2✔
349
    };
350
    let old_master_pages: Vec<u32> = if compact || old_header.schema_root_page == 0 {
4✔
351
        Vec::new()
2✔
352
    } else {
353
        collect_pages_for_btree(
4✔
354
            &pager,
355
            old_header.schema_root_page,
2✔
356
            /*follow_overflow=*/ true,
357
        )?
358
    };
359

360
    pager.clear_staged();
2✔
361

362
    // Allocator: in normal mode, seed with the old freelist; in compact
363
    // mode, start empty so allocation extends linearly from page 1.
364
    use std::collections::VecDeque;
365
    let initial_freelist: VecDeque<u32> = if compact {
2✔
366
        VecDeque::new()
2✔
367
    } else {
368
        crate::sql::pager::freelist::freelist_to_deque(old_free_leaves.clone())
4✔
369
    };
370
    let mut alloc = crate::sql::pager::allocator::PageAllocator::new(initial_freelist, 1);
2✔
371

372
    // 1. Stage each user table's B-Tree, collecting master-row info.
373
    //    `kind` is "table" or "index" — master has one row per each.
374
    let mut master_rows: Vec<CatalogEntry> = Vec::new();
2✔
375

376
    let mut table_names: Vec<&String> = db.tables.keys().collect();
4✔
377
    table_names.sort();
4✔
378
    for name in table_names {
4✔
379
        if name == MASTER_TABLE_NAME {
4✔
380
            return Err(SQLRiteError::Internal(format!(
×
381
                "user table cannot be named '{MASTER_TABLE_NAME}' (reserved)"
382
            )));
383
        }
384
        if !compact {
2✔
385
            if let Some(prev) = old_preferred_pages.get(&("table".to_string(), name.to_string())) {
6✔
386
                alloc.set_preferred(prev.clone());
4✔
387
            }
388
        }
389
        let table = &db.tables[name];
4✔
390
        let rootpage = stage_table_btree(&mut pager, table, &mut alloc)?;
2✔
391
        alloc.finish_preferred();
2✔
392
        master_rows.push(CatalogEntry {
2✔
393
            kind: "table".into(),
2✔
394
            name: name.clone(),
2✔
395
            sql: table_to_create_sql(table),
2✔
396
            rootpage,
397
            last_rowid: table.last_rowid,
2✔
398
        });
399
    }
400

401
    // 2. Stage each secondary index's B-Tree. Indexes persist in a
402
    //    deterministic order: sorted by (owning_table, index_name).
403
    let mut index_entries: Vec<(&Table, &SecondaryIndex)> = Vec::new();
2✔
404
    for table in db.tables.values() {
4✔
405
        for idx in &table.secondary_indexes {
4✔
406
            index_entries.push((table, idx));
2✔
407
        }
408
    }
409
    index_entries
2✔
410
        .sort_by(|(ta, ia), (tb, ib)| ta.tb_name.cmp(&tb.tb_name).then(ia.name.cmp(&ib.name)));
4✔
411
    for (_table, idx) in index_entries {
4✔
412
        if !compact {
2✔
413
            if let Some(prev) =
6✔
414
                old_preferred_pages.get(&("index".to_string(), idx.name.to_string()))
415
            {
416
                alloc.set_preferred(prev.clone());
4✔
417
            }
418
        }
419
        let rootpage = stage_index_btree(&mut pager, idx, &mut alloc)?;
4✔
420
        alloc.finish_preferred();
2✔
421
        master_rows.push(CatalogEntry {
2✔
422
            kind: "index".into(),
2✔
423
            name: idx.name.clone(),
2✔
424
            sql: idx.synthesized_sql(),
2✔
425
            rootpage,
426
            last_rowid: 0,
427
        });
428
    }
429

430
    // 2b. Phase 7d.3: persist HNSW indexes as their own cell-encoded
431
    //     page trees, with the rootpage recorded in sqlrite_master.
432
    //     Reopen loads the graph back from cells (fast, exact match)
433
    //     instead of rebuilding from rows.
434
    //
435
    //     Dirty indexes (set by DELETE / UPDATE-on-vector-col) are
436
    //     rebuilt from current rows BEFORE staging, so the on-disk
437
    //     graph reflects the current row set.
438
    let mut hnsw_entries: Vec<(&Table, &crate::sql::db::table::HnswIndexEntry)> = Vec::new();
2✔
439
    for table in db.tables.values() {
4✔
440
        for entry in &table.hnsw_indexes {
4✔
441
            hnsw_entries.push((table, entry));
1✔
442
        }
443
    }
444
    hnsw_entries
2✔
445
        .sort_by(|(ta, ea), (tb, eb)| ta.tb_name.cmp(&tb.tb_name).then(ea.name.cmp(&eb.name)));
2✔
446
    for (table, entry) in hnsw_entries {
4✔
447
        if !compact {
1✔
448
            if let Some(prev) =
1✔
449
                old_preferred_pages.get(&("index".to_string(), entry.name.to_string()))
450
            {
451
                alloc.set_preferred(prev.clone());
×
452
            }
453
        }
454
        let rootpage = stage_hnsw_btree(&mut pager, &entry.index, &mut alloc)?;
2✔
455
        alloc.finish_preferred();
1✔
456
        master_rows.push(CatalogEntry {
1✔
457
            kind: "index".into(),
1✔
458
            name: entry.name.clone(),
1✔
459
            sql: synthesize_hnsw_create_index_sql(
1✔
460
                &entry.name,
1✔
461
                &table.tb_name,
1✔
462
                &entry.column_name,
1✔
463
                entry.metric,
464
            ),
465
            rootpage,
466
            last_rowid: 0,
467
        });
468
    }
469

470
    // 2c. Phase 8c — persist FTS posting lists as their own
471
    //     cell-encoded page trees, with the rootpage recorded in
472
    //     sqlrite_master. Reopen loads the postings back from cells
473
    //     (fast, exact match) instead of re-tokenizing rows.
474
    //
475
    //     Dirty indexes (set by DELETE / UPDATE-on-text-col) are
476
    //     rebuilt from current rows BEFORE staging by
477
    //     `rebuild_dirty_fts_indexes`, so the on-disk tree reflects
478
    //     the current row set.
479
    let mut fts_entries: Vec<(&Table, &crate::sql::db::table::FtsIndexEntry)> = Vec::new();
2✔
480
    for table in db.tables.values() {
4✔
481
        for entry in &table.fts_indexes {
4✔
482
            fts_entries.push((table, entry));
1✔
483
        }
484
    }
485
    fts_entries
2✔
486
        .sort_by(|(ta, ea), (tb, eb)| ta.tb_name.cmp(&tb.tb_name).then(ea.name.cmp(&eb.name)));
2✔
487
    let any_fts = !fts_entries.is_empty();
2✔
488
    for (table, entry) in fts_entries {
4✔
489
        if !compact {
1✔
490
            if let Some(prev) =
1✔
491
                old_preferred_pages.get(&("index".to_string(), entry.name.to_string()))
492
            {
493
                alloc.set_preferred(prev.clone());
×
494
            }
495
        }
496
        let rootpage = stage_fts_btree(&mut pager, &entry.index, &mut alloc)?;
2✔
497
        alloc.finish_preferred();
1✔
498
        master_rows.push(CatalogEntry {
1✔
499
            kind: "index".into(),
1✔
500
            name: entry.name.clone(),
1✔
501
            sql: format!(
2✔
502
                "CREATE INDEX {} ON {} USING fts ({})",
503
                entry.name, table.tb_name, entry.column_name
504
            ),
505
            rootpage,
506
            last_rowid: 0,
507
        });
508
    }
509

510
    // 3. Build an in-memory sqlrite_master with one row per table or index,
511
    //    then stage it via the same tree-build path. Seed master's
512
    //    preferred pool with the previous master tree's pages so the
513
    //    catalog page numbers stay stable across saves whenever the
514
    //    catalog content didn't change.
515
    let mut master = build_empty_master_table();
2✔
516
    for (i, entry) in master_rows.into_iter().enumerate() {
8✔
517
        let rowid = (i as i64) + 1;
4✔
518
        master.restore_row(
2✔
519
            rowid,
520
            vec![
4✔
521
                Some(Value::Text(entry.kind)),
2✔
522
                Some(Value::Text(entry.name)),
2✔
523
                Some(Value::Text(entry.sql)),
2✔
524
                Some(Value::Integer(entry.rootpage as i64)),
2✔
525
                Some(Value::Integer(entry.last_rowid)),
2✔
526
            ],
527
        )?;
528
    }
529
    if !compact && !old_master_pages.is_empty() {
4✔
530
        // Use the page list snapshotted before any staging touched
531
        // disk; re-walking here would read whatever a new index
532
        // already restaged on top of master's old root (SQLR-1).
533
        alloc.set_preferred(old_master_pages.clone());
2✔
534
    }
535
    let master_root = stage_table_btree(&mut pager, &master, &mut alloc)?;
4✔
536
    alloc.finish_preferred();
2✔
537

538
    // 4. Compute newly-freed pages: the previously-live set minus what
539
    //    we just restaged. The previous freelist's trunk pages get
540
    //    re-encoded too — they're in `old_live`, weren't restaged, so
541
    //    the filter naturally moves them to the new freelist.
542
    //
543
    // In `compact` mode (VACUUM), we *discard* newly_freed instead of
544
    // routing it onto the new freelist. The whole point of VACUUM is
545
    // to let the file truncate to the new high-water mark, so any page
546
    // past it gets dropped at the next checkpoint.
547
    if !compact {
2✔
548
        let used = alloc.used().clone();
4✔
549
        let mut newly_freed: Vec<u32> = old_live
550
            .iter()
551
            .copied()
552
            .filter(|p| !used.contains(p))
6✔
553
            .collect();
554
        let _ = &old_free_trunks; // silenced — handled by the old_live filter
555
        alloc.add_to_freelist(newly_freed.drain(..));
4✔
556
    }
557

558
    // 5. Encode the new freelist into trunk pages. `stage_freelist`
559
    //    consumes some of the free pages AS the trunk pages themselves —
560
    //    a trunk is just a free page borrowed for metadata. Pages that
561
    //    were on the freelist but become trunks no longer need to be
562
    //    "extension" pages; the high-water mark from the staging loop
563
    //    above is already correct.
564
    let new_free_pages = alloc.drain_freelist();
2✔
565
    let new_freelist_head =
2✔
566
        crate::sql::pager::freelist::stage_freelist(&mut pager, new_free_pages)?;
567

568
    // 6. Pick the format version. v6 is on demand: only bumps when the
569
    //    new freelist is non-empty. FTS-bearing files keep their v5
570
    //    promotion; v6 is a strict superset (v6 readers handle v4/v5/v6).
571
    use crate::sql::pager::header::{FORMAT_VERSION_V5, FORMAT_VERSION_V6};
572
    let format_version = if new_freelist_head != 0 {
3✔
573
        FORMAT_VERSION_V6
1✔
574
    } else if any_fts {
4✔
575
        // Preserve a v6 file at v6 (don't downgrade) but otherwise
576
        // bump v4 → v5 for FTS like Phase 8c does.
577
        std::cmp::max(FORMAT_VERSION_V5, old_header.format_version)
2✔
578
    } else {
579
        // Preserve whatever the file already was.
580
        old_header.format_version
2✔
581
    };
582

583
    pager.commit(DbHeader {
2✔
584
        page_count: alloc.high_water(),
2✔
585
        schema_root_page: master_root,
586
        format_version,
2✔
587
        freelist_head: new_freelist_head,
588
    })?;
589

590
    if same_path {
4✔
591
        db.pager = Some(pager);
2✔
592
    }
593
    Ok(())
2✔
594
}
595

596
/// Build material for a single row in sqlrite_master.
#[derive(Debug)]
struct CatalogEntry {
    /// Row type: "table" or "index".
    kind: String,
    /// Name of the table or index.
    name: String,
    /// SQL statement that recreates the object.
    sql: String,
    /// Root page returned by the staging call for this object.
    rootpage: u32,
    /// The table's `last_rowid`; index rows store 0.
    last_rowid: i64,
}
604

605
// -------------------------------------------------------------------------
606
// sqlrite_master — hardcoded catalog table schema
607

608
fn build_empty_master_table() -> Table {
2✔
609
    // Phase 3e: `type` is the first column, matching SQLite's convention.
610
    // It distinguishes `'table'` rows from `'index'` rows.
611
    let columns = vec![
4✔
612
        Column::new("type".into(), "text".into(), false, true, false),
4✔
613
        Column::new("name".into(), "text".into(), true, true, true),
4✔
614
        Column::new("sql".into(), "text".into(), false, true, false),
4✔
615
        Column::new("rootpage".into(), "integer".into(), false, true, false),
4✔
616
        Column::new("last_rowid".into(), "integer".into(), false, true, false),
4✔
617
    ];
618
    build_empty_table(MASTER_TABLE_NAME, columns, 0)
2✔
619
}
620

621
/// Reads a required Text column from a known-good catalog row.
622
fn take_text(table: &Table, col: &str, rowid: i64) -> Result<String> {
2✔
623
    match table.get_value(col, rowid) {
2✔
624
        Some(Value::Text(s)) => Ok(s),
2✔
625
        other => Err(SQLRiteError::Internal(format!(
×
626
            "sqlrite_master column '{col}' at rowid {rowid}: expected Text, got {other:?}"
627
        ))),
628
    }
629
}
630

631
/// Reads a required Integer column from a known-good catalog row.
632
fn take_integer(table: &Table, col: &str, rowid: i64) -> Result<i64> {
2✔
633
    match table.get_value(col, rowid) {
2✔
634
        Some(Value::Integer(v)) => Ok(v),
2✔
635
        other => Err(SQLRiteError::Internal(format!(
×
636
            "sqlrite_master column '{col}' at rowid {rowid}: expected Integer, got {other:?}"
637
        ))),
638
    }
639
}
640

641
// -------------------------------------------------------------------------
642
// CREATE-TABLE SQL synthesis and re-parsing
643

644
/// Synthesizes a CREATE TABLE SQL string that recreates the table's schema.
645
/// Deterministic: same schema → same SQL, so diffing commits stay stable.
646
fn table_to_create_sql(table: &Table) -> String {
2✔
647
    let mut parts = Vec::with_capacity(table.columns.len());
2✔
648
    for c in &table.columns {
4✔
649
        // Render the SQL type literally so the round-trip through
650
        // CREATE TABLE re-parsing recreates the same schema. Vector
651
        // carries its dimension inline.
652
        let ty: String = match &c.datatype {
2✔
653
            DataType::Integer => "INTEGER".to_string(),
4✔
654
            DataType::Text => "TEXT".to_string(),
4✔
655
            DataType::Real => "REAL".to_string(),
2✔
656
            DataType::Bool => "BOOLEAN".to_string(),
2✔
657
            DataType::Vector(dim) => format!("VECTOR({dim})"),
2✔
658
            DataType::Json => "JSON".to_string(),
2✔
659
            DataType::None | DataType::Invalid => "TEXT".to_string(),
×
660
        };
661
        let mut piece = format!("{} {}", c.column_name, ty);
4✔
662
        if c.is_pk {
2✔
663
            piece.push_str(" PRIMARY KEY");
4✔
664
        } else {
665
            if c.is_unique {
2✔
666
                piece.push_str(" UNIQUE");
2✔
667
            }
668
            if c.not_null {
2✔
669
                piece.push_str(" NOT NULL");
2✔
670
            }
671
        }
672
        if let Some(default) = &c.default {
3✔
673
            piece.push_str(" DEFAULT ");
1✔
674
            piece.push_str(&render_default_literal(default));
1✔
675
        }
676
        parts.push(piece);
2✔
677
    }
678
    format!("CREATE TABLE {} ({});", table.tb_name, parts.join(", "))
2✔
679
}
680

681
/// Renders a DEFAULT value back to SQL-literal form so the synthesized
682
/// CREATE TABLE round-trips through `parse_create_sql`. Text values get
683
/// single-quoted with single-quote doubling for escaping. Vector defaults
684
/// are not currently expressible at CREATE TABLE time, so we render them
685
/// as their bracket-array form (matches the INSERT literal grammar).
686
fn render_default_literal(value: &Value) -> String {
1✔
687
    match value {
1✔
688
        Value::Integer(i) => i.to_string(),
1✔
689
        Value::Real(f) => f.to_string(),
×
690
        Value::Bool(b) => {
×
691
            if *b {
×
692
                "TRUE".to_string()
×
693
            } else {
694
                "FALSE".to_string()
×
695
            }
696
        }
697
        Value::Text(s) => format!("'{}'", s.replace('\'', "''")),
1✔
698
        Value::Null => "NULL".to_string(),
×
699
        Value::Vector(_) => value.to_display_string(),
×
700
    }
701
}
702

703
/// Reverses `table_to_create_sql`: feeds the SQL back through `sqlparser`
704
/// and produces our internal column list. Returns `(table_name, columns)`.
705
fn parse_create_sql(sql: &str) -> Result<(String, Vec<Column>)> {
2✔
706
    let dialect = SqlriteDialect::new();
2✔
707
    let mut ast = Parser::parse_sql(&dialect, sql).map_err(SQLRiteError::from)?;
2✔
708
    let stmt = ast.pop().ok_or_else(|| {
4✔
709
        SQLRiteError::Internal("sqlrite_master row held an empty SQL string".to_string())
×
710
    })?;
711
    let create = CreateQuery::new(&stmt)?;
4✔
712
    let columns = create
2✔
713
        .columns
714
        .into_iter()
715
        .map(|pc| {
4✔
716
            Column::with_default(
2✔
717
                pc.name,
2✔
718
                pc.datatype,
2✔
719
                pc.is_pk,
2✔
720
                pc.not_null,
2✔
721
                pc.is_unique,
2✔
722
                pc.default,
2✔
723
            )
724
        })
725
        .collect();
726
    Ok((create.table_name, columns))
2✔
727
}
728

729
// -------------------------------------------------------------------------
730
// In-memory table (re)construction
731

732
/// Builds an empty in-memory `Table` given the declared columns.
733
fn build_empty_table(name: &str, columns: Vec<Column>, last_rowid: i64) -> Table {
2✔
734
    let rows: Arc<Mutex<HashMap<String, Row>>> = Arc::new(Mutex::new(HashMap::new()));
4✔
735
    let mut secondary_indexes: Vec<SecondaryIndex> = Vec::new();
2✔
736
    {
737
        let mut map = rows.lock().expect("rows mutex poisoned");
4✔
738
        for col in &columns {
6✔
739
            // Mirror the dispatch in `Table::new` so the reconstructed
740
            // table has the same shape it'd have if it were built fresh
741
            // from SQL. Phase 7a adds the Vector arm — without it,
742
            // VECTOR columns silently restore as Row::None and every
743
            // restore_row hits a "storage None vs value Some(Vector(...))"
744
            // type mismatch.
745
            let row = match &col.datatype {
2✔
746
                DataType::Integer => Row::Integer(BTreeMap::new()),
4✔
747
                DataType::Text => Row::Text(BTreeMap::new()),
4✔
748
                DataType::Real => Row::Real(BTreeMap::new()),
2✔
749
                DataType::Bool => Row::Bool(BTreeMap::new()),
2✔
750
                DataType::Vector(_dim) => Row::Vector(BTreeMap::new()),
2✔
751
                // JSON columns reuse Text storage — see Table::new and
752
                // Phase 7e's scope-correction note.
753
                DataType::Json => Row::Text(BTreeMap::new()),
2✔
754
                DataType::None | DataType::Invalid => Row::None,
×
755
            };
756
            map.insert(col.column_name.clone(), row);
4✔
757

758
            // Auto-create UNIQUE/PK indexes so the restored table has the
759
            // same shape Table::new would have built from fresh SQL.
760
            if (col.is_pk || col.is_unique)
2✔
761
                && matches!(col.datatype, DataType::Integer | DataType::Text)
2✔
762
            {
763
                if let Ok(idx) = SecondaryIndex::new(
764
                    SecondaryIndex::auto_name(name, &col.column_name),
2✔
765
                    name.to_string(),
4✔
766
                    col.column_name.clone(),
2✔
767
                    &col.datatype,
768
                    true,
769
                    IndexOrigin::Auto,
770
                ) {
771
                    secondary_indexes.push(idx);
2✔
772
                }
773
            }
774
        }
775
    }
776

777
    let primary_key = columns
4✔
778
        .iter()
779
        .find(|c| c.is_pk)
6✔
780
        .map(|c| c.column_name.clone())
6✔
781
        .unwrap_or_else(|| "-1".to_string());
2✔
782

783
    Table {
784
        tb_name: name.to_string(),
2✔
785
        columns,
786
        rows,
787
        secondary_indexes,
788
        // HNSW indexes (Phase 7d.2) are reconstructed on open by re-
789
        // executing each `CREATE INDEX … USING hnsw` SQL stored in
790
        // `sqlrite_master`. This builder produces the empty shell;
791
        // `replay_create_index_for_hnsw` (in this same module) walks
792
        // sqlrite_master after every table is loaded and rebuilds the
793
        // graph from current row data. Persistence of the graph itself
794
        // (avoiding the on-open rebuild cost) is Phase 7d.3.
795
        hnsw_indexes: Vec::new(),
2✔
796
        // FTS indexes (Phase 8b) follow the same pattern — the
797
        // CREATE INDEX … USING fts SQL is the source of truth on open
798
        // and the in-memory posting list gets rebuilt from current
799
        // rows. Cell-encoded persistence of the postings is Phase 8c.
800
        fts_indexes: Vec::new(),
2✔
801
        last_rowid,
802
        primary_key,
803
    }
804
}
805

806
// -------------------------------------------------------------------------
807
// Leaf-chain read / write
808

809
/// Walks a table's B-Tree from `root_page`, following the leftmost-child
810
/// chain down to the first leaf, then iterating leaves via their sibling
811
/// `next_page` pointers. Every cell is decoded and replayed into `table`.
812
///
813
/// Open-path note: we eagerly materialize the entire table into `Table`'s
814
/// in-memory maps. Phase 5 will introduce a `Cursor` that hits the pager
815
/// on demand so queries can stream through the tree without a full upfront
816
/// load.
817
/// Re-parses `CREATE INDEX` SQL from sqlrite_master and restores the
818
/// index on its base table by walking the tree of index cells at
819
/// `rootpage`. The base table is expected to already be in `db.tables`.
820
fn attach_index(db: &mut Database, pager: &Pager, row: IndexCatalogRow) -> Result<()> {
2✔
821
    let (table_name, column_name, is_unique) = parse_create_index_sql(&row.sql)?;
4✔
822

823
    let table = db.get_table_mut(table_name.clone()).map_err(|_| {
4✔
824
        SQLRiteError::Internal(format!(
×
825
            "index '{}' references unknown table '{table_name}' (sqlrite_master out of sync?)",
826
            row.name
827
        ))
828
    })?;
829
    let datatype = table
6✔
830
        .columns
831
        .iter()
2✔
832
        .find(|c| c.column_name == column_name)
6✔
833
        .map(|c| clone_datatype(&c.datatype))
6✔
834
        .ok_or_else(|| {
2✔
835
            SQLRiteError::Internal(format!(
×
836
                "index '{}' references unknown column '{column_name}' on '{table_name}'",
837
                row.name
838
            ))
839
        })?;
840

841
    // An auto-index on this column may already exist (built by
842
    // build_empty_table for UNIQUE/PK columns). If the names match, reuse
843
    // the slot instead of adding a duplicate entry.
844
    let existing_slot = table
6✔
845
        .secondary_indexes
846
        .iter()
847
        .position(|i| i.name == row.name);
6✔
848
    let idx = match existing_slot {
2✔
849
        Some(i) => {
2✔
850
            // Drain any entries that may have been populated during table
851
            // restore_row calls — we're about to repopulate from the
852
            // persisted tree.
853
            table.secondary_indexes.remove(i)
4✔
854
        }
855
        None => SecondaryIndex::new(
2✔
856
            row.name.clone(),
1✔
857
            table_name.clone(),
2✔
858
            column_name.clone(),
1✔
859
            &datatype,
860
            is_unique,
861
            IndexOrigin::Explicit,
862
        )?,
863
    };
864
    let mut idx = idx;
2✔
865
    // Wipe any stale entries from the auto path so the load is idempotent.
866
    let is_unique_flag = idx.is_unique;
2✔
867
    let origin = idx.origin;
2✔
868
    idx = SecondaryIndex::new(
6✔
869
        idx.name,
2✔
870
        idx.table_name,
2✔
871
        idx.column_name,
2✔
872
        &datatype,
873
        is_unique_flag,
874
        origin,
875
    )?;
876

877
    // Populate from the index tree's cells.
878
    load_index_rows(pager, &mut idx, row.rootpage)?;
2✔
879

880
    table.secondary_indexes.push(idx);
2✔
881
    Ok(())
2✔
882
}
883

884
/// Walks the leaves of an index B-Tree rooted at `root_page` and inserts
885
/// every `(value, rowid)` pair into `idx`.
886
fn load_index_rows(pager: &Pager, idx: &mut SecondaryIndex, root_page: u32) -> Result<()> {
2✔
887
    if root_page == 0 {
2✔
888
        return Ok(());
×
889
    }
890
    let first_leaf = find_leftmost_leaf(pager, root_page)?;
2✔
891
    let mut current = first_leaf;
2✔
892
    while current != 0 {
2✔
893
        let page_buf = pager
2✔
894
            .read_page(current)
2✔
895
            .ok_or_else(|| SQLRiteError::Internal(format!("missing index leaf page {current}")))?;
2✔
896
        if page_buf[0] != PageType::TableLeaf as u8 {
2✔
897
            return Err(SQLRiteError::Internal(format!(
×
898
                "page {current} tagged {} but expected TableLeaf (index)",
899
                page_buf[0]
900
            )));
901
        }
902
        let next_leaf = u32::from_le_bytes(page_buf[1..5].try_into().unwrap());
2✔
903
        let payload: &[u8; PAYLOAD_PER_PAGE] = (&page_buf[PAGE_HEADER_SIZE..])
4✔
904
            .try_into()
2✔
905
            .map_err(|_| SQLRiteError::Internal("index leaf payload size".to_string()))?;
2✔
906
        let leaf = TablePage::from_bytes(payload);
2✔
907

908
        for slot in 0..leaf.slot_count() {
4✔
909
            // Slots on an index page hold KIND_INDEX cells; decode directly.
910
            let offset = leaf.slot_offset_raw(slot)?;
4✔
911
            let (ic, _) = IndexCell::decode(leaf.as_bytes(), offset)?;
2✔
912
            idx.insert(&ic.value, ic.rowid)?;
4✔
913
        }
914
        current = next_leaf;
2✔
915
    }
916
    Ok(())
2✔
917
}
918

919
/// Minimal recognizer for the synthesized-or-user `CREATE INDEX` SQL we
920
/// store in sqlrite_master. Returns `(table_name, column_name, is_unique)`.
921
///
922
/// Uses sqlparser so user-supplied SQL with extra whitespace, case, etc.
923
/// still works; the only shape we accept is single-column indexes.
924
fn parse_create_index_sql(sql: &str) -> Result<(String, String, bool)> {
2✔
925
    use sqlparser::ast::{CreateIndex, Expr, Statement};
926

927
    let dialect = SqlriteDialect::new();
2✔
928
    let mut ast = Parser::parse_sql(&dialect, sql).map_err(SQLRiteError::from)?;
2✔
929
    let Some(Statement::CreateIndex(CreateIndex {
4✔
930
        table_name,
2✔
931
        columns,
2✔
932
        unique,
2✔
933
        ..
934
    })) = ast.pop()
6✔
935
    else {
936
        return Err(SQLRiteError::Internal(format!(
×
937
            "sqlrite_master index row's SQL isn't a CREATE INDEX: {sql}"
938
        )));
939
    };
940
    if columns.len() != 1 {
4✔
941
        return Err(SQLRiteError::NotImplemented(
×
942
            "multi-column indexes aren't supported yet".to_string(),
×
943
        ));
944
    }
945
    let col = match &columns[0].column.expr {
4✔
946
        Expr::Identifier(ident) => ident.value.clone(),
4✔
947
        Expr::CompoundIdentifier(parts) => {
×
948
            parts.last().map(|p| p.value.clone()).unwrap_or_default()
×
949
        }
950
        other => {
×
951
            return Err(SQLRiteError::Internal(format!(
×
952
                "unsupported indexed column expression: {other:?}"
953
            )));
954
        }
955
    };
956
    Ok((table_name.to_string(), col, unique))
4✔
957
}
958

959
/// True iff a CREATE INDEX SQL string uses `USING hnsw` (case-insensitive).
960
/// Used by the open path to route HNSW indexes to the graph-rebuild path
961
/// instead of the standard B-Tree cell-load. Pre-Phase-7d.2 indexes
962
/// don't have a USING clause, so they all return false and continue
963
/// taking the existing path.
964
fn create_index_sql_uses_hnsw(sql: &str) -> bool {
2✔
965
    use sqlparser::ast::{CreateIndex, IndexType, Statement};
966

967
    let dialect = SqlriteDialect::new();
2✔
968
    let Ok(mut ast) = Parser::parse_sql(&dialect, sql) else {
4✔
969
        return false;
×
970
    };
971
    let Some(Statement::CreateIndex(CreateIndex { using, .. })) = ast.pop() else {
6✔
972
        return false;
×
973
    };
974
    matches!(using, Some(IndexType::Custom(ident)) if ident.value.eq_ignore_ascii_case("hnsw"))
3✔
975
}
976

977
/// Phase 8b — peeks at a CREATE INDEX SQL to detect `USING fts(...)`.
978
/// Mirrors [`create_index_sql_uses_hnsw`].
979
fn create_index_sql_uses_fts(sql: &str) -> bool {
2✔
980
    use sqlparser::ast::{CreateIndex, IndexType, Statement};
981

982
    let dialect = SqlriteDialect::new();
2✔
983
    let Ok(mut ast) = Parser::parse_sql(&dialect, sql) else {
4✔
984
        return false;
×
985
    };
986
    let Some(Statement::CreateIndex(CreateIndex { using, .. })) = ast.pop() else {
6✔
987
        return false;
×
988
    };
989
    matches!(using, Some(IndexType::Custom(ident)) if ident.value.eq_ignore_ascii_case("fts"))
3✔
990
}
991

992
/// Phase 8c — loads (or rebuilds) an FTS index on database open. Two
993
/// paths mirror [`rebuild_hnsw_index`]:
994
///
995
///   - **rootpage != 0** (Phase 8c default): the posting list is
996
///     persisted as cell-encoded pages. Read every cell directly via
997
///     [`load_fts_postings`] and reconstruct the index — no
998
///     re-tokenization, exact bit-for-bit reproduction.
999
///
1000
///   - **rootpage == 0** (compatibility): no on-disk postings, e.g.
1001
///     for files saved by Phase 8b before persistence landed. Replay
1002
///     the CREATE INDEX SQL through `execute_create_index`, which
1003
///     walks the table's current rows and tokenizes them fresh.
1004
fn rebuild_fts_index(db: &mut Database, pager: &Pager, row: &IndexCatalogRow) -> Result<()> {
1✔
1005
    use crate::sql::db::table::FtsIndexEntry;
1006
    use crate::sql::executor::execute_create_index;
1007
    use crate::sql::fts::PostingList;
1008
    use sqlparser::ast::Statement;
1009

1010
    let dialect = SqlriteDialect::new();
1✔
1011
    let mut ast = Parser::parse_sql(&dialect, &row.sql).map_err(SQLRiteError::from)?;
1✔
1012
    let Some(stmt @ Statement::CreateIndex(_)) = ast.pop() else {
3✔
1013
        return Err(SQLRiteError::Internal(format!(
×
1014
            "sqlrite_master FTS row's SQL isn't a CREATE INDEX: {}",
1015
            row.sql
1016
        )));
1017
    };
1018

1019
    if row.rootpage == 0 {
1✔
1020
        // Compatibility path — no persisted postings; replay rows.
1021
        execute_create_index(&stmt, db)?;
×
1022
        return Ok(());
×
1023
    }
1024

1025
    let (doc_lengths, postings) = load_fts_postings(pager, row.rootpage)?;
2✔
1026
    let index = PostingList::from_persisted_postings(doc_lengths, postings);
2✔
1027
    let (tbl_name, col_name) = parse_fts_create_index_sql(&row.sql)?;
2✔
1028
    let table_mut = db.get_table_mut(tbl_name.clone()).map_err(|_| {
2✔
1029
        SQLRiteError::Internal(format!(
×
1030
            "FTS index '{}' references unknown table '{tbl_name}'",
1031
            row.name
1032
        ))
1033
    })?;
1034
    table_mut.fts_indexes.push(FtsIndexEntry {
2✔
1035
        name: row.name.clone(),
1✔
1036
        column_name: col_name,
1✔
1037
        index,
1✔
1038
        needs_rebuild: false,
1039
    });
1040
    Ok(())
1✔
1041
}
1042

1043
/// Pulls (table_name, column_name) out of a `CREATE INDEX … USING fts(col)`
1044
/// SQL string. Same shape as `parse_hnsw_create_index_sql`.
1045
fn parse_fts_create_index_sql(sql: &str) -> Result<(String, String)> {
1✔
1046
    use sqlparser::ast::{CreateIndex, Expr, Statement};
1047

1048
    let dialect = SqlriteDialect::new();
1✔
1049
    let mut ast = Parser::parse_sql(&dialect, sql).map_err(SQLRiteError::from)?;
1✔
1050
    let Some(Statement::CreateIndex(CreateIndex {
2✔
1051
        table_name,
1✔
1052
        columns,
1✔
1053
        ..
1054
    })) = ast.pop()
3✔
1055
    else {
1056
        return Err(SQLRiteError::Internal(format!(
×
1057
            "sqlrite_master FTS row's SQL isn't a CREATE INDEX: {sql}"
1058
        )));
1059
    };
1060
    if columns.len() != 1 {
2✔
1061
        return Err(SQLRiteError::NotImplemented(
×
1062
            "multi-column FTS indexes aren't supported yet".to_string(),
×
1063
        ));
1064
    }
1065
    let col = match &columns[0].column.expr {
2✔
1066
        Expr::Identifier(ident) => ident.value.clone(),
2✔
1067
        Expr::CompoundIdentifier(parts) => {
×
1068
            parts.last().map(|p| p.value.clone()).unwrap_or_default()
×
1069
        }
1070
        other => {
×
1071
            return Err(SQLRiteError::Internal(format!(
×
1072
                "FTS CREATE INDEX has unexpected column expr: {other:?}"
1073
            )));
1074
        }
1075
    };
1076
    Ok((table_name.to_string(), col))
2✔
1077
}
1078

1079
/// Loads (or rebuilds) an HNSW index on database open. Two paths:
1080
///
1081
///   - **rootpage != 0** (Phase 7d.3 default): the graph is persisted
1082
///     as cell-encoded pages. Read every node directly via
1083
///     `load_hnsw_nodes` and reconstruct the index — fast, zero
1084
///     algorithm runs, exact bit-for-bit reproduction of what was saved.
1085
///
1086
///   - **rootpage == 0** (compatibility): no on-disk graph, e.g. for
1087
///     files saved by Phase 7d.2 before persistence landed. Replay the
1088
///     CREATE INDEX SQL through `execute_create_index`, which walks the
1089
///     table's current rows and populates a fresh graph. Slower but
1090
///     correctness-equivalent on the first save with the new code.
1091
fn rebuild_hnsw_index(db: &mut Database, pager: &Pager, row: &IndexCatalogRow) -> Result<()> {
1✔
1092
    use crate::sql::db::table::HnswIndexEntry;
1093
    use crate::sql::executor::execute_create_index;
1094
    use crate::sql::hnsw::HnswIndex;
1095
    use sqlparser::ast::Statement;
1096

1097
    let dialect = SqlriteDialect::new();
1✔
1098
    let mut ast = Parser::parse_sql(&dialect, &row.sql).map_err(SQLRiteError::from)?;
1✔
1099
    let Some(stmt @ Statement::CreateIndex(_)) = ast.pop() else {
3✔
1100
        return Err(SQLRiteError::Internal(format!(
×
1101
            "sqlrite_master HNSW row's SQL isn't a CREATE INDEX: {}",
1102
            row.sql
1103
        )));
1104
    };
1105

1106
    if row.rootpage == 0 {
1✔
1107
        // Compatibility path — no persisted graph; walk current rows.
1108
        execute_create_index(&stmt, db)?;
×
1109
        return Ok(());
×
1110
    }
1111

1112
    // Persistence path — read the cell tree, deserialize. The metric
1113
    // travels through the synthesized CREATE INDEX SQL stored in
1114
    // `sqlrite_master`; pre-SQLR-28 rows omit the WITH clause and
1115
    // decode as L2, which matches what those graphs were built with.
1116
    let (tbl_name, col_name, metric) = parse_hnsw_create_index_sql(&row.sql)?;
2✔
1117
    let nodes = load_hnsw_nodes(pager, row.rootpage)?;
2✔
1118
    let index = HnswIndex::from_persisted_nodes(metric, 0xC0FFEE, nodes);
2✔
1119

1120
    // Parse the CREATE INDEX to know which table + column to attach to
1121
    // — same shape as the row-walk path; we just don't execute it.
1122
    let table_mut = db.get_table_mut(tbl_name.clone()).map_err(|_| {
3✔
1123
        SQLRiteError::Internal(format!(
×
1124
            "HNSW index '{}' references unknown table '{tbl_name}'",
1125
            row.name
1126
        ))
1127
    })?;
1128
    table_mut.hnsw_indexes.push(HnswIndexEntry {
2✔
1129
        name: row.name.clone(),
1✔
1130
        column_name: col_name,
1✔
1131
        metric,
1132
        index,
1✔
1133
        needs_rebuild: false,
1134
    });
1135
    Ok(())
1✔
1136
}
1137

1138
/// Phase 7d.3 — Phase-7d.3-side helper: walk every leaf in the HNSW
1139
/// page tree at `root_page` and decode each cell as a node. Returns
1140
/// the (node_id, layers) tuples in slot-order (already ascending by
1141
/// node_id since they were staged that way). The caller hands them to
1142
/// `HnswIndex::from_persisted_nodes`.
1143
fn load_hnsw_nodes(pager: &Pager, root_page: u32) -> Result<Vec<(i64, Vec<Vec<i64>>)>> {
1✔
1144
    use crate::sql::pager::hnsw_cell::HnswNodeCell;
1145

1146
    let mut nodes: Vec<(i64, Vec<Vec<i64>>)> = Vec::new();
1✔
1147
    let first_leaf = find_leftmost_leaf(pager, root_page)?;
2✔
1148
    let mut current = first_leaf;
1✔
1149
    while current != 0 {
1✔
1150
        let page_buf = pager
1✔
1151
            .read_page(current)
1✔
1152
            .ok_or_else(|| SQLRiteError::Internal(format!("missing HNSW leaf page {current}")))?;
1✔
1153
        if page_buf[0] != PageType::TableLeaf as u8 {
1✔
1154
            return Err(SQLRiteError::Internal(format!(
×
1155
                "page {current} tagged {} but expected TableLeaf (HNSW)",
1156
                page_buf[0]
×
1157
            )));
1158
        }
1159
        let next_leaf = u32::from_le_bytes(page_buf[1..5].try_into().unwrap());
2✔
1160
        let payload: &[u8; PAYLOAD_PER_PAGE] = (&page_buf[PAGE_HEADER_SIZE..])
2✔
1161
            .try_into()
1✔
1162
            .map_err(|_| SQLRiteError::Internal("HNSW leaf payload size".to_string()))?;
1✔
1163
        let leaf = TablePage::from_bytes(payload);
1✔
1164
        for slot in 0..leaf.slot_count() {
3✔
1165
            let offset = leaf.slot_offset_raw(slot)?;
2✔
1166
            let (cell, _) = HnswNodeCell::decode(leaf.as_bytes(), offset)?;
1✔
1167
            nodes.push((cell.node_id, cell.layers));
1✔
1168
        }
1169
        current = next_leaf;
1✔
1170
    }
1171
    Ok(nodes)
1✔
1172
}
1173

1174
/// Pulls `(table_name, column_name, metric)` out of a CREATE INDEX
1175
/// SQL string of the form `CREATE INDEX … USING hnsw (col) [WITH
1176
/// (metric = '<m>')]`. Used by the persistence path on open to know
1177
/// where to attach the loaded graph and which distance metric to
1178
/// rebuild it under. Pre-SQLR-28 rows omit the WITH clause and
1179
/// default to L2.
1180
fn parse_hnsw_create_index_sql(sql: &str) -> Result<(String, String, DistanceMetric)> {
1✔
1181
    use crate::sql::hnsw::DistanceMetric;
1182
    use sqlparser::ast::{BinaryOperator, CreateIndex, Expr, Statement, Value as AstValue};
1183

1184
    let dialect = SqlriteDialect::new();
1✔
1185
    let mut ast = Parser::parse_sql(&dialect, sql).map_err(SQLRiteError::from)?;
1✔
1186
    let Some(Statement::CreateIndex(CreateIndex {
2✔
1187
        table_name,
1✔
1188
        columns,
1✔
1189
        with,
1✔
1190
        ..
1191
    })) = ast.pop()
3✔
1192
    else {
1193
        return Err(SQLRiteError::Internal(format!(
×
1194
            "sqlrite_master HNSW row's SQL isn't a CREATE INDEX: {sql}"
1195
        )));
1196
    };
1197
    if columns.len() != 1 {
2✔
1198
        return Err(SQLRiteError::NotImplemented(
×
1199
            "multi-column HNSW indexes aren't supported yet".to_string(),
×
1200
        ));
1201
    }
1202
    let col = match &columns[0].column.expr {
2✔
1203
        Expr::Identifier(ident) => ident.value.clone(),
2✔
1204
        Expr::CompoundIdentifier(parts) => {
×
1205
            parts.last().map(|p| p.value.clone()).unwrap_or_default()
×
1206
        }
1207
        other => {
×
1208
            return Err(SQLRiteError::Internal(format!(
×
1209
                "unsupported HNSW indexed column expression: {other:?}"
1210
            )));
1211
        }
1212
    };
1213

1214
    // Pull the metric off the parsed WITH (...) bag. The user-facing
1215
    // CREATE INDEX path validates this in detail (typo'd metric names,
1216
    // unknown keys, etc.); here on the persistence read-path we trust
1217
    // what we previously wrote and surface a clean Internal error if
1218
    // it ever doesn't match.
1219
    let mut metric = DistanceMetric::L2;
1✔
1220
    for opt in &with {
2✔
1221
        if let Expr::BinaryOp { left, op, right } = opt {
2✔
1222
            if matches!(op, BinaryOperator::Eq) {
1✔
1223
                if let (Expr::Identifier(key), Expr::Value(v)) = (left.as_ref(), right.as_ref())
1✔
1224
                    && key.value.eq_ignore_ascii_case("metric")
1✔
1225
                {
1226
                    if let AstValue::SingleQuotedString(s) | AstValue::DoubleQuotedString(s) =
2✔
1227
                        &v.value
1228
                    {
1229
                        metric = DistanceMetric::from_sql_name(s).ok_or_else(|| {
1✔
1230
                            SQLRiteError::Internal(format!(
×
1231
                                "sqlrite_master HNSW row carries unknown metric '{s}'"
1232
                            ))
1233
                        })?;
1234
                    }
1235
                }
1236
            }
1237
        }
1238
    }
1239

1240
    Ok((table_name.to_string(), col, metric))
1✔
1241
}
1242

1243
/// Phase 7d.3 — rebuilds in-place any HnswIndexEntry whose
1244
/// `needs_rebuild` flag is set (DELETE / UPDATE-on-vector marked it).
1245
/// Walks the table's current Vec<f32> column storage and runs the
1246
/// HNSW algorithm fresh. Called at the top of `save_database` before
1247
/// any immutable borrows of `db` start.
1248
///
1249
/// Cost: O(N · ef_construction · log N) per dirty index. Fine for
1250
/// small tables, expensive for ≥100k-row tables — matches the
1251
/// trade-off SQLite makes for FTS5: dirtying-and-rebuilding is the
1252
/// MVP, more sophisticated incremental delete strategies (soft-delete
1253
/// + tombstones, neighbor reconnection) are future polish.
1254
fn rebuild_dirty_hnsw_indexes(db: &mut Database) -> Result<()> {
2✔
1255
    for table in db.tables.values_mut() {
4✔
1256
        table.rebuild_dirty_hnsw_indexes()?;
2✔
1257
    }
1258
    Ok(())
2✔
1259
}
1260

1261
/// Synthesises the CREATE INDEX SQL stored back into `sqlrite_master`
1262
/// for an HNSW index. The metric travels through the SQL via an
1263
/// optional `WITH (metric = '<m>')` clause; L2 indexes omit the clause
1264
/// for byte-identical round-trip with pre-SQLR-28 catalogs.
1265
fn synthesize_hnsw_create_index_sql(
1✔
1266
    index_name: &str,
1267
    table_name: &str,
1268
    column_name: &str,
1269
    metric: DistanceMetric,
1270
) -> String {
1271
    if matches!(metric, DistanceMetric::L2) {
1✔
1272
        format!("CREATE INDEX {index_name} ON {table_name} USING hnsw ({column_name})")
1✔
1273
    } else {
1274
        format!(
1✔
1275
            "CREATE INDEX {index_name} ON {table_name} USING hnsw ({column_name}) WITH (metric = '{}')",
1276
            metric.sql_name()
1✔
1277
        )
1278
    }
1279
}
1280

1281
/// Phase 8b — rebuild every FTS index a DELETE / UPDATE-on-text-col
1282
/// marked dirty. Mirrors [`rebuild_dirty_hnsw_indexes`]; runs at save
1283
/// time under `&mut Database`. Cheap on a clean DB (the `dirty` snapshot
1284
/// is empty so the per-table loop short-circuits).
1285
fn rebuild_dirty_fts_indexes(db: &mut Database) {
2✔
1286
    use crate::sql::fts::PostingList;
1287

1288
    for table in db.tables.values_mut() {
5✔
1289
        let dirty: Vec<(String, String)> = table
4✔
1290
            .fts_indexes
1291
            .iter()
1292
            .filter(|e| e.needs_rebuild)
4✔
1293
            .map(|e| (e.name.clone(), e.column_name.clone()))
4✔
1294
            .collect();
1295
        if dirty.is_empty() {
4✔
1296
            continue;
1297
        }
1298

1299
        for (idx_name, col_name) in dirty {
3✔
1300
            // Snapshot every (rowid, text) pair for this column under
1301
            // the row mutex, then drop the lock before re-tokenizing.
1302
            let mut docs: Vec<(i64, String)> = Vec::new();
1✔
1303
            {
1304
                let row_data = table.rows.lock().expect("rows mutex poisoned");
2✔
1305
                if let Some(Row::Text(map)) = row_data.get(&col_name) {
3✔
1306
                    for (id, v) in map.iter() {
1✔
1307
                        // "Null" sentinel is the parser's
1308
                        // null-marker for TEXT cells; skip those —
1309
                        // they'd round-trip as the literal string
1310
                        // "Null" otherwise. Aligns with insert_row's
1311
                        // typed_value gate.
1312
                        if v != "Null" {
1✔
1313
                            docs.push((*id, v.clone()));
1✔
1314
                        }
1315
                    }
1316
                }
1317
            }
1318

1319
            let mut new_idx = PostingList::new();
1✔
1320
            // Sort by id so the rebuild is deterministic across runs
1321
            // (the BTreeMap inside PostingList is order-stable, but
1322
            // doc-length aggregation order doesn't matter — sorting
1323
            // here is purely for reproducibility on inspection).
1324
            docs.sort_by_key(|(id, _)| *id);
4✔
1325
            for (id, text) in &docs {
1✔
1326
                new_idx.insert(*id, text);
2✔
1327
            }
1328

1329
            if let Some(entry) = table.fts_indexes.iter_mut().find(|e| e.name == idx_name) {
4✔
1330
                entry.index = new_idx;
1✔
1331
                entry.needs_rebuild = false;
1✔
1332
            }
1333
        }
1334
    }
1335
}
1336

1337
/// Cheap clone helper — `DataType` doesn't derive `Clone` elsewhere.
1338
fn clone_datatype(dt: &DataType) -> DataType {
2✔
1339
    match dt {
2✔
1340
        DataType::Integer => DataType::Integer,
2✔
1341
        DataType::Text => DataType::Text,
1✔
UNCOV
1342
        DataType::Real => DataType::Real,
×
UNCOV
1343
        DataType::Bool => DataType::Bool,
×
UNCOV
1344
        DataType::Vector(dim) => DataType::Vector(*dim),
×
UNCOV
1345
        DataType::Json => DataType::Json,
×
UNCOV
1346
        DataType::None => DataType::None,
×
UNCOV
1347
        DataType::Invalid => DataType::Invalid,
×
1348
    }
1349
}
1350

1351
/// Stages an index's B-Tree at `start_page`. Each leaf cell is a
1352
/// `KIND_INDEX` entry carrying `(original_rowid, value)`. Returns
1353
/// `(root_page, next_free_page)`.
1354
///
1355
/// The tree's shape matches a regular table's — leaves chained via
1356
/// `next_page`, optional interior layer above. `Cell::peek_rowid` works
1357
/// uniformly for index cells (same prefix as local cells), so the
1358
/// existing slot directory and binary search carry over.
1359
fn stage_index_btree(
2✔
1360
    pager: &mut Pager,
1361
    idx: &SecondaryIndex,
1362
    alloc: &mut crate::sql::pager::allocator::PageAllocator,
1363
) -> Result<u32> {
1364
    // Build the leaves.
1365
    let leaves = stage_index_leaves(pager, idx, alloc)?;
2✔
1366
    if leaves.len() == 1 {
4✔
1367
        return Ok(leaves[0].0);
4✔
1368
    }
1369
    let mut level: Vec<(u32, i64)> = leaves;
1✔
1370
    while level.len() > 1 {
4✔
1371
        level = stage_interior_level(pager, &level, alloc)?;
2✔
1372
    }
1373
    Ok(level[0].0)
2✔
1374
}
1375

1376
/// Packs the index's (value, rowid) entries into a sibling-chained run
1377
/// of `TableLeaf` pages. Iteration order matches `SecondaryIndex::iter_entries`
1378
/// (ascending value; rowids in insertion order within a value), which is
1379
/// also ascending by the "cell rowid" carried in each IndexCell (the
1380
/// original row's rowid) — so Cell::peek_rowid + the slot directory's
1381
/// rowid ordering stays consistent.
1382
fn stage_index_leaves(
2✔
1383
    pager: &mut Pager,
1384
    idx: &SecondaryIndex,
1385
    alloc: &mut crate::sql::pager::allocator::PageAllocator,
1386
) -> Result<Vec<(u32, i64)>> {
1387
    let mut leaves: Vec<(u32, i64)> = Vec::new();
2✔
1388
    let mut current_leaf = TablePage::empty();
4✔
1389
    let mut current_leaf_page = alloc.allocate();
4✔
1390
    let mut current_max_rowid: Option<i64> = None;
2✔
1391

1392
    // Sort the entries by original rowid so the in-page slot directory,
1393
    // which binary-searches by rowid, stays valid. (iter_entries orders by
1394
    // value; we reorder here for B-Tree correctness.)
1395
    let mut entries: Vec<(Value, i64)> = idx.iter_entries().collect();
2✔
1396
    entries.sort_by_key(|(_, r)| *r);
6✔
1397

1398
    for (value, rowid) in entries {
4✔
1399
        let cell = IndexCell::new(rowid, value);
2✔
1400
        let entry_bytes = cell.encode()?;
4✔
1401

1402
        if !current_leaf.would_fit(entry_bytes.len()) {
4✔
1403
            let next_leaf_page_num = alloc.allocate();
2✔
1404
            emit_leaf(pager, current_leaf_page, &current_leaf, next_leaf_page_num);
1✔
1405
            leaves.push((current_leaf_page, current_max_rowid.unwrap_or(i64::MIN)));
1✔
1406
            current_leaf = TablePage::empty();
1✔
1407
            current_leaf_page = next_leaf_page_num;
1✔
1408

1409
            if !current_leaf.would_fit(entry_bytes.len()) {
1✔
UNCOV
1410
                return Err(SQLRiteError::Internal(format!(
×
1411
                    "index entry of {} bytes exceeds empty-page capacity {}",
UNCOV
1412
                    entry_bytes.len(),
×
UNCOV
1413
                    current_leaf.free_space()
×
1414
                )));
1415
            }
1416
        }
1417
        current_leaf.insert_entry(rowid, &entry_bytes)?;
4✔
1418
        current_max_rowid = Some(rowid);
2✔
1419
    }
1420

1421
    emit_leaf(pager, current_leaf_page, &current_leaf, 0);
2✔
1422
    leaves.push((current_leaf_page, current_max_rowid.unwrap_or(i64::MIN)));
2✔
1423
    Ok(leaves)
2✔
1424
}
1425

1426
/// Phase 7d.3 — stages an HNSW index's page tree at `start_page`.
1427
/// Each leaf cell is a `KIND_HNSW` entry carrying one node's
1428
/// (node_id, layers). Returns `(root_page, next_free_page)`.
1429
///
1430
/// Tree shape is identical to `stage_index_btree` — chained leaves +
1431
/// optional interior layers. The slot directory binary-searches by
1432
/// node_id (which is the cell's "rowid" in `Cell::peek_rowid` terms),
1433
/// so reads can locate any node in O(log N) once 7d.4-or-later
1434
/// optimizes the load path to lazy-fetch instead of read-all.
1435
/// Today, `load_hnsw_nodes` reads the entire tree on open.
1436
fn stage_hnsw_btree(
1✔
1437
    pager: &mut Pager,
1438
    idx: &crate::sql::hnsw::HnswIndex,
1439
    alloc: &mut crate::sql::pager::allocator::PageAllocator,
1440
) -> Result<u32> {
1441
    let leaves = stage_hnsw_leaves(pager, idx, alloc)?;
1✔
1442
    if leaves.len() == 1 {
2✔
1443
        return Ok(leaves[0].0);
2✔
1444
    }
UNCOV
1445
    let mut level: Vec<(u32, i64)> = leaves;
×
UNCOV
1446
    while level.len() > 1 {
×
UNCOV
1447
        level = stage_interior_level(pager, &level, alloc)?;
×
1448
    }
UNCOV
1449
    Ok(level[0].0)
×
1450
}
1451

1452
/// Phase 8c — stage one FTS index as a `TableLeaf`-shaped B-Tree.
1453
/// Mirrors `stage_hnsw_btree` (sibling-chained leaves, optional interior
1454
/// levels). Returns `(root_page, next_free_page)`. Each leaf is filled
1455
/// with `KIND_FTS_POSTING` cells: one sidecar cell holding the
1456
/// doc-lengths map, then one cell per term in lexicographic order.
1457
fn stage_fts_btree(
1✔
1458
    pager: &mut Pager,
1459
    idx: &crate::sql::fts::PostingList,
1460
    alloc: &mut crate::sql::pager::allocator::PageAllocator,
1461
) -> Result<u32> {
1462
    let leaves = stage_fts_leaves(pager, idx, alloc)?;
1✔
1463
    if leaves.len() == 1 {
2✔
1464
        return Ok(leaves[0].0);
2✔
1465
    }
1466
    let mut level: Vec<(u32, i64)> = leaves;
1✔
1467
    while level.len() > 1 {
4✔
1468
        level = stage_interior_level(pager, &level, alloc)?;
2✔
1469
    }
1470
    Ok(level[0].0)
2✔
1471
}
1472

1473
/// Packs FTS posting cells into a sibling-chained run of `TableLeaf`
1474
/// pages. Cell layout: a single doc-lengths sidecar at `cell_id = 1`,
1475
/// followed by one cell per term in lexicographic order with
1476
/// `cell_id = 2..=N + 1`. Sequential ids keep the slot directory's
1477
/// rowid ordering valid (the `cell_id` field is what `peek_rowid`
1478
/// returns).
1479
fn stage_fts_leaves(
1✔
1480
    pager: &mut Pager,
1481
    idx: &crate::sql::fts::PostingList,
1482
    alloc: &mut crate::sql::pager::allocator::PageAllocator,
1483
) -> Result<Vec<(u32, i64)>> {
1484
    use crate::sql::pager::fts_cell::FtsPostingCell;
1485

1486
    let mut leaves: Vec<(u32, i64)> = Vec::new();
1✔
1487
    let mut current_leaf = TablePage::empty();
2✔
1488
    let mut current_leaf_page = alloc.allocate();
2✔
1489
    let mut current_max_rowid: Option<i64> = None;
1✔
1490

1491
    // Build the cell sequence: sidecar first, then per-term cells. The
1492
    // sidecar always exists (even on an empty index) so reload sees a
1493
    // canonical "this index was persisted" marker in slot 0.
1494
    let mut cell_id: i64 = 1;
1✔
1495
    let mut cells: Vec<FtsPostingCell> = Vec::new();
1✔
1496
    cells.push(FtsPostingCell::doc_lengths(
2✔
1497
        cell_id,
1✔
1498
        idx.serialize_doc_lengths(),
1✔
1499
    ));
1500
    for (term, entries) in idx.serialize_postings() {
3✔
1501
        cell_id += 1;
2✔
1502
        cells.push(FtsPostingCell::posting(cell_id, term, entries));
2✔
1503
    }
1504

1505
    for cell in cells {
2✔
1506
        let entry_bytes = cell.encode()?;
2✔
1507

1508
        if !current_leaf.would_fit(entry_bytes.len()) {
2✔
1509
            let next_leaf_page_num = alloc.allocate();
2✔
1510
            emit_leaf(pager, current_leaf_page, &current_leaf, next_leaf_page_num);
1✔
1511
            leaves.push((current_leaf_page, current_max_rowid.unwrap_or(i64::MIN)));
1✔
1512
            current_leaf = TablePage::empty();
1✔
1513
            current_leaf_page = next_leaf_page_num;
1✔
1514

1515
            if !current_leaf.would_fit(entry_bytes.len()) {
1✔
1516
                // A single posting cell exceeds page capacity. Phase
1517
                // 8c MVP doesn't chain via overflow cells (the plan
1518
                // notes this as a stretch goal); surface a clear
1519
                // error so users know which term tripped it.
1520
                return Err(SQLRiteError::Internal(format!(
×
1521
                    "FTS posting cell {} of {} bytes exceeds empty-page capacity {} \
1522
                     (term too long or too many postings; overflow chaining is Phase 8.1)",
1523
                    cell.cell_id,
UNCOV
1524
                    entry_bytes.len(),
×
UNCOV
1525
                    current_leaf.free_space()
×
1526
                )));
1527
            }
1528
        }
1529
        current_leaf.insert_entry(cell.cell_id, &entry_bytes)?;
2✔
1530
        current_max_rowid = Some(cell.cell_id);
1✔
1531
    }
1532

1533
    emit_leaf(pager, current_leaf_page, &current_leaf, 0);
1✔
1534
    leaves.push((current_leaf_page, current_max_rowid.unwrap_or(i64::MIN)));
1✔
1535
    Ok(leaves)
1✔
1536
}
1537

1538
/// (rowid, value) pairs as decoded from a single FTS cell — value is
1539
/// either term frequency (posting cell) or doc length (sidecar cell).
1540
type FtsEntries = Vec<(i64, u32)>;
1541
/// (term, posting list) pairs as decoded from non-sidecar FTS cells.
1542
type FtsPostings = Vec<(String, FtsEntries)>;
1543

1544
/// Phase 8c — read every cell of an FTS index from `root_page` back
1545
/// into the `(doc_lengths, postings)` shape `PostingList::from_persisted_postings`
1546
/// expects. Mirrors `load_hnsw_nodes`: leftmost-leaf descent, walk the
1547
/// sibling chain, decode each slot.
1548
fn load_fts_postings(pager: &Pager, root_page: u32) -> Result<(FtsEntries, FtsPostings)> {
1✔
1549
    use crate::sql::pager::fts_cell::FtsPostingCell;
1550

1551
    let mut doc_lengths: Vec<(i64, u32)> = Vec::new();
1✔
1552
    let mut postings: Vec<(String, Vec<(i64, u32)>)> = Vec::new();
1✔
1553
    let mut saw_sidecar = false;
1✔
1554

1555
    let first_leaf = find_leftmost_leaf(pager, root_page)?;
2✔
1556
    let mut current = first_leaf;
1✔
1557
    while current != 0 {
1✔
1558
        let page_buf = pager
1✔
1559
            .read_page(current)
1✔
1560
            .ok_or_else(|| SQLRiteError::Internal(format!("missing FTS leaf page {current}")))?;
1✔
1561
        if page_buf[0] != PageType::TableLeaf as u8 {
1✔
UNCOV
1562
            return Err(SQLRiteError::Internal(format!(
×
1563
                "page {current} tagged {} but expected TableLeaf (FTS)",
UNCOV
1564
                page_buf[0]
×
1565
            )));
1566
        }
1567
        let next_leaf = u32::from_le_bytes(page_buf[1..5].try_into().unwrap());
2✔
1568
        let payload: &[u8; PAYLOAD_PER_PAGE] = (&page_buf[PAGE_HEADER_SIZE..])
2✔
1569
            .try_into()
1✔
1570
            .map_err(|_| SQLRiteError::Internal("FTS leaf payload size".to_string()))?;
1✔
1571
        let leaf = TablePage::from_bytes(payload);
1✔
1572
        for slot in 0..leaf.slot_count() {
3✔
1573
            let offset = leaf.slot_offset_raw(slot)?;
2✔
1574
            let (cell, _) = FtsPostingCell::decode(leaf.as_bytes(), offset)?;
1✔
1575
            if cell.is_doc_lengths() {
3✔
1576
                if saw_sidecar {
1✔
UNCOV
1577
                    return Err(SQLRiteError::Internal(
×
UNCOV
1578
                        "FTS index has more than one doc-lengths sidecar cell".to_string(),
×
1579
                    ));
1580
                }
1581
                saw_sidecar = true;
1✔
1582
                doc_lengths = cell.entries;
1✔
1583
            } else {
1584
                postings.push((cell.term, cell.entries));
2✔
1585
            }
1586
        }
1587
        current = next_leaf;
1✔
1588
    }
1589

1590
    if !saw_sidecar {
1✔
UNCOV
1591
        return Err(SQLRiteError::Internal(
×
UNCOV
1592
            "FTS index missing doc-lengths sidecar cell — corrupt or truncated tree".to_string(),
×
1593
        ));
1594
    }
1595
    Ok((doc_lengths, postings))
1✔
1596
}
1597

1598
/// Packs HNSW nodes into a sibling-chained run of `TableLeaf` pages.
1599
/// `serialize_nodes` already returns nodes in ascending node_id order,
1600
/// so the slot directory's rowid ordering stays valid.
1601
fn stage_hnsw_leaves(
1✔
1602
    pager: &mut Pager,
1603
    idx: &crate::sql::hnsw::HnswIndex,
1604
    alloc: &mut crate::sql::pager::allocator::PageAllocator,
1605
) -> Result<Vec<(u32, i64)>> {
1606
    use crate::sql::pager::hnsw_cell::HnswNodeCell;
1607

1608
    let mut leaves: Vec<(u32, i64)> = Vec::new();
1✔
1609
    let mut current_leaf = TablePage::empty();
2✔
1610
    let mut current_leaf_page = alloc.allocate();
2✔
1611
    let mut current_max_rowid: Option<i64> = None;
1✔
1612

1613
    let serialized = idx.serialize_nodes();
1✔
1614

1615
    // Empty index → emit a single empty leaf page so the rootpage
1616
    // pointer in sqlrite_master stays nonzero (== "graph is persisted,
1617
    // it just happens to be empty"). load_hnsw_nodes is fine with an
1618
    // empty leaf — slot_count() returns 0.
1619
    for (node_id, layers) in serialized {
2✔
1620
        let cell = HnswNodeCell::new(node_id, layers);
1✔
1621
        let entry_bytes = cell.encode()?;
2✔
1622

1623
        if !current_leaf.would_fit(entry_bytes.len()) {
2✔
UNCOV
1624
            let next_leaf_page_num = alloc.allocate();
×
UNCOV
1625
            emit_leaf(pager, current_leaf_page, &current_leaf, next_leaf_page_num);
×
UNCOV
1626
            leaves.push((current_leaf_page, current_max_rowid.unwrap_or(i64::MIN)));
×
UNCOV
1627
            current_leaf = TablePage::empty();
×
UNCOV
1628
            current_leaf_page = next_leaf_page_num;
×
1629

UNCOV
1630
            if !current_leaf.would_fit(entry_bytes.len()) {
×
UNCOV
1631
                return Err(SQLRiteError::Internal(format!(
×
1632
                    "HNSW node {node_id} cell of {} bytes exceeds empty-page capacity {}",
UNCOV
1633
                    entry_bytes.len(),
×
UNCOV
1634
                    current_leaf.free_space()
×
1635
                )));
1636
            }
1637
        }
1638
        current_leaf.insert_entry(node_id, &entry_bytes)?;
2✔
1639
        current_max_rowid = Some(node_id);
1✔
1640
    }
1641

1642
    emit_leaf(pager, current_leaf_page, &current_leaf, 0);
1✔
1643
    leaves.push((current_leaf_page, current_max_rowid.unwrap_or(i64::MIN)));
1✔
1644
    Ok(leaves)
1✔
1645
}
1646

1647
fn load_table_rows(pager: &Pager, table: &mut Table, root_page: u32) -> Result<()> {
2✔
1648
    let first_leaf = find_leftmost_leaf(pager, root_page)?;
2✔
1649
    let mut current = first_leaf;
2✔
1650
    while current != 0 {
2✔
1651
        let page_buf = pager
2✔
1652
            .read_page(current)
2✔
1653
            .ok_or_else(|| SQLRiteError::Internal(format!("missing leaf page {current}")))?;
2✔
1654
        if page_buf[0] != PageType::TableLeaf as u8 {
2✔
UNCOV
1655
            return Err(SQLRiteError::Internal(format!(
×
1656
                "page {current} tagged {} but expected TableLeaf",
1657
                page_buf[0]
1658
            )));
1659
        }
1660
        let next_leaf = u32::from_le_bytes(page_buf[1..5].try_into().unwrap());
2✔
1661
        let payload: &[u8; PAYLOAD_PER_PAGE] = (&page_buf[PAGE_HEADER_SIZE..])
4✔
1662
            .try_into()
2✔
1663
            .map_err(|_| SQLRiteError::Internal("leaf payload slice size".to_string()))?;
2✔
1664
        let leaf = TablePage::from_bytes(payload);
2✔
1665

1666
        for slot in 0..leaf.slot_count() {
6✔
1667
            let entry = leaf.entry_at(slot)?;
4✔
1668
            let cell = match entry {
2✔
1669
                PagedEntry::Local(c) => c,
2✔
1670
                PagedEntry::Overflow(r) => {
1✔
1671
                    let body_bytes =
2✔
1672
                        read_overflow_chain(pager, r.first_overflow_page, r.total_body_len)?;
1673
                    let (c, _) = Cell::decode(&body_bytes, 0)?;
2✔
1674
                    c
1✔
1675
                }
1676
            };
1677
            table.restore_row(cell.rowid, cell.values)?;
4✔
1678
        }
1679
        current = next_leaf;
2✔
1680
    }
1681
    Ok(())
2✔
1682
}
1683

1684
/// Walks every page reachable from `root_page` and returns their page
1685
/// numbers. Includes `root_page`, every interior page, every leaf, and
1686
/// — when `follow_overflow` is true — every overflow page chained off
1687
/// table-leaf cells. Used by `save_database` to seed each table's
1688
/// per-table preferred pool and to compute the newly-freed set.
1689
///
1690
/// `follow_overflow = true` for table B-Trees (cells may carry
1691
/// `OverflowRef`s pointing at chained overflow pages); `false` for
1692
/// secondary-index, HNSW, and FTS B-Trees, which never overflow in the
1693
/// current encoding.
1694
fn collect_pages_for_btree(
2✔
1695
    pager: &Pager,
1696
    root_page: u32,
1697
    follow_overflow: bool,
1698
) -> Result<Vec<u32>> {
1699
    if root_page == 0 {
2✔
UNCOV
1700
        return Ok(Vec::new());
×
1701
    }
1702
    let mut pages: Vec<u32> = Vec::new();
2✔
1703
    let mut stack: Vec<u32> = vec![root_page];
4✔
1704

1705
    while let Some(p) = stack.pop() {
4✔
1706
        let buf = pager.read_page(p).ok_or_else(|| {
4✔
1707
            SQLRiteError::Internal(format!(
×
1708
                "collect_pages: missing page {p} (rooted at {root_page})"
1709
            ))
1710
        })?;
1711
        pages.push(p);
2✔
1712
        match buf[0] {
2✔
1713
            t if t == PageType::InteriorNode as u8 => {
3✔
1714
                let payload: &[u8; PAYLOAD_PER_PAGE] =
2✔
1715
                    (&buf[PAGE_HEADER_SIZE..]).try_into().map_err(|_| {
UNCOV
1716
                        SQLRiteError::Internal("interior payload slice size".to_string())
×
1717
                    })?;
1718
                let interior = InteriorPage::from_bytes(payload);
1✔
1719
                // Push every divider's child + the rightmost child.
1720
                for slot in 0..interior.slot_count() {
2✔
1721
                    let cell = interior.cell_at(slot)?;
2✔
1722
                    stack.push(cell.child_page);
1✔
1723
                }
1724
                stack.push(interior.rightmost_child());
1✔
1725
            }
1726
            t if t == PageType::TableLeaf as u8 => {
6✔
1727
                if follow_overflow {
2✔
1728
                    let payload: &[u8; PAYLOAD_PER_PAGE] =
2✔
1729
                        (&buf[PAGE_HEADER_SIZE..]).try_into().map_err(|_| {
UNCOV
1730
                            SQLRiteError::Internal("leaf payload slice size".to_string())
×
1731
                        })?;
1732
                    let leaf = TablePage::from_bytes(payload);
2✔
1733
                    for slot in 0..leaf.slot_count() {
4✔
1734
                        match leaf.entry_at(slot)? {
4✔
1735
                            PagedEntry::Local(_) => {}
1736
                            PagedEntry::Overflow(r) => {
×
UNCOV
1737
                                let mut cur = r.first_overflow_page;
×
UNCOV
1738
                                while cur != 0 {
×
UNCOV
1739
                                    pages.push(cur);
×
UNCOV
1740
                                    let ob = pager.read_page(cur).ok_or_else(|| {
×
UNCOV
1741
                                        SQLRiteError::Internal(format!(
×
1742
                                            "collect_pages: missing overflow page {cur}"
1743
                                        ))
1744
                                    })?;
UNCOV
1745
                                    if ob[0] != PageType::Overflow as u8 {
×
UNCOV
1746
                                        return Err(SQLRiteError::Internal(format!(
×
1747
                                            "collect_pages: page {cur} expected Overflow, got tag {}",
UNCOV
1748
                                            ob[0]
×
1749
                                        )));
1750
                                    }
UNCOV
1751
                                    cur = u32::from_le_bytes(ob[1..5].try_into().unwrap());
×
1752
                                }
1753
                            }
1754
                        }
1755
                    }
1756
                }
1757
            }
UNCOV
1758
            other => {
×
UNCOV
1759
                return Err(SQLRiteError::Internal(format!(
×
1760
                    "collect_pages: unexpected page type {other} at page {p}"
1761
                )));
1762
            }
1763
        }
1764
    }
1765
    Ok(pages)
2✔
1766
}
1767

1768
/// Reads the previously-persisted `sqlrite_master` and returns a map from
1769
/// `(kind, name)` to that object's rootpage. Used by `save_database` to
1770
/// seed each table/index's per-table preferred pool with the pages it
1771
/// occupied last time round.
1772
///
1773
/// `kind` is `"table"` or `"index"` (the catalog already disambiguates
1774
/// the three index families via the SQL string, but for page-collection
1775
/// purposes a "table" tree must follow overflow refs while an "index"
1776
/// tree never does — that's the only distinction we need here).
1777
fn read_old_rootpages(pager: &Pager, schema_root: u32) -> Result<HashMap<(String, String), u32>> {
2✔
1778
    let mut out: HashMap<(String, String), u32> = HashMap::new();
2✔
1779
    if schema_root == 0 {
2✔
UNCOV
1780
        return Ok(out);
×
1781
    }
1782
    let mut master = build_empty_master_table();
2✔
1783
    load_table_rows(pager, &mut master, schema_root)?;
4✔
1784
    for rowid in master.rowids() {
6✔
1785
        let kind = take_text(&master, "type", rowid)?;
4✔
1786
        let name = take_text(&master, "name", rowid)?;
4✔
1787
        let rootpage = take_integer(&master, "rootpage", rowid)? as u32;
4✔
1788
        out.insert((kind, name), rootpage);
2✔
1789
    }
1790
    Ok(out)
2✔
1791
}
1792

1793
/// Descends from `root_page` through `InteriorNode` pages, always taking
1794
/// the leftmost child, until a `TableLeaf` is reached. Returns that leaf's
1795
/// page number. A root that's already a leaf is returned as-is.
1796
fn find_leftmost_leaf(pager: &Pager, root_page: u32) -> Result<u32> {
2✔
1797
    let mut current = root_page;
2✔
1798
    loop {
1799
        let page_buf = pager.read_page(current).ok_or_else(|| {
2✔
UNCOV
1800
            SQLRiteError::Internal(format!("missing page {current} during tree descent"))
×
1801
        })?;
1802
        match page_buf[0] {
1803
            t if t == PageType::TableLeaf as u8 => return Ok(current),
4✔
1804
            t if t == PageType::InteriorNode as u8 => {
2✔
1805
                let payload: &[u8; PAYLOAD_PER_PAGE] =
1✔
1806
                    (&page_buf[PAGE_HEADER_SIZE..]).try_into().map_err(|_| {
UNCOV
1807
                        SQLRiteError::Internal("interior payload slice size".to_string())
×
1808
                    })?;
1809
                let interior = InteriorPage::from_bytes(payload);
1✔
1810
                current = interior.leftmost_child()?;
2✔
1811
            }
UNCOV
1812
            other => {
×
UNCOV
1813
                return Err(SQLRiteError::Internal(format!(
×
1814
                    "unexpected page type {other} during tree descent at page {current}"
1815
                )));
1816
            }
1817
        }
1818
    }
1819
}
1820

1821
/// Stages a table's B-Tree, drawing every page number from `alloc`.
1822
/// Returns the root page (the topmost interior page, or the single leaf
1823
/// when the table fits in one page).
1824
///
1825
/// Builds bottom-up: pack rows into `TableLeaf` pages chained via
1826
/// `next_page`, then if more than one leaf, recursively wrap them in
1827
/// `InteriorNode` levels until one root remains.
1828
///
1829
/// Deterministic: same rows + same allocator handouts → byte-identical
1830
/// pages at the same numbers, so the diff pager skips unchanged tables.
1831
fn stage_table_btree(
2✔
1832
    pager: &mut Pager,
1833
    table: &Table,
1834
    alloc: &mut crate::sql::pager::allocator::PageAllocator,
1835
) -> Result<u32> {
1836
    let leaves = stage_leaves(pager, table, alloc)?;
2✔
1837
    if leaves.len() == 1 {
4✔
1838
        return Ok(leaves[0].0);
4✔
1839
    }
1840
    let mut level: Vec<(u32, i64)> = leaves;
1✔
1841
    while level.len() > 1 {
4✔
1842
        level = stage_interior_level(pager, &level, alloc)?;
2✔
1843
    }
1844
    Ok(level[0].0)
2✔
1845
}
1846

1847
/// Packs the table's rows into a sibling-linked chain of `TableLeaf` pages.
1848
/// Returns each leaf's `(page_number, max_rowid)` for use by the next
1849
/// interior level. Allocates leaf and overflow pages from `alloc`.
1850
fn stage_leaves(
2✔
1851
    pager: &mut Pager,
1852
    table: &Table,
1853
    alloc: &mut crate::sql::pager::allocator::PageAllocator,
1854
) -> Result<Vec<(u32, i64)>> {
1855
    let mut leaves: Vec<(u32, i64)> = Vec::new();
2✔
1856
    let mut current_leaf = TablePage::empty();
4✔
1857
    let mut current_leaf_page = alloc.allocate();
4✔
1858
    let mut current_max_rowid: Option<i64> = None;
2✔
1859

1860
    for rowid in table.rowids() {
4✔
1861
        let entry_bytes = build_row_entry(pager, table, rowid, alloc)?;
4✔
1862

1863
        if !current_leaf.would_fit(entry_bytes.len()) {
4✔
1864
            // The new leaf goes at whatever the allocator hands out
1865
            // next. Commit the current leaf with that as its sibling
1866
            // pointer.
1867
            let next_leaf_page_num = alloc.allocate();
2✔
1868
            emit_leaf(pager, current_leaf_page, &current_leaf, next_leaf_page_num);
1✔
1869
            leaves.push((current_leaf_page, current_max_rowid.unwrap_or(i64::MIN)));
1✔
1870
            current_leaf = TablePage::empty();
1✔
1871
            current_leaf_page = next_leaf_page_num;
1✔
1872
            // current_max_rowid is reassigned by the insert below; no need
1873
            // to zero it out here.
1874

1875
            if !current_leaf.would_fit(entry_bytes.len()) {
1✔
UNCOV
1876
                return Err(SQLRiteError::Internal(format!(
×
1877
                    "entry of {} bytes exceeds empty-page capacity {}",
UNCOV
1878
                    entry_bytes.len(),
×
UNCOV
1879
                    current_leaf.free_space()
×
1880
                )));
1881
            }
1882
        }
1883
        current_leaf.insert_entry(rowid, &entry_bytes)?;
4✔
1884
        current_max_rowid = Some(rowid);
2✔
1885
    }
1886

1887
    // Final leaf: sibling next_page = 0 (end of chain).
1888
    emit_leaf(pager, current_leaf_page, &current_leaf, 0);
2✔
1889
    leaves.push((current_leaf_page, current_max_rowid.unwrap_or(i64::MIN)));
2✔
1890
    Ok(leaves)
2✔
1891
}
1892

1893
/// Encodes a single row's on-leaf entry — either the local cell bytes, or
1894
/// an `OverflowRef` pointing at a freshly-allocated overflow chain if the
1895
/// encoded cell exceeded the inline threshold. Allocates any overflow
1896
/// pages from `alloc`.
1897
fn build_row_entry(
2✔
1898
    pager: &mut Pager,
1899
    table: &Table,
1900
    rowid: i64,
1901
    alloc: &mut crate::sql::pager::allocator::PageAllocator,
1902
) -> Result<Vec<u8>> {
1903
    let values = table.extract_row(rowid);
2✔
1904
    let local_cell = Cell::new(rowid, values);
2✔
1905
    let local_bytes = local_cell.encode()?;
4✔
1906
    if local_bytes.len() > OVERFLOW_THRESHOLD {
7✔
1907
        let overflow_start = write_overflow_chain(pager, &local_bytes, alloc)?;
2✔
1908
        Ok(OverflowRef {
2✔
1909
            rowid,
1910
            total_body_len: local_bytes.len() as u64,
1✔
1911
            first_overflow_page: overflow_start,
1912
        }
1913
        .encode())
1✔
1914
    } else {
1915
        Ok(local_bytes)
2✔
1916
    }
1917
}
1918

1919
/// Builds one level of `InteriorNode` pages above the given children.
1920
/// Each interior packs as many dividers as will fit; the last child
1921
/// assigned to an interior becomes its `rightmost_child`. Returns the
1922
/// emitted interior pages as `(page_number, max_rowid_in_subtree)`.
1923
fn stage_interior_level(
1✔
1924
    pager: &mut Pager,
1925
    children: &[(u32, i64)],
1926
    alloc: &mut crate::sql::pager::allocator::PageAllocator,
1927
) -> Result<Vec<(u32, i64)>> {
1928
    let mut next_level: Vec<(u32, i64)> = Vec::new();
1✔
1929
    let mut idx = 0usize;
1✔
1930

1931
    while idx < children.len() {
1✔
1932
        let interior_page_num = alloc.allocate();
2✔
1933

1934
        // Seed the interior with the first unassigned child as its
1935
        // rightmost. As we add more children, the previous rightmost
1936
        // graduates to being a divider and the new arrival takes over
1937
        // as rightmost.
1938
        let (mut rightmost_child_page, mut rightmost_child_max) = children[idx];
1✔
1939
        idx += 1;
2✔
1940
        let mut interior = InteriorPage::empty(rightmost_child_page);
2✔
1941

1942
        while idx < children.len() {
1✔
1943
            let new_divider_cell = InteriorCell {
1944
                divider_rowid: rightmost_child_max,
1945
                child_page: rightmost_child_page,
1946
            };
1947
            let new_divider_bytes = new_divider_cell.encode();
1✔
1948
            if !interior.would_fit(new_divider_bytes.len()) {
2✔
1949
                break;
1950
            }
1951
            interior.insert_divider(rightmost_child_max, rightmost_child_page)?;
2✔
1952
            let (next_child_page, next_child_max) = children[idx];
1✔
1953
            interior.set_rightmost_child(next_child_page);
1✔
1954
            rightmost_child_page = next_child_page;
1✔
1955
            rightmost_child_max = next_child_max;
1✔
1956
            idx += 1;
1✔
1957
        }
1958

1959
        emit_interior(pager, interior_page_num, &interior);
1✔
1960
        next_level.push((interior_page_num, rightmost_child_max));
1✔
1961
    }
1962

1963
    Ok(next_level)
1✔
1964
}
1965

1966
/// Wraps a `TablePage` in the 7-byte page header and hands it to the pager.
1967
fn emit_leaf(pager: &mut Pager, page_num: u32, leaf: &TablePage, next_leaf: u32) {
2✔
1968
    let mut buf = [0u8; PAGE_SIZE];
2✔
1969
    buf[0] = PageType::TableLeaf as u8;
2✔
1970
    buf[1..5].copy_from_slice(&next_leaf.to_le_bytes());
2✔
1971
    // For leaf pages the legacy `payload_len` field isn't used — the slot
1972
    // directory self-describes. Zero it by convention.
1973
    buf[5..7].copy_from_slice(&0u16.to_le_bytes());
2✔
1974
    buf[PAGE_HEADER_SIZE..].copy_from_slice(leaf.as_bytes());
2✔
1975
    pager.stage_page(page_num, buf);
2✔
1976
}
1977

1978
/// Wraps an `InteriorPage` in the 7-byte page header. Interior pages
1979
/// don't use `next_page` (there's no sibling chain between interiors);
1980
/// `payload_len` is also unused (the slot directory self-describes).
1981
fn emit_interior(pager: &mut Pager, page_num: u32, interior: &InteriorPage) {
1✔
1982
    let mut buf = [0u8; PAGE_SIZE];
1✔
1983
    buf[0] = PageType::InteriorNode as u8;
1✔
1984
    buf[1..5].copy_from_slice(&0u32.to_le_bytes());
1✔
1985
    buf[5..7].copy_from_slice(&0u16.to_le_bytes());
1✔
1986
    buf[PAGE_HEADER_SIZE..].copy_from_slice(interior.as_bytes());
1✔
1987
    pager.stage_page(page_num, buf);
1✔
1988
}
1989

1990
#[cfg(test)]
1991
mod tests {
1992
    use super::*;
1993
    use crate::sql::pager::freelist::MIN_PAGES_FOR_AUTO_VACUUM;
1994
    use crate::sql::process_command;
1995

1996
    fn seed_db() -> Database {
1✔
1997
        let mut db = Database::new("test".to_string());
1✔
1998
        process_command(
1999
            "CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT NOT NULL UNIQUE, age INTEGER);",
2000
            &mut db,
2001
        )
2002
        .unwrap();
2003
        process_command(
2004
            "INSERT INTO users (name, age) VALUES ('alice', 30);",
2005
            &mut db,
2006
        )
2007
        .unwrap();
2008
        process_command("INSERT INTO users (name, age) VALUES ('bob', 25);", &mut db).unwrap();
1✔
2009
        process_command(
2010
            "CREATE TABLE notes (id INTEGER PRIMARY KEY, body TEXT);",
2011
            &mut db,
2012
        )
2013
        .unwrap();
2014
        process_command("INSERT INTO notes (body) VALUES ('hello');", &mut db).unwrap();
1✔
2015
        db
1✔
2016
    }
2017

2018
    /// Returns a path inside the system temp directory whose file name is
    /// `sqlrite-<pid>-<nanos>-<name>.sqlrite`.
    ///
    /// `<nanos>` is the time since the Unix epoch in nanoseconds, or `0` when
    /// the system clock reports a time before the epoch.
    fn tmp_path(name: &str) -> std::path::PathBuf {
        let pid = std::process::id();
        let nanos = match std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH) {
            Ok(elapsed) => elapsed.as_nanos(),
            Err(_) => 0,
        };
        std::env::temp_dir().join(format!("sqlrite-{pid}-{nanos}-{name}.sqlrite"))
    }
2028

2029
    /// Phase 4c: a `.sqlrite` file comes with a `-wal` sidecar. Remove both
    /// so the temp directory doesn't collect orphan WALs across test runs.
    ///
    /// Removal is best-effort: errors (for example a file that was never
    /// created) are ignored.
    fn cleanup(path: &std::path::Path) {
        let mut wal_name = path.as_os_str().to_os_string();
        wal_name.push("-wal");
        let wal_path = std::path::PathBuf::from(wal_name);

        for target in [path, wal_path.as_path()] {
            let _ = std::fs::remove_file(target);
        }
    }
2037

2038
    /// Saves the seeded fixture, reopens it, and checks table count, column
    /// count, row counts, and the stored `name` values.
    #[test]
    fn round_trip_preserves_schema_and_data() {
        let path = tmp_path("roundtrip");
        let mut original = seed_db();
        save_database(&mut original, &path).expect("save");

        let reopened = open_database(&path, "test".to_string()).expect("open");
        assert_eq!(reopened.tables.len(), 2);

        let users = reopened.get_table("users".to_string()).expect("users table");
        assert_eq!(users.columns.len(), 3);
        let user_rowids = users.rowids();
        assert_eq!(user_rowids.len(), 2);

        // Collect every `name` that comes back as text; anything else is skipped.
        let mut names: Vec<String> = Vec::new();
        for rowid in user_rowids.iter() {
            if let Some(Value::Text(text)) = users.get_value("name", *rowid) {
                names.push(text);
            }
        }
        assert!(names.contains(&"alice".to_string()));
        assert!(names.contains(&"bob".to_string()));

        let notes = reopened.get_table("notes".to_string()).expect("notes table");
        assert_eq!(notes.rowids().len(), 1);

        cleanup(&path);
    }
2066

2067
    // -----------------------------------------------------------------
2068
    // Phase 7a — VECTOR(N) save / reopen round-trip
2069
    // -----------------------------------------------------------------
2070

2071
    /// A `VECTOR(3)` column keeps both its declared type and its stored
    /// values across save + reopen.
    #[test]
    fn round_trip_preserves_vector_column() {
        let path = tmp_path("vec_roundtrip");

        // Writer scope: create the table, insert two vectors, persist.
        {
            let mut writer = Database::new("test".to_string());
            for sql in [
                "CREATE TABLE docs (id INTEGER PRIMARY KEY, embedding VECTOR(3));",
                "INSERT INTO docs (embedding) VALUES ([0.1, 0.2, 0.3]);",
                "INSERT INTO docs (embedding) VALUES ([1.5, -2.0, 3.5]);",
            ] {
                process_command(sql, &mut writer).unwrap();
            }
            save_database(&mut writer, &path).expect("save");
        } // `writer` is dropped here, ahead of the reopen below.

        let reopened = open_database(&path, "test".to_string()).expect("open");
        let docs = reopened.get_table("docs".to_string()).expect("docs table");

        // Schema check: the column must still be VECTOR(3).
        let embedding_col = docs
            .columns
            .iter()
            .find(|col| col.column_name == "embedding")
            .expect("embedding column");
        assert!(
            matches!(embedding_col.datatype, DataType::Vector(3)),
            "expected DataType::Vector(3) after round-trip, got {:?}",
            embedding_col.datatype
        );

        // Data check: gather the stored vectors and order them by first
        // component so the comparison doesn't depend on rowid order.
        let mut rows: Vec<Vec<f32>> = Vec::new();
        for rowid in docs.rowids().iter() {
            if let Some(Value::Vector(components)) = docs.get_value("embedding", *rowid) {
                rows.push(components);
            }
        }
        rows.sort_by(|lhs, rhs| lhs[0].partial_cmp(&rhs[0]).unwrap());
        assert_eq!(rows.len(), 2);
        assert_eq!(rows[0], vec![0.1f32, 0.2, 0.3]);
        assert_eq!(rows[1], vec![1.5f32, -2.0, 3.5]);

        cleanup(&path);
    }
2128

2129
    /// Phase 7e — a JSON column keeps its `DataType::Json` schema across
    /// save + reopen, and `json_extract` still resolves a path against the
    /// reopened data.
    #[test]
    fn round_trip_preserves_json_column() {
        let path = tmp_path("json_roundtrip");

        {
            let mut writer = Database::new("test".to_string());
            for sql in [
                "CREATE TABLE docs (id INTEGER PRIMARY KEY, payload JSON);",
                r#"INSERT INTO docs (payload) VALUES ('{"name": "alice", "tags": ["rust","sql"]}');"#,
            ] {
                process_command(sql, &mut writer).unwrap();
            }
            save_database(&mut writer, &path).expect("save");
        }

        let mut loaded = open_database(&path, "test".to_string()).expect("open");

        // Schema: the column declared as JSON comes back with the same type.
        {
            let docs = loaded.get_table("docs".to_string()).expect("docs");
            let payload_col = docs
                .columns
                .iter()
                .find(|col| col.column_name == "payload")
                .unwrap();
            assert!(
                matches!(payload_col.datatype, DataType::Json),
                "expected DataType::Json, got {:?}",
                payload_col.datatype
            );
        }

        // Query: filter on an extracted JSON field after the reopen.
        let query = r#"SELECT id FROM docs WHERE json_extract(payload, '$.name') = 'alice';"#;
        let resp =
            process_command(query, &mut loaded).expect("select via json_extract after reopen");
        assert!(resp.contains("1 row returned"), "got: {resp}");

        cleanup(&path);
    }
2178

2179
    /// Phase 7d.3 — after save + reopen the HNSW index entry is attached
    /// again with the same name, column, and node count, is not flagged as
    /// needing a rebuild, and serves a KNN query.
    #[test]
    fn round_trip_rebuilds_hnsw_index_from_create_sql() {
        let path = tmp_path("hnsw_roundtrip");

        // Writer scope: table, five vectors, index, save.
        {
            let mut writer = Database::new("test".to_string());
            process_command(
                "CREATE TABLE docs (id INTEGER PRIMARY KEY, e VECTOR(2));",
                &mut writer,
            )
            .unwrap();
            let vectors = [
                "[1.0, 0.0]",
                "[2.0, 0.0]",
                "[0.0, 3.0]",
                "[1.0, 4.0]",
                "[10.0, 10.0]",
            ];
            for v in vectors.iter() {
                let insert = format!("INSERT INTO docs (e) VALUES ({v});");
                process_command(&insert, &mut writer).unwrap();
            }
            process_command("CREATE INDEX ix_e ON docs USING hnsw (e);", &mut writer).unwrap();
            save_database(&mut writer, &path).expect("save");
        } // `writer` is dropped before the reopen.

        let mut loaded = open_database(&path, "test".to_string()).expect("open");

        // Structural checks on the reattached index entry.
        {
            let table = loaded.get_table("docs".to_string()).expect("docs");
            assert_eq!(table.hnsw_indexes.len(), 1, "HNSW index should reattach");
            let entry = &table.hnsw_indexes[0];
            assert_eq!(entry.name, "ix_e");
            assert_eq!(entry.column_name, "e");
            assert_eq!(entry.index.len(), 5, "loaded graph should hold all 5 rows");
            assert!(
                !entry.needs_rebuild,
                "fresh load should not be marked dirty"
            );
        }

        // Functional check: a nearest-neighbour query returns three rows.
        let knn = "SELECT id FROM docs ORDER BY vec_distance_l2(e, [1.0, 0.0]) ASC LIMIT 3;";
        let resp = process_command(knn, &mut loaded).unwrap();
        assert!(resp.contains("3 rows returned"), "got: {resp}");

        cleanup(&path);
    }
2235

2236
    /// SQLR-28 — an HNSW index created with `metric = 'cosine'` must still
    /// report the cosine metric after save + reopen. If the metric were lost,
    /// a cosine-built graph would come back as L2 and cosine probes would
    /// silently misbehave.
    #[test]
    fn round_trip_preserves_hnsw_cosine_metric() {
        use crate::sql::hnsw::DistanceMetric;
        let path = tmp_path("hnsw_metric_roundtrip");

        {
            let mut writer = Database::new("test".to_string());
            process_command(
                "CREATE TABLE docs (id INTEGER PRIMARY KEY, e VECTOR(2));",
                &mut writer,
            )
            .unwrap();
            let vectors = ["[1.0, 0.0]", "[0.0, 1.0]", "[0.7071, 0.7071]"];
            for v in vectors.iter() {
                let insert = format!("INSERT INTO docs (e) VALUES ({v});");
                process_command(&insert, &mut writer).unwrap();
            }
            process_command(
                "CREATE INDEX ix_cos ON docs USING hnsw (e) WITH (metric = 'cosine');",
                &mut writer,
            )
            .unwrap();
            save_database(&mut writer, &path).expect("save");
        }

        let mut loaded = open_database(&path, "test".to_string()).expect("open");

        // Both the index entry and the graph it wraps must report cosine.
        {
            let table = loaded.get_table("docs".to_string()).expect("docs");
            assert_eq!(table.hnsw_indexes.len(), 1);
            let entry = &table.hnsw_indexes[0];
            assert_eq!(
                entry.metric,
                DistanceMetric::Cosine,
                "metric should round-trip through CREATE INDEX SQL"
            );
            assert_eq!(entry.index.distance, DistanceMetric::Cosine);
        }

        // A cosine-ordered probe on the reopened database returns one row.
        let probe = "SELECT id FROM docs ORDER BY vec_distance_cosine(e, [1.0, 0.0]) ASC LIMIT 1;";
        let resp = process_command(probe, &mut loaded).unwrap();
        assert!(resp.contains("1 row returned"), "got: {resp}");

        cleanup(&path);
    }
2287

2288
    /// Phase 8c — after save + reopen the FTS index entry is attached again
    /// with the same name, column, and document count, is not flagged as
    /// needing a rebuild, and answers an `fts_match` query.
    #[test]
    fn round_trip_rebuilds_fts_index_from_create_sql() {
        let path = tmp_path("fts_roundtrip");

        {
            let mut writer = Database::new("test".to_string());
            process_command(
                "CREATE TABLE docs (id INTEGER PRIMARY KEY, body TEXT);",
                &mut writer,
            )
            .unwrap();
            let bodies = [
                "rust embedded database",
                "rust web framework",
                "go embedded systems",
                "python web framework",
                "rust rust embedded power",
            ];
            for body in bodies.iter() {
                let insert = format!("INSERT INTO docs (body) VALUES ('{body}');");
                process_command(&insert, &mut writer).unwrap();
            }
            process_command(
                "CREATE INDEX ix_body ON docs USING fts (body);",
                &mut writer,
            )
            .unwrap();
            save_database(&mut writer, &path).expect("save");
        } // `writer` is dropped before the reopen.

        let mut loaded = open_database(&path, "test".to_string()).expect("open");

        // Structural checks on the reattached index entry.
        {
            let table = loaded.get_table("docs".to_string()).expect("docs");
            assert_eq!(table.fts_indexes.len(), 1, "FTS index should reattach");
            let entry = &table.fts_indexes[0];
            assert_eq!(entry.name, "ix_body");
            assert_eq!(entry.column_name, "body");
            assert_eq!(
                entry.index.len(),
                5,
                "rebuilt posting list should hold all 5 rows"
            );
            assert!(!entry.needs_rebuild);
        }

        // Functional smoke test: three of the five bodies contain 'rust'.
        let query = "SELECT id FROM docs WHERE fts_match(body, 'rust');";
        let resp = process_command(query, &mut loaded).unwrap();
        assert!(resp.contains("3 rows returned"), "got: {resp}");

        cleanup(&path);
    }
2346

2347
    /// Phase 8b — a row deleted before the save must not show up in
    /// `fts_match` results once the file is reopened.
    #[test]
    fn delete_then_save_then_reopen_excludes_deleted_node_from_fts() {
        let path = tmp_path("fts_delete_rebuild");
        let mut db = Database::new("test".to_string());
        process_command(
            "CREATE TABLE docs (id INTEGER PRIMARY KEY, body TEXT);",
            &mut db,
        )
        .unwrap();
        let bodies = [
            "rust embedded",
            "rust framework",
            "go embedded",
            "python web",
        ];
        for body in bodies.iter() {
            let insert = format!("INSERT INTO docs (body) VALUES ('{body}');");
            process_command(&insert, &mut db).unwrap();
        }
        process_command("CREATE INDEX ix_body ON docs USING fts (body);", &mut db).unwrap();

        // Remove id = 1 ('rust embedded'), persist, and release the writer.
        process_command("DELETE FROM docs WHERE id = 1;", &mut db).unwrap();
        save_database(&mut db, &path).expect("save");
        drop(db);

        let mut loaded = open_database(&path, "test".to_string()).expect("open");
        let query = "SELECT id FROM docs WHERE fts_match(body, 'rust');";
        let resp = process_command(query, &mut loaded).unwrap();
        // Two rows contained 'rust' before the delete ('rust embedded' and
        // 'rust framework'); only the second one is left.
        assert!(resp.contains("1 row returned"), "got: {resp}");

        cleanup(&path);
    }
2391

2392
    /// Phase 8c — the saved FTS index must carry a real root page.
    ///
    /// Reads the raw `sqlrite_master` rows from the saved file, finds the row
    /// named `ix_body`, and asserts that its `rootpage` field is non-zero
    /// (a zero rootpage is the replay shortcut this test guards against).
    #[test]
    fn fts_roundtrip_uses_persistence_path_not_replay() {
        // `TryFrom` is only in the prelude from the 2021 edition onwards; the
        // local import keeps the checked conversion below edition-independent.
        use std::convert::TryFrom;

        let path = tmp_path("fts_persistence_path");

        {
            let mut db = Database::new("test".to_string());
            process_command(
                "CREATE TABLE docs (id INTEGER PRIMARY KEY, body TEXT);",
                &mut db,
            )
            .unwrap();
            process_command(
                "INSERT INTO docs (body) VALUES ('rust embedded database');",
                &mut db,
            )
            .unwrap();
            process_command("CREATE INDEX ix_body ON docs USING fts (body);", &mut db).unwrap();
            save_database(&mut db, &path).expect("save");
        }

        // Read raw sqlrite_master to find the FTS index row.
        let pager = Pager::open(&path).expect("open pager");
        let mut master = build_empty_master_table();
        load_table_rows(&pager, &mut master, pager.header().schema_root_page).unwrap();
        let mut found_rootpage: Option<u32> = None;
        for rowid in master.rowids() {
            let name = take_text(&master, "name", rowid).unwrap();
            if name == "ix_body" {
                let rp = take_integer(&master, "rootpage", rowid).unwrap();
                // Checked narrowing: an `as u32` cast would silently wrap a
                // negative or oversized value into a plausible page number
                // and let the non-zero assertion below pass by accident.
                found_rootpage =
                    Some(u32::try_from(rp).expect("ix_body rootpage should fit in u32"));
            }
        }
        let rootpage = found_rootpage.expect("ix_body row in sqlrite_master");
        assert!(
            rootpage != 0,
            "Phase 8c FTS save should set rootpage != 0; got {rootpage}"
        );

        cleanup(&path);
    }
2436

2437
    /// Phase 8c on-demand bump — a database without any FTS index must keep
    /// writing the v4 header, so existing v4 files are not silently promoted
    /// to v5.
    #[test]
    fn save_without_fts_keeps_format_v4() {
        use crate::sql::pager::header::FORMAT_VERSION_V4;

        let path = tmp_path("fts_no_bump");
        let mut db = Database::new("test".to_string());
        for sql in [
            "CREATE TABLE t (id INTEGER PRIMARY KEY, n INTEGER);",
            "INSERT INTO t (n) VALUES (1);",
        ] {
            process_command(sql, &mut db).unwrap();
        }
        save_database(&mut db, &path).unwrap();
        drop(db);

        // Inspect the header straight from the pager.
        let pager = Pager::open(&path).expect("open");
        let written_version = pager.header().format_version;
        assert_eq!(
            written_version, FORMAT_VERSION_V4,
            "no-FTS save should keep v4"
        );
        cleanup(&path);
    }
2463

2464
    /// Phase 8c on-demand bump — saving a database that carries an FTS index
    /// must write the v5 format version into the header.
    #[test]
    fn save_with_fts_bumps_to_v5() {
        use crate::sql::pager::header::FORMAT_VERSION_V5;

        let path = tmp_path("fts_bump_v5");
        let mut db = Database::new("test".to_string());
        for sql in [
            "CREATE TABLE docs (id INTEGER PRIMARY KEY, body TEXT);",
            "INSERT INTO docs (body) VALUES ('hello');",
            "CREATE INDEX ix_body ON docs USING fts (body);",
        ] {
            process_command(sql, &mut db).unwrap();
        }
        save_database(&mut db, &path).unwrap();
        drop(db);

        // Inspect the header straight from the pager.
        let pager = Pager::open(&path).expect("open");
        let written_version = pager.header().format_version;
        assert_eq!(
            written_version, FORMAT_VERSION_V5,
            "FTS save should promote to v5"
        );
        cleanup(&path);
    }
2491

2492
    /// Phase 8c — an index created on an empty table, then fed a mix of
    /// ordinary rows and a punctuation-only row, must reload with every row
    /// counted (the zero-token one included) and with term lookups intact.
    #[test]
    fn fts_persistence_handles_empty_and_zero_token_docs() {
        let path = tmp_path("fts_edges");

        {
            let mut writer = Database::new("test".to_string());
            for sql in [
                "CREATE TABLE docs (id INTEGER PRIMARY KEY, body TEXT);",
                // The index is created while the table is still empty.
                "CREATE INDEX ix_body ON docs USING fts (body);",
                // Real text, then a punctuation-only body, then real text.
                "INSERT INTO docs (body) VALUES ('rust embedded');",
                "INSERT INTO docs (body) VALUES ('!!!---???');",
                "INSERT INTO docs (body) VALUES ('go embedded');",
            ] {
                process_command(sql, &mut writer).unwrap();
            }
            save_database(&mut writer, &path).unwrap();
        }

        let loaded = open_database(&path, "test".to_string()).expect("open");
        let table = loaded.get_table("docs".to_string()).unwrap();
        let entry = &table.fts_indexes[0];

        // All three rows are counted, so the document total stays honest.
        assert_eq!(entry.index.len(), 3);

        // 'embedded' occurs in two of the three bodies.
        let params = crate::sql::fts::Bm25Params::default();
        let hits = entry.index.query("embedded", &params);
        assert_eq!(hits.len(), 2);

        cleanup(&path);
    }
2531

2532
    /// Phase 8c — exercises multi-leaf staging of a persisted FTS index.
    ///
    /// Inserts 500 documents, each consisting of one unique term
    /// (`term0000` … `term0499`), saves, reopens, and then checks the
    /// reloaded document count plus a spot-check of individual terms.
    #[test]
    fn fts_persistence_round_trips_large_corpus() {
        let path = tmp_path("fts_large_corpus");

        {
            let mut db = Database::new("test".to_string());
            process_command(
                "CREATE TABLE docs (id INTEGER PRIMARY KEY, body TEXT);",
                &mut db,
            )
            .unwrap();
            process_command("CREATE INDEX ix_body ON docs USING fts (body);", &mut db).unwrap();
            // 500 docs, each one a unique term — drives the unique-term
            // count up so multiple leaves are required.
            for i in 0..500 {
                let term = format!("term{i:04}");
                process_command(
                    &format!("INSERT INTO docs (body) VALUES ('{term}');"),
                    &mut db,
                )
                .unwrap();
            }
            save_database(&mut db, &path).unwrap();
        }

        let loaded = open_database(&path, "test".to_string()).expect("open");
        let table = loaded.get_table("docs".to_string()).unwrap();
        let entry = &table.fts_indexes[0];
        assert_eq!(entry.index.len(), 500);

        // Spot-check that a handful of terms come back with their original
        // single-row posting list.
        let params = crate::sql::fts::Bm25Params::default();
        for &i in &[0_i64, 137, 248, 391, 499] {
            let term = format!("term{i:04}");
            let res = entry.index.query(&term, &params);
            assert_eq!(res.len(), 1, "term {term} should match exactly 1 row");
            // PrimaryKey rowids start at 1; doc i was inserted at
            // rowid i+1.
            assert_eq!(res[0].0, i + 1);
        }

        cleanup(&path);
    }
2583

2584
    /// Phase 7d.3 — DELETE flags the HNSW index as needing a rebuild, save
    /// clears that flag, and after a reopen neither the row storage nor the
    /// HNSW graph still contains the deleted row.
    #[test]
    fn delete_then_save_then_reopen_excludes_deleted_node_from_hnsw() {
        let path = tmp_path("hnsw_delete_rebuild");
        let mut db = Database::new("test".to_string());
        process_command(
            "CREATE TABLE docs (id INTEGER PRIMARY KEY, e VECTOR(2));",
            &mut db,
        )
        .unwrap();
        let vectors = ["[1.0, 0.0]", "[2.0, 0.0]", "[3.0, 0.0]", "[4.0, 0.0]"];
        for v in vectors.iter() {
            let insert = format!("INSERT INTO docs (e) VALUES ({v});");
            process_command(&insert, &mut db).unwrap();
        }
        process_command("CREATE INDEX ix_e ON docs USING hnsw (e);", &mut db).unwrap();

        // Delete id = 1 and check the index got flagged.
        process_command("DELETE FROM docs WHERE id = 1;", &mut db).unwrap();
        let dirty_before_save = db.tables["docs"].hnsw_indexes[0].needs_rebuild;
        assert!(dirty_before_save, "DELETE should mark dirty");

        // Saving must reset the flag on the in-memory database.
        save_database(&mut db, &path).expect("save");
        let dirty_after_save = db.tables["docs"].hnsw_indexes[0].needs_rebuild;
        assert!(!dirty_after_save, "save should clear dirty");
        drop(db);

        let loaded = open_database(&path, "test".to_string()).expect("open");
        let docs = loaded.get_table("docs".to_string()).expect("docs");

        // Row storage: rowid 1 is gone and three rows survive.
        assert!(
            !docs.rowids().contains(&1),
            "deleted row 1 should not be in row storage"
        );
        assert_eq!(docs.rowids().len(), 3, "should have 3 surviving rows");

        // Index: the reloaded graph holds only the three survivors.
        let graph_len = docs.hnsw_indexes[0].index.len();
        assert_eq!(graph_len, 3, "HNSW graph should have shed the deleted node");

        cleanup(&path);
    }
2635

2636
    /// A database that was saved, reopened, modified, and saved again must
    /// show the extra row on the next reopen.
    #[test]
    fn round_trip_survives_writes_after_load() {
        let path = tmp_path("after_load");
        save_database(&mut seed_db(), &path).unwrap();

        // Second generation: reopen, add a third user, save again.
        {
            let mut second = open_database(&path, "test".to_string()).unwrap();
            let insert = "INSERT INTO users (name, age) VALUES ('carol', 40);";
            process_command(insert, &mut second).unwrap();
            save_database(&mut second, &path).unwrap();
        } // `second` is dropped here, ahead of the final reopen.

        let third = open_database(&path, "test".to_string()).unwrap();
        let users = third.get_table("users".to_string()).unwrap();
        let user_count = users.rowids().len();
        assert_eq!(user_count, 3);

        cleanup(&path);
    }
2657

2658
    /// Opening a file whose contents are arbitrary bytes must return an error.
    #[test]
    fn open_rejects_garbage_file() {
        let path = tmp_path("bad");
        let junk: &[u8] = b"not a sqlrite database, just bytes";
        std::fs::write(&path, junk).unwrap();

        let outcome = open_database(&path, "x".to_string());
        assert!(outcome.is_err());

        cleanup(&path);
    }
2666

2667
    /// Inserts 200 rows of a little over 200 bytes each, saves, reopens, and
    /// checks that all 200 rows come back.
    #[test]
    fn many_small_rows_spread_across_leaves() {
        let path = tmp_path("many_rows");
        let mut db = Database::new("big".to_string());
        process_command(
            "CREATE TABLE things (id INTEGER PRIMARY KEY, data TEXT);",
            &mut db,
        )
        .unwrap();
        // The filler is identical for every row, so build it once instead of
        // re-allocating it on each of the 200 iterations.
        let body = "x".repeat(200);
        for i in 0..200 {
            let q = format!("INSERT INTO things (data) VALUES ('row-{i}-{body}');");
            process_command(&q, &mut db).unwrap();
        }
        save_database(&mut db, &path).unwrap();
        let loaded = open_database(&path, "big".to_string()).unwrap();
        let things = loaded.get_table("things".to_string()).unwrap();
        assert_eq!(things.rowids().len(), 200);
        cleanup(&path);
    }
2687

2688
    /// A single 10,000-character text value must come back from a save +
    /// reopen cycle with its full length.
    #[test]
    fn huge_row_goes_through_overflow() {
        let path = tmp_path("overflow_row");
        let mut db = Database::new("big".to_string());
        process_command(
            "CREATE TABLE docs (id INTEGER PRIMARY KEY, body TEXT);",
            &mut db,
        )
        .unwrap();

        let body = "A".repeat(10_000);
        let insert = format!("INSERT INTO docs (body) VALUES ('{body}');");
        process_command(&insert, &mut db).unwrap();
        save_database(&mut db, &path).unwrap();

        let loaded = open_database(&path, "big".to_string()).unwrap();
        let docs = loaded.get_table("docs".to_string()).unwrap();
        let rowids = docs.rowids();
        assert_eq!(rowids.len(), 1);

        // The single stored value must be text of the original length.
        match docs.get_value("body", rowids[0]) {
            Some(Value::Text(s)) => assert_eq!(s.len(), 10_000),
            other => panic!("expected Text, got {other:?}"),
        }
        cleanup(&path);
    }
2716

2717
    /// Feeding the output of `table_to_create_sql` back into
    /// `parse_create_sql` must yield the same table name and a column
    /// list carrying the original PRIMARY KEY / UNIQUE / NOT NULL flags.
    #[test]
    fn create_sql_synthesis_round_trips() {
        let mut db = Database::new("x".to_string());
        process_command(
            "CREATE TABLE t (id INTEGER PRIMARY KEY, tag TEXT UNIQUE, note TEXT NOT NULL);",
            &mut db,
        )
        .unwrap();

        // Synthesize the CREATE statement from the in-memory table, then
        // parse it straight back.
        let synthesized = table_to_create_sql(db.get_table("t".to_string()).unwrap());
        let (parsed_name, parsed_cols) = parse_create_sql(&synthesized).unwrap();

        assert_eq!(parsed_name, "t");
        assert_eq!(parsed_cols.len(), 3);
        // One constraint per column, in declaration order.
        assert!(parsed_cols[0].is_pk);
        assert!(parsed_cols[1].is_unique);
        assert!(parsed_cols[2].not_null);
    }
2736

2737
    /// The catalog table must stay internal: after reopening a saved
    /// database, the public `tables` map has no entry under
    /// `MASTER_TABLE_NAME`.
    #[test]
    fn sqlrite_master_is_not_exposed_as_a_user_table() {
        let db_file = tmp_path("no_master");
        save_database(&mut seed_db(), &db_file).unwrap();

        let reopened = open_database(&db_file, "x".to_string()).unwrap();
        let master_listed = reopened.tables.contains_key(MASTER_TABLE_NAME);
        assert!(!master_listed);

        cleanup(&db_file);
    }
2746

2747
    /// A table too large for a single leaf must be saved with an interior
    /// page as its root.
    ///
    /// Checks the 200-row round-trip first, then reads the root page's
    /// type tag directly out of the loaded database's pager.
    #[test]
    fn multi_leaf_table_produces_an_interior_root() {
        // 200 fat rows force the table into multiple leaves, which means
        // save_database must build at least one InteriorNode above them.
        let path = tmp_path("multi_leaf_interior");
        let mut db = Database::new("big".to_string());
        process_command(
            "CREATE TABLE things (id INTEGER PRIMARY KEY, data TEXT);",
            &mut db,
        )
        .unwrap();
        // Loop-invariant padding: allocate it once, not once per row.
        let body = "x".repeat(200);
        for i in 0..200 {
            let q = format!("INSERT INTO things (data) VALUES ('row-{i}-{body}');");
            process_command(&q, &mut db).unwrap();
        }
        save_database(&mut db, &path).unwrap();

        // Confirm the round-trip preserved all 200 rows.
        let loaded = open_database(&path, "big".to_string()).unwrap();
        let things = loaded.get_table("things".to_string()).unwrap();
        assert_eq!(things.rowids().len(), 200);

        // Peek at `things`'s root page via the pager attached to the
        // loaded DB and check it's an InteriorNode, not a leaf.
        let pager = loaded
            .pager
            .as_ref()
            .expect("loaded DB should have a pager");
        // sqlrite_master's row for `things` records its root page: load
        // the master table and look that row up.
        let mut master = build_empty_master_table();
        load_table_rows(pager, &mut master, pager.header().schema_root_page).unwrap();
        let things_root = master
            .rowids()
            .into_iter()
            .find_map(|r| match master.get_value("name", r) {
                Some(Value::Text(s)) if s == "things" => match master.get_value("rootpage", r) {
                    // Checked conversion: an out-of-range value must not
                    // silently wrap into some other page number.
                    Some(Value::Integer(p)) => u32::try_from(p).ok(),
                    _ => None,
                },
                _ => None,
            })
            .expect("things should appear in sqlrite_master");
        let root_buf = pager.read_page(things_root).unwrap();
        assert_eq!(
            root_buf[0],
            PageType::InteriorNode as u8,
            "expected a multi-leaf table to have an interior root, got tag {}",
            root_buf[0]
        );

        cleanup(&path);
    }
2806

2807
    /// An index created with `CREATE INDEX` must still exist, with the
    /// same column, uniqueness flag and entries, after save/open.
    #[test]
    fn explicit_index_persists_across_save_and_open() {
        let path = tmp_path("idx_persist");
        let mut db = Database::new("idx".to_string());
        process_command(
            "CREATE TABLE users (id INTEGER PRIMARY KEY, tag TEXT);",
            &mut db,
        )
        .unwrap();
        for i in 1..=5 {
            // Label each row by the parity of its insertion number so the
            // expected counts below can be read off directly.
            let tag = if i % 2 == 0 { "even" } else { "odd" };
            process_command(
                &format!("INSERT INTO users (tag) VALUES ('{tag}');"),
                &mut db,
            )
            .unwrap();
        }
        process_command("CREATE INDEX users_tag_idx ON users (tag);", &mut db).unwrap();
        save_database(&mut db, &path).unwrap();

        let loaded = open_database(&path, "idx".to_string()).unwrap();
        let users = loaded.get_table("users".to_string()).unwrap();
        let idx = users
            .index_by_name("users_tag_idx")
            .expect("explicit index should survive save/open");
        assert_eq!(idx.column_name, "tag");
        assert!(!idx.is_unique);
        // 5 rows: insertions 2 and 4 are "even" — 2 entries;
        // insertions 1, 3 and 5 are "odd" — 3 entries.
        let even_rowids = idx.lookup(&Value::Text("even".into()));
        let odd_rowids = idx.lookup(&Value::Text("odd".into()));
        assert_eq!(even_rowids.len(), 2);
        assert_eq!(odd_rowids.len(), 3);

        cleanup(&path);
    }
2843

2844
    /// The index that a UNIQUE column receives automatically must be
    /// present, flagged unique, and populated after save/open.
    #[test]
    fn auto_indexes_for_unique_columns_survive_save_open() {
        let path = tmp_path("auto_idx_persist");
        let mut db = Database::new("a".to_string());
        process_command(
            "CREATE TABLE users (id INTEGER PRIMARY KEY, email TEXT NOT NULL UNIQUE);",
            &mut db,
        )
        .unwrap();
        for insert in [
            "INSERT INTO users (email) VALUES ('a@x');",
            "INSERT INTO users (email) VALUES ('b@x');",
        ] {
            process_command(insert, &mut db).unwrap();
        }
        save_database(&mut db, &path).unwrap();

        let reopened = open_database(&path, "a".to_string()).unwrap();
        let users = reopened.get_table("users".to_string()).unwrap();

        // The auto index is looked up under the name that
        // `SecondaryIndex::auto_name` derives for (table, column).
        let expected_name = SecondaryIndex::auto_name("users", "email");
        let email_index = users
            .index_by_name(&expected_name)
            .expect("auto index should be restored");
        assert!(email_index.is_unique);

        // Each inserted address resolves to exactly one row.
        for email in ["a@x", "b@x"] {
            assert_eq!(email_index.lookup(&Value::Text(email.into())).len(), 1);
        }

        cleanup(&path);
    }
2871

2872
    /// SQLR-1 — `CREATE INDEX` on a wide table must round-trip when the
    /// index B-tree grows past one leaf and needs an interior level.
    /// Before the fix, the post-DDL auto-save panicked with
    /// `Internal("unknown paged-entry kind tag 0x4 …")` because a
    /// table-cell decoder was being run against an index leaf
    /// (`KIND_INDEX = 0x04`).
    ///
    /// 5 000 rows mirror the original repro from the issue and exceed
    /// every leaf-fanout cliff for the small `(rowid, value)` cells in
    /// a TEXT-keyed secondary index.
    #[test]
    fn secondary_index_with_interior_level_round_trips() {
        let path = tmp_path("sqlr1_wide_index");
        let mut db = Database::new("idx".to_string());
        db.source_path = Some(path.clone());

        process_command(
            "CREATE TABLE bloat (id INTEGER PRIMARY KEY, payload TEXT);",
            &mut db,
        )
        .unwrap();
        // BEGIN/COMMIT collapses 5 000 inserts into one save (matches
        // `auto_vacuum_setup` and the issue's repro shape).
        process_command("BEGIN;", &mut db).unwrap();
        for i in 0..5000 {
            process_command(
                &format!("INSERT INTO bloat (payload) VALUES ('p-{i:08}');"),
                &mut db,
            )
            .unwrap();
        }
        process_command("COMMIT;", &mut db).unwrap();

        // The DDL that used to panic.
        process_command("CREATE INDEX idx_p ON bloat (payload);", &mut db).unwrap();

        // Reopen and verify lookups, plus that the index tree actually
        // grew an interior layer (otherwise this test wouldn't cover the
        // regression).
        drop(db);
        let loaded = open_database(&path, "idx".to_string()).unwrap();
        let bloat = loaded.get_table("bloat".to_string()).unwrap();
        let idx = bloat
            .index_by_name("idx_p")
            .expect("idx_p should survive close/reopen");
        assert!(!idx.is_unique);

        // Spot-check the keyspace: first, middle, last value each map
        // back to exactly the row that carried them.
        for &(probe_i, expected_rowid) in &[(0i64, 1i64), (2500, 2501), (4999, 5000)] {
            let value = Value::Text(format!("p-{probe_i:08}"));
            let hits = idx.lookup(&value);
            assert_eq!(
                hits,
                vec![expected_rowid],
                "lookup({value:?}) should yield rowid {expected_rowid}",
            );
        }

        // Confirm the index tree is multi-level (the regression's
        // necessary condition) — root must be an `InteriorNode` and
        // `find_leftmost_leaf` must reach a `TableLeaf` through it.
        let pager = loaded.pager.as_ref().unwrap();
        let mut master = build_empty_master_table();
        load_table_rows(pager, &mut master, pager.header().schema_root_page).unwrap();
        let idx_root = master
            .rowids()
            .into_iter()
            .find_map(
                |r| match (master.get_value("name", r), master.get_value("type", r)) {
                    (Some(Value::Text(name)), Some(Value::Text(kind)))
                        if name == "idx_p" && kind == "index" =>
                    {
                        match master.get_value("rootpage", r) {
                            // Checked conversion: an out-of-range value
                            // must not silently wrap into another page
                            // number.
                            Some(Value::Integer(p)) => u32::try_from(p).ok(),
                            _ => None,
                        }
                    }
                    _ => None,
                },
            )
            .expect("idx_p should appear in sqlrite_master");
        let root_buf = pager.read_page(idx_root).unwrap();
        assert_eq!(
            root_buf[0],
            PageType::InteriorNode as u8,
            "5 000-entry index must have an interior root — without one this test wouldn't cover SQLR-1",
        );
        let leaf = find_leftmost_leaf(pager, idx_root).unwrap();
        let leaf_buf = pager.read_page(leaf).unwrap();
        assert_eq!(leaf_buf[0], PageType::TableLeaf as u8);

        cleanup(&path);
    }
2966

2967
    /// SQLR-1 follow-on: creating a large index, dropping it, and
    /// creating it again under the same name must leave a database that
    /// reopens cleanly and still answers index lookups.
    ///
    /// The drop puts the index's pages on the freelist, so the second
    /// `CREATE INDEX` may be handed recycled pages; stale `KIND_INDEX`
    /// cells on such a page must never be decoded as table cells.
    #[test]
    fn drop_then_recreate_wide_index_does_not_panic() {
        let path = tmp_path("sqlr1_drop_recreate");
        let mut db = Database::new("idx".to_string());
        db.source_path = Some(path.clone());

        process_command(
            "CREATE TABLE bloat (id INTEGER PRIMARY KEY, payload TEXT);",
            &mut db,
        )
        .unwrap();

        // Bulk-load 5 000 rows inside a single transaction.
        process_command("BEGIN;", &mut db).unwrap();
        for i in 0..5000 {
            let insert = format!("INSERT INTO bloat (payload) VALUES ('p-{i:08}');");
            process_command(&insert, &mut db).unwrap();
        }
        process_command("COMMIT;", &mut db).unwrap();

        // Create, drop, then recreate from scratch — the last step
        // exercises the recycle path.
        for ddl in [
            "CREATE INDEX idx_p ON bloat (payload);",
            "DROP INDEX idx_p;",
            "CREATE INDEX idx_p ON bloat (payload);",
        ] {
            process_command(ddl, &mut db).unwrap();
        }

        drop(db);
        let reopened = open_database(&path, "idx".to_string()).unwrap();
        let bloat = reopened.get_table("bloat".to_string()).unwrap();
        let recreated = bloat
            .index_by_name("idx_p")
            .expect("idx_p should survive drop+recreate+reopen");
        let probe = Value::Text("p-00002500".into());
        assert_eq!(
            recreated.lookup(&probe),
            vec![2501],
            "post-recycle lookup must still resolve correctly",
        );

        cleanup(&path);
    }
3013

3014
    /// A table large enough to need two interior levels must round-trip,
    /// and the saved tree must really be three levels deep: the root's
    /// leftmost child is itself an interior page.
    #[test]
    fn deep_tree_round_trips() {
        // Force a 3-level tree by bypassing process_command (which prints
        // the full table on every INSERT, making large bulk loads O(N^2)
        // in I/O). We build the Table directly via restore_row.
        use crate::sql::db::table::Column as TableColumn;

        let path = tmp_path("deep_tree");
        let mut db = Database::new("deep".to_string());
        let columns = vec![
            TableColumn::new("id".into(), "integer".into(), true, true, true),
            TableColumn::new("s".into(), "text".into(), false, true, false),
        ];
        let mut table = build_empty_table("t", columns, 0);
        // ~900-byte rows → ~4 rows per leaf. 6000 rows → ~1500 leaves,
        // which with interior fanout ~400 needs 2 interior levels (3-level
        // tree total, counting leaves).
        //
        // The padding is loop-invariant, so allocate it once rather than
        // 6 000 times.
        let body = "q".repeat(900);
        for i in 1..=6_000i64 {
            table
                .restore_row(
                    i,
                    vec![
                        Some(Value::Integer(i)),
                        Some(Value::Text(format!("r-{i}-{body}"))),
                    ],
                )
                .unwrap();
        }
        db.tables.insert("t".to_string(), table);
        save_database(&mut db, &path).unwrap();

        let loaded = open_database(&path, "deep".to_string()).unwrap();
        let t = loaded.get_table("t".to_string()).unwrap();
        assert_eq!(t.rowids().len(), 6_000);

        // Confirm the tree actually grew past 2 levels — i.e., the root's
        // leftmost child is itself an interior page, not a leaf.
        let pager = loaded.pager.as_ref().unwrap();
        let mut master = build_empty_master_table();
        load_table_rows(pager, &mut master, pager.header().schema_root_page).unwrap();
        let t_root = master
            .rowids()
            .into_iter()
            .find_map(|r| match master.get_value("name", r) {
                Some(Value::Text(s)) if s == "t" => match master.get_value("rootpage", r) {
                    // Checked conversion: an out-of-range value must not
                    // silently wrap into some other page number.
                    Some(Value::Integer(p)) => u32::try_from(p).ok(),
                    _ => None,
                },
                _ => None,
            })
            .expect("t in sqlrite_master");
        let root_buf = pager.read_page(t_root).unwrap();
        assert_eq!(root_buf[0], PageType::InteriorNode as u8);
        // Everything after the per-page header is the interior payload.
        let root_payload: &[u8; PAYLOAD_PER_PAGE] =
            (&root_buf[PAGE_HEADER_SIZE..]).try_into().unwrap();
        let root_interior = InteriorPage::from_bytes(root_payload);
        let child = root_interior.leftmost_child().unwrap();
        let child_buf = pager.read_page(child).unwrap();
        assert_eq!(
            child_buf[0],
            PageType::InteriorNode as u8,
            "expected 3-level tree: root's leftmost child should also be InteriorNode",
        );

        cleanup(&path);
    }
3081

3082
    /// `ALTER TABLE … RENAME TO` must persist: after save and reopen the
    /// old name is gone, the new name holds the same rows, and the
    /// auto-indexes carry the new table name.
    #[test]
    fn alter_rename_table_survives_save_and_reopen() {
        let path = tmp_path("alter_rename_table_roundtrip");
        let mut db = seed_db();
        save_database(&mut db, &path).expect("save");

        process_command("ALTER TABLE users RENAME TO members;", &mut db).expect("rename");
        save_database(&mut db, &path).expect("save after rename");

        let reopened = open_database(&path, "t".to_string()).expect("reopen");
        assert!(!reopened.contains_table("users".to_string()));
        assert!(reopened.contains_table("members".to_string()));

        let members = reopened.get_table("members".to_string()).unwrap();
        assert_eq!(members.rowids().len(), 2, "rows should survive");

        // Auto-indexes followed the rename.
        for auto_index in [
            "sqlrite_autoindex_members_id",
            "sqlrite_autoindex_members_name",
        ] {
            assert!(members.index_by_name(auto_index).is_some());
        }

        cleanup(&path);
    }
3110

3111
    /// `ALTER TABLE … RENAME COLUMN` must persist: after save and reopen
    /// only the new column name exists and row values are reachable
    /// through it.
    #[test]
    fn alter_rename_column_survives_save_and_reopen() {
        let path = tmp_path("alter_rename_col_roundtrip");
        let mut db = seed_db();
        save_database(&mut db, &path).expect("save");

        process_command(
            "ALTER TABLE users RENAME COLUMN name TO full_name;",
            &mut db,
        )
        .expect("rename column");
        save_database(&mut db, &path).expect("save after rename");

        let reopened = open_database(&path, "t".to_string()).expect("reopen");
        let users = reopened.get_table("users".to_string()).unwrap();
        assert!(users.contains_column("full_name".to_string()));
        assert!(!users.contains_column("name".to_string()));

        // Verify a row's value survived the rename round-trip: locate the
        // row holding "alice" under the new column name, then read it back.
        let alice = Some(Value::Text("alice".to_string()));
        let alice_rowid = users
            .rowids()
            .into_iter()
            .find(|candidate| users.get_value("full_name", *candidate) == alice)
            .expect("alice row should be findable under renamed column");
        assert_eq!(users.get_value("full_name", alice_rowid), alice);

        cleanup(&path);
    }
3141

3142
    /// `ALTER TABLE … ADD COLUMN … DEFAULT` must persist both the
    /// backfilled value on every existing row and the DEFAULT recorded
    /// on the column's metadata.
    #[test]
    fn alter_add_column_with_default_survives_save_and_reopen() {
        let path = tmp_path("alter_add_default_roundtrip");
        let mut db = seed_db();
        save_database(&mut db, &path).expect("save");

        process_command(
            "ALTER TABLE users ADD COLUMN status TEXT DEFAULT 'active';",
            &mut db,
        )
        .expect("add column");
        save_database(&mut db, &path).expect("save after add");

        let reopened = open_database(&path, "t".to_string()).expect("reopen");
        let users = reopened.get_table("users".to_string()).unwrap();
        assert!(users.contains_column("status".to_string()));

        // The value every pre-existing row should now report.
        let active = Some(Value::Text("active".to_string()));
        for rowid in users.rowids() {
            assert_eq!(
                users.get_value("status", rowid),
                active,
                "backfilled default should round-trip for rowid {rowid}"
            );
        }

        // The DEFAULT clause itself should still be on the column metadata
        // so a subsequent INSERT picks it up.
        let status_col = users
            .columns
            .iter()
            .find(|col| col.column_name == "status")
            .unwrap();
        assert_eq!(status_col.default, active);

        cleanup(&path);
    }
3176

3177
    /// `ALTER TABLE … DROP COLUMN` must persist: after save and reopen
    /// the dropped column is gone while another column is still there.
    #[test]
    fn alter_drop_column_survives_save_and_reopen() {
        let path = tmp_path("alter_drop_col_roundtrip");
        let mut db = seed_db();
        save_database(&mut db, &path).expect("save");

        process_command("ALTER TABLE users DROP COLUMN age;", &mut db).expect("drop column");
        save_database(&mut db, &path).expect("save after drop");

        let reopened = open_database(&path, "t".to_string()).expect("reopen");
        let users = reopened.get_table("users".to_string()).unwrap();

        let age_present = users.contains_column("age".to_string());
        assert!(!age_present);
        let name_present = users.contains_column("name".to_string());
        assert!(name_present);

        cleanup(&path);
    }
3193

3194
    /// `DROP TABLE` must persist: the dropped table stays gone after
    /// save and reopen, and the other table is left in place.
    #[test]
    fn drop_table_survives_save_and_reopen() {
        let path = tmp_path("drop_table_roundtrip");
        let mut db = seed_db();
        save_database(&mut db, &path).expect("save");

        // Verify both tables landed. The handle is scoped so it is
        // released before the drop below.
        {
            let before_drop = open_database(&path, "t".to_string()).expect("open");
            for table in ["users", "notes"] {
                assert!(before_drop.contains_table(table.to_string()));
            }
        }

        process_command("DROP TABLE users;", &mut db).expect("drop users");
        save_database(&mut db, &path).expect("save after drop");

        let after_drop = open_database(&path, "t".to_string()).expect("reopen");
        assert!(
            !after_drop.contains_table("users".to_string()),
            "dropped table should not resurface on reopen"
        );
        assert!(
            after_drop.contains_table("notes".to_string()),
            "untouched table should survive"
        );

        cleanup(&path);
    }
3222

3223
    /// `DROP INDEX` must persist: the explicit index stays gone after
    /// save and reopen, while the primary key's auto-index remains.
    #[test]
    fn drop_index_survives_save_and_reopen() {
        let path = tmp_path("drop_index_roundtrip");
        let mut db = Database::new("t".to_string());

        // Table plus an explicit index, saved once with the index in place.
        for ddl in [
            "CREATE TABLE notes (id INTEGER PRIMARY KEY, body TEXT);",
            "CREATE INDEX notes_body_idx ON notes (body);",
        ] {
            process_command(ddl, &mut db).unwrap();
        }
        save_database(&mut db, &path).expect("save");

        process_command("DROP INDEX notes_body_idx;", &mut db).unwrap();
        save_database(&mut db, &path).expect("save after drop");

        let reopened = open_database(&path, "t".to_string()).expect("reopen");
        let notes = reopened.get_table("notes".to_string()).unwrap();
        assert!(
            notes.index_by_name("notes_body_idx").is_none(),
            "dropped index should not resurface on reopen"
        );
        // The auto-index for the PK should still be there.
        let pk_index = notes.index_by_name("sqlrite_autoindex_notes_id");
        assert!(pk_index.is_some());

        cleanup(&path);
    }
3249

3250
    /// Column DEFAULT clauses must survive save and reopen, both as
    /// column metadata and as the values an INSERT falls back to when it
    /// omits those columns.
    #[test]
    fn default_clause_survives_save_and_reopen() {
        let path = tmp_path("default_roundtrip");
        let mut db = Database::new("t".to_string());

        process_command(
            "CREATE TABLE users (id INTEGER PRIMARY KEY, status TEXT DEFAULT 'active', score INTEGER DEFAULT 0);",
            &mut db,
        )
        .unwrap();
        save_database(&mut db, &path).expect("save");

        let mut loaded = open_database(&path, "t".to_string()).expect("open");

        // Expected defaults, shared by the metadata and runtime checks.
        let active = Some(Value::Text("active".to_string()));
        let zero = Some(Value::Integer(0));

        // The reloaded column metadata should still carry the DEFAULT.
        let reloaded_users = loaded.get_table("users".to_string()).expect("users table");
        let status_col = reloaded_users
            .columns
            .iter()
            .find(|col| col.column_name == "status")
            .expect("status column");
        assert_eq!(
            status_col.default, active,
            "DEFAULT 'active' should round-trip"
        );
        let score_col = reloaded_users
            .columns
            .iter()
            .find(|col| col.column_name == "score")
            .expect("score column");
        assert_eq!(score_col.default, zero, "DEFAULT 0 should round-trip");

        // Now exercise the runtime path: an INSERT that omits both DEFAULT
        // columns should pick them up from the reloaded schema.
        process_command("INSERT INTO users (id) VALUES (1);", &mut loaded).unwrap();
        let users_after_insert = loaded.get_table("users".to_string()).unwrap();
        assert_eq!(users_after_insert.get_value("status", 1), active);
        assert_eq!(users_after_insert.get_value("score", 1), zero);

        cleanup(&path);
    }
3299

3300
    // ---------------------------------------------------------------------
3301
    // SQLR-6 — free-list + VACUUM tests
3302
    // ---------------------------------------------------------------------
3303

3304
    /// Dropping a table must park its pages on the freelist, and a later
    /// CREATE TABLE should draw from that list instead of growing the
    /// file.
    ///
    /// Two things are asserted: `page_count` is unchanged by the drop
    /// (with a non-zero `freelist_head`), and after creating and filling
    /// a similar table `page_count` exceeds the original two-table count
    /// by at most 2.
    #[test]
    fn drop_table_freelist_persists_pages_for_reuse() {
        // Header readers; each takes the database by reference so the
        // `&mut db` borrows between calls stay legal.
        let page_count = |d: &Database| d.pager.as_ref().unwrap().header().page_count;
        let freelist_head = |d: &Database| d.pager.as_ref().unwrap().header().freelist_head;

        let path = tmp_path("freelist_reuse");
        let mut db = seed_db();
        db.source_path = Some(path.clone());
        save_database(&mut db, &path).expect("save");
        let pages_two_tables = page_count(&db);

        // Drop one table; its pages go on the freelist.
        process_command("DROP TABLE users;", &mut db).expect("drop users");
        let pages_after_drop = page_count(&db);
        assert_eq!(
            pages_after_drop, pages_two_tables,
            "page_count should not shrink on drop — the freed pages persist on the freelist"
        );
        let head_after_drop = freelist_head(&db);
        assert!(
            head_after_drop != 0,
            "freelist_head must be non-zero after drop"
        );

        // Re-create a similar-shaped table; should reuse freelist pages.
        process_command(
            "CREATE TABLE accounts (id INTEGER PRIMARY KEY, label TEXT NOT NULL UNIQUE);",
            &mut db,
        )
        .expect("create accounts");
        for insert in [
            "INSERT INTO accounts (label) VALUES ('a');",
            "INSERT INTO accounts (label) VALUES ('b');",
        ] {
            process_command(insert, &mut db).unwrap();
        }
        let pages_after_create = page_count(&db);
        assert!(
            pages_after_create <= pages_two_tables + 2,
            "creating a similar-sized table after a drop should mostly draw from the \
             freelist, not extend the file (got {pages_after_create} > {pages_two_tables} + 2)"
        );

        cleanup(&path);
    }
3346

3347
    /// After a table is dropped, `VACUUM;` must lower `page_count`, reset
    /// `freelist_head` to zero, and make the file on disk smaller.
    #[test]
    fn drop_then_vacuum_shrinks_file() {
        // Header readers; each takes the database by reference so the
        // `&mut db` borrows between calls stay legal.
        let page_count = |d: &Database| d.pager.as_ref().unwrap().header().page_count;
        let freelist_head = |d: &Database| d.pager.as_ref().unwrap().header().freelist_head;

        let path = tmp_path("vacuum_shrinks");
        let mut db = seed_db();
        db.source_path = Some(path.clone());
        // Add a few more rows to make the dropped table bigger.
        for i in 0..20 {
            let insert = format!("INSERT INTO users (name, age) VALUES ('user{i}', {i});");
            process_command(&insert, &mut db).unwrap();
        }
        save_database(&mut db, &path).expect("save");

        process_command("DROP TABLE users;", &mut db).expect("drop");
        let size_before_vacuum = std::fs::metadata(&path).unwrap().len();
        let pages_before_vacuum = page_count(&db);
        let head_before = freelist_head(&db);
        assert!(head_before != 0, "drop should populate the freelist");

        // VACUUM (via process_command) checkpoints internally so the
        // file actually shrinks on disk before we observe its size.
        process_command("VACUUM;", &mut db).expect("vacuum");

        let size_after = std::fs::metadata(&path).unwrap().len();
        let pages_after = page_count(&db);
        let head_after = freelist_head(&db);
        assert!(
            pages_after < pages_before_vacuum,
            "VACUUM must reduce page_count: was {pages_before_vacuum}, now {pages_after}"
        );
        assert_eq!(head_after, 0, "VACUUM must clear the freelist");
        assert!(
            size_after < size_before_vacuum,
            "VACUUM must shrink the file on disk: was {size_before_vacuum} bytes, now {size_after}"
        );

        cleanup(&path);
    }
3388

3389
    /// VACUUM on a non-empty multi-table DB must not lose any rows.
3390
    #[test]
3391
    fn vacuum_round_trips_data() {
3✔
3392
        let path = tmp_path("vacuum_round_trip");
1✔
3393
        let mut db = seed_db();
1✔
3394
        db.source_path = Some(path.clone());
2✔
3395
        save_database(&mut db, &path).expect("save");
1✔
3396
        process_command("VACUUM;", &mut db).expect("vacuum");
1✔
3397

3398
        // Re-open from disk to make sure the on-disk catalog round-trips.
3399
        drop(db);
1✔
3400
        let loaded = open_database(&path, "t".to_string()).expect("reopen after vacuum");
1✔
3401
        assert!(loaded.contains_table("users".to_string()));
2✔
3402
        assert!(loaded.contains_table("notes".to_string()));
1✔
3403
        let users = loaded.get_table("users".to_string()).unwrap();
1✔
3404
        // seed_db inserts two users.
3405
        assert_eq!(users.rowids().len(), 2);
1✔
3406

3407
        cleanup(&path);
1✔
3408
    }
3409

3410
    /// Format version is bumped to v6 only after a save that creates a
3411
    /// non-empty freelist. VACUUM clears the freelist but doesn't
3412
    /// downgrade — v6 is a strict superset, so once at v6 we stay.
3413
    #[test]
3414
    fn freelist_format_version_promotion() {
3✔
3415
        use crate::sql::pager::header::{FORMAT_VERSION_BASELINE, FORMAT_VERSION_V6};
3416
        let path = tmp_path("v6_promotion");
1✔
3417
        let mut db = seed_db();
1✔
3418
        db.source_path = Some(path.clone());
2✔
3419
        save_database(&mut db, &path).expect("save");
1✔
3420
        let v_after_save = db.pager.as_ref().unwrap().header().format_version;
1✔
3421
        assert_eq!(
1✔
3422
            v_after_save, FORMAT_VERSION_BASELINE,
3423
            "fresh DB without drops should stay at the baseline version"
3424
        );
3425

3426
        process_command("DROP TABLE users;", &mut db).expect("drop");
2✔
3427
        let v_after_drop = db.pager.as_ref().unwrap().header().format_version;
1✔
3428
        assert_eq!(
1✔
3429
            v_after_drop, FORMAT_VERSION_V6,
3430
            "first save with a non-empty freelist must promote to V6"
3431
        );
3432

3433
        process_command("VACUUM;", &mut db).expect("vacuum");
2✔
3434
        let v_after_vacuum = db.pager.as_ref().unwrap().header().format_version;
1✔
3435
        assert_eq!(
1✔
3436
            v_after_vacuum, FORMAT_VERSION_V6,
3437
            "VACUUM must not downgrade — V6 is a strict superset"
3438
        );
3439

3440
        cleanup(&path);
2✔
3441
    }
3442

3443
    /// Freelist persists across reopen: drop, save, close, reopen,
3444
    /// confirm the next CREATE TABLE re-uses pages from the persisted
3445
    /// freelist (rather than extending the file).
3446
    #[test]
3447
    fn freelist_round_trip_through_reopen() {
3✔
3448
        let path = tmp_path("freelist_reopen");
1✔
3449
        let pages_two_tables;
3450
        {
3451
            let mut db = seed_db();
1✔
3452
            db.source_path = Some(path.clone());
2✔
3453
            save_database(&mut db, &path).expect("save");
1✔
3454
            pages_two_tables = db.pager.as_ref().unwrap().header().page_count;
1✔
3455
            process_command("DROP TABLE users;", &mut db).expect("drop");
1✔
3456
            let head = db.pager.as_ref().unwrap().header().freelist_head;
1✔
3457
            assert!(head != 0, "drop must populate the freelist");
1✔
3458
        }
3459

3460
        // Reopen from disk — the freelist must come back.
3461
        let mut db = open_database(&path, "t".to_string()).expect("reopen");
1✔
UNCOV
3462
        assert!(
×
3463
            db.pager.as_ref().unwrap().header().freelist_head != 0,
2✔
3464
            "freelist_head must survive close/reopen"
3465
        );
3466

3467
        process_command(
3468
            "CREATE TABLE accounts (id INTEGER PRIMARY KEY, label TEXT NOT NULL UNIQUE);",
3469
            &mut db,
3470
        )
3471
        .expect("create accounts");
3472
        process_command("INSERT INTO accounts (label) VALUES ('reopened');", &mut db).unwrap();
1✔
3473
        let pages_after_create = db.pager.as_ref().unwrap().header().page_count;
1✔
UNCOV
3474
        assert!(
×
3475
            pages_after_create <= pages_two_tables + 2,
1✔
3476
            "post-reopen create should reuse freelist (got {pages_after_create} > \
3477
             {pages_two_tables} + 2 — file extended instead of reusing)"
3478
        );
3479

3480
        cleanup(&path);
2✔
3481
    }
3482

3483
    /// VACUUM inside an explicit transaction must error before touching the
3484
    /// disk. `BEGIN; VACUUM;` is the documented rejection path.
3485
    #[test]
3486
    fn vacuum_inside_transaction_is_rejected() {
3✔
3487
        let path = tmp_path("vacuum_txn");
1✔
3488
        let mut db = seed_db();
1✔
3489
        db.source_path = Some(path.clone());
2✔
3490
        save_database(&mut db, &path).expect("save");
1✔
3491

3492
        process_command("BEGIN;", &mut db).expect("begin");
1✔
3493
        let err = process_command("VACUUM;", &mut db).unwrap_err();
1✔
UNCOV
3494
        assert!(
×
3495
            format!("{err}").contains("VACUUM cannot run inside a transaction"),
3✔
3496
            "expected in-transaction rejection, got: {err}"
3497
        );
3498
        // Roll back to leave the DB in a clean state.
3499
        process_command("ROLLBACK;", &mut db).unwrap();
1✔
3500
        cleanup(&path);
1✔
3501
    }
3502

3503
    /// VACUUM on an in-memory database is a documented no-op.
3504
    #[test]
3505
    fn vacuum_on_in_memory_database_is_noop() {
3✔
3506
        let mut db = Database::new("mem".to_string());
1✔
3507
        process_command("CREATE TABLE t (id INTEGER PRIMARY KEY);", &mut db).unwrap();
2✔
3508
        let out = process_command("VACUUM;", &mut db).expect("vacuum no-op");
1✔
UNCOV
3509
        assert!(
×
3510
            out.to_lowercase().contains("no-op") || out.to_lowercase().contains("in-memory"),
2✔
3511
            "expected no-op message for in-memory VACUUM, got: {out}"
3512
        );
3513
    }
3514

3515
    /// Untouched tables shouldn't write any pages on the save that
3516
    /// follows a DROP of an unrelated table. Confirms the per-table
3517
    /// preferred pool keeps page numbers stable so the diff pager skips
3518
    /// every byte-identical leaf.
3519
    #[test]
3520
    fn unchanged_table_pages_skip_diff_after_unrelated_drop() {
3✔
3521
        // Need three tables so dropping one in the middle still leaves
3522
        // an "unrelated" alphabetical neighbour. Layout pre-drop (sorted):
3523
        //   accounts, notes, users
3524
        // Drop `notes`. `accounts` and `users` should keep their pages.
3525
        let path = tmp_path("diff_after_drop");
1✔
3526
        let mut db = Database::new("t".to_string());
2✔
3527
        db.source_path = Some(path.clone());
2✔
3528
        process_command(
3529
            "CREATE TABLE accounts (id INTEGER PRIMARY KEY, label TEXT);",
3530
            &mut db,
3531
        )
3532
        .unwrap();
3533
        process_command(
3534
            "CREATE TABLE notes (id INTEGER PRIMARY KEY, body TEXT);",
3535
            &mut db,
3536
        )
3537
        .unwrap();
3538
        process_command(
3539
            "CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT);",
3540
            &mut db,
3541
        )
3542
        .unwrap();
3543
        for i in 0..5 {
1✔
3544
            process_command(
3545
                &format!("INSERT INTO accounts (label) VALUES ('a{i}');"),
2✔
3546
                &mut db,
3547
            )
3548
            .unwrap();
3549
            process_command(
3550
                &format!("INSERT INTO notes (body) VALUES ('n{i}');"),
1✔
3551
                &mut db,
3552
            )
3553
            .unwrap();
3554
            process_command(
3555
                &format!("INSERT INTO users (name) VALUES ('u{i}');"),
1✔
3556
                &mut db,
3557
            )
3558
            .unwrap();
3559
        }
3560
        save_database(&mut db, &path).expect("baseline save");
1✔
3561

3562
        // Capture page bytes for `accounts` and `users` so we can
3563
        // verify they don't change.
3564
        let pager = db.pager.as_ref().unwrap();
1✔
3565
        let acc_root = read_old_rootpages(pager, pager.header().schema_root_page)
2✔
3566
            .unwrap()
3567
            .get(&("table".to_string(), "accounts".to_string()))
2✔
3568
            .copied()
3569
            .unwrap();
3570
        let users_root = read_old_rootpages(pager, pager.header().schema_root_page)
2✔
3571
            .unwrap()
3572
            .get(&("table".to_string(), "users".to_string()))
2✔
3573
            .copied()
3574
            .unwrap();
3575
        let acc_bytes_before: Vec<u8> = pager.read_page(acc_root).unwrap().to_vec();
1✔
3576
        let users_bytes_before: Vec<u8> = pager.read_page(users_root).unwrap().to_vec();
2✔
3577

3578
        // Drop the middle table.
3579
        process_command("DROP TABLE notes;", &mut db).expect("drop notes");
2✔
3580

3581
        let pager = db.pager.as_ref().unwrap();
1✔
3582
        // `accounts` and `users` should still live at the same pages
3583
        // with byte-identical content.
3584
        let acc_after = pager.read_page(acc_root).unwrap();
1✔
3585
        let users_after = pager.read_page(users_root).unwrap();
1✔
3586
        assert_eq!(
1✔
3587
            &acc_after[..],
1✔
3588
            &acc_bytes_before[..],
1✔
3589
            "accounts root page must not be rewritten when an unrelated table is dropped"
3590
        );
3591
        assert_eq!(
1✔
3592
            &users_after[..],
2✔
3593
            &users_bytes_before[..],
1✔
3594
            "users root page must not be rewritten when an unrelated table is dropped"
3595
        );
3596

3597
        cleanup(&path);
2✔
3598
    }
3599

3600
    // ---- SQLR-10: auto-VACUUM trigger after page-releasing DDL ----
3601

3602
    /// Builds a file-backed DB with one small "keep" table and one
3603
    /// large "bloat" table, sized so the post-drop freelist will
3604
    /// comfortably cross the default 25% threshold and the
3605
    /// `MIN_PAGES_FOR_AUTO_VACUUM` floor (16 pages). Used by the
3606
    /// auto-VACUUM happy-path tests.
3607
    fn auto_vacuum_setup(path: &std::path::Path) -> Database {
1✔
3608
        let mut db = Database::new("av".to_string());
1✔
3609
        db.source_path = Some(path.to_path_buf());
2✔
3610
        process_command(
3611
            "CREATE TABLE keep (id INTEGER PRIMARY KEY, n INTEGER);",
3612
            &mut db,
3613
        )
3614
        .unwrap();
3615
        process_command("INSERT INTO keep (n) VALUES (1);", &mut db).unwrap();
1✔
3616
        process_command(
3617
            "CREATE TABLE bloat (id INTEGER PRIMARY KEY, payload TEXT);",
3618
            &mut db,
3619
        )
3620
        .unwrap();
3621
        // Wrap the bulk insert in a transaction so we pay one save at
3622
        // COMMIT instead of 5000 round-trips through auto-save.
3623
        process_command("BEGIN;", &mut db).unwrap();
1✔
3624
        for i in 0..5000 {
1✔
3625
            process_command(
3626
                &format!("INSERT INTO bloat (payload) VALUES ('p-{i:08}');"),
2✔
3627
                &mut db,
3628
            )
3629
            .unwrap();
3630
        }
3631
        process_command("COMMIT;", &mut db).unwrap();
1✔
3632
        db
1✔
3633
    }
3634

3635
    /// Default threshold (0.25) is engaged for fresh `Database`s and
3636
    /// fires when a `DROP TABLE` orphans enough pages — file shrinks
3637
    /// without anyone calling `VACUUM;`.
3638
    #[test]
3639
    fn auto_vacuum_default_threshold_triggers_on_drop_table() {
3✔
3640
        let path = tmp_path("av_default_drop_table");
1✔
3641
        let mut db = auto_vacuum_setup(&path);
2✔
3642
        // Sanity: setup respects the shipped default.
3643
        assert_eq!(db.auto_vacuum_threshold(), Some(0.25));
2✔
3644

3645
        // Checkpoint before measuring `size_before` so the bloat actually
3646
        // lives in the main file and not just the WAL — otherwise
3647
        // `size_before` is the bare 2-page header and any post-vacuum
3648
        // checkpoint will look like the file *grew*.
3649
        if let Some(p) = db.pager.as_mut() {
1✔
3650
            let _ = p.checkpoint();
2✔
3651
        }
3652
        let pages_before = db.pager.as_ref().unwrap().header().page_count;
2✔
3653
        let size_before = std::fs::metadata(&path).unwrap().len();
1✔
UNCOV
3654
        assert!(
×
3655
            pages_before >= MIN_PAGES_FOR_AUTO_VACUUM,
1✔
3656
            "setup should produce >= MIN_PAGES_FOR_AUTO_VACUUM ({MIN_PAGES_FOR_AUTO_VACUUM}) \
3657
             pages so the floor doesn't suppress the trigger; got {pages_before}"
3658
        );
3659

3660
        // Drop the bloat table — freelist should pass 25% of page_count
3661
        // and the auto-VACUUM hook should compact in place. Note: no
3662
        // explicit `VACUUM;` statement is issued.
3663
        process_command("DROP TABLE bloat;", &mut db).expect("drop");
2✔
3664

3665
        let pages_after = db.pager.as_ref().unwrap().header().page_count;
1✔
3666
        let head_after = db.pager.as_ref().unwrap().header().freelist_head;
1✔
3667
        // Second checkpoint so the post-vacuum file shrinks on disk
3668
        // (auto-VACUUM stages the compact through WAL just like manual
3669
        // VACUUM does).
3670
        if let Some(p) = db.pager.as_mut() {
1✔
3671
            let _ = p.checkpoint();
2✔
3672
        }
3673
        let size_after = std::fs::metadata(&path).unwrap().len();
2✔
3674

UNCOV
3675
        assert!(
×
3676
            pages_after < pages_before,
1✔
3677
            "auto-VACUUM must reduce page_count: was {pages_before}, now {pages_after}"
3678
        );
3679
        assert_eq!(head_after, 0, "auto-VACUUM must clear the freelist");
1✔
UNCOV
3680
        assert!(
×
3681
            size_after < size_before,
1✔
3682
            "auto-VACUUM must shrink the file on disk: was {size_before}, now {size_after}"
3683
        );
3684

3685
        cleanup(&path);
2✔
3686
    }
3687

3688
    /// Setting the threshold to `None` disables the trigger entirely:
3689
    /// the same workload that shrinks under the default leaves the file
3690
    /// at its high-water mark.
3691
    #[test]
3692
    fn auto_vacuum_disabled_keeps_file_at_hwm() {
3✔
3693
        let path = tmp_path("av_disabled");
1✔
3694
        let mut db = auto_vacuum_setup(&path);
2✔
3695
        db.set_auto_vacuum_threshold(None).expect("disable");
2✔
3696
        assert_eq!(db.auto_vacuum_threshold(), None);
1✔
3697

3698
        let pages_before = db.pager.as_ref().unwrap().header().page_count;
1✔
3699

3700
        process_command("DROP TABLE bloat;", &mut db).expect("drop");
1✔
3701

3702
        let pages_after = db.pager.as_ref().unwrap().header().page_count;
1✔
3703
        let head_after = db.pager.as_ref().unwrap().header().freelist_head;
1✔
3704
        assert_eq!(
1✔
3705
            pages_after, pages_before,
3706
            "with auto-VACUUM disabled, drop must keep page_count at the HWM"
3707
        );
UNCOV
3708
        assert!(
×
3709
            head_after != 0,
1✔
3710
            "drop must still populate the freelist (manual VACUUM would be needed to reclaim)"
3711
        );
3712

3713
        cleanup(&path);
2✔
3714
    }
3715

3716
    /// `DROP INDEX` is the second of three page-releasing DDL paths
3717
    /// covered by SQLR-10. We bloat the freelist via a separate
3718
    /// `DROP TABLE` first (with auto-VACUUM disabled so it doesn't
3719
    /// compact early), then re-arm the trigger and drop a small index
3720
    /// — the cumulative freelist crosses 25% on the index drop and
3721
    /// auto-VACUUM fires.
3722
    ///
3723
    /// The detour around bloat is necessary because building a
3724
    /// secondary index on a 5000-row column would need multi-level
3725
    /// interior nodes, and the cell-decoder's interior-page support
3726
    /// is a separate work item from SQLR-10.
3727
    #[test]
3728
    fn auto_vacuum_triggers_on_drop_index() {
3✔
3729
        let path = tmp_path("av_drop_index");
1✔
3730
        let mut db = auto_vacuum_setup(&path);
2✔
3731

3732
        // Phase 1: drop the bloat table with auto-VACUUM disabled so
3733
        // its pages land on the freelist without being reclaimed.
3734
        db.set_auto_vacuum_threshold(None).expect("disable");
2✔
3735
        process_command("DROP TABLE bloat;", &mut db).expect("drop bloat");
1✔
3736
        let pages_after_bloat_drop = db.pager.as_ref().unwrap().header().page_count;
1✔
3737
        let head_after_bloat_drop = db.pager.as_ref().unwrap().header().freelist_head;
1✔
UNCOV
3738
        assert!(
×
3739
            head_after_bloat_drop != 0,
1✔
3740
            "bloat drop must populate the freelist (else later index drop won't trip the threshold)"
3741
        );
3742

3743
        // Phase 2: a small index on the surviving `keep` table. The
3744
        // index reuses one page from the freelist (which is fine —
3745
        // freelist still holds plenty more).
3746
        process_command("CREATE INDEX idx_keep_n ON keep (n);", &mut db).expect("create idx");
2✔
3747

3748
        // Phase 3: re-arm the trigger and drop the index. The freelist
3749
        // is already heavily populated from phase 1; this drop just
3750
        // adds the index page on top, keeping the ratio well above
3751
        // 25%, so auto-VACUUM should fire.
3752
        db.set_auto_vacuum_threshold(Some(0.25)).expect("re-arm");
1✔
3753
        process_command("DROP INDEX idx_keep_n;", &mut db).expect("drop index");
1✔
3754

3755
        let pages_after = db.pager.as_ref().unwrap().header().page_count;
1✔
3756
        let head_after = db.pager.as_ref().unwrap().header().freelist_head;
1✔
UNCOV
3757
        assert!(
×
3758
            pages_after < pages_after_bloat_drop,
1✔
3759
            "DROP INDEX should fire auto-VACUUM and reduce page_count: \
3760
             was {pages_after_bloat_drop}, now {pages_after}"
3761
        );
3762
        assert_eq!(
1✔
3763
            head_after, 0,
3764
            "auto-VACUUM after DROP INDEX must clear the freelist"
3765
        );
3766

3767
        cleanup(&path);
2✔
3768
    }
3769

3770
    /// `ALTER TABLE … DROP COLUMN` releases pages too — the third path
3771
    /// the SQLR-10 trigger covers.
3772
    #[test]
3773
    fn auto_vacuum_triggers_on_alter_drop_column() {
3✔
3774
        let path = tmp_path("av_alter_drop_col");
1✔
3775
        let mut db = auto_vacuum_setup(&path);
2✔
3776
        let pages_before = db.pager.as_ref().unwrap().header().page_count;
2✔
3777

3778
        // Drop the wide `payload` column — this rewrites every row in
3779
        // `bloat` without the column, so the old leaf pages get freed.
3780
        process_command("ALTER TABLE bloat DROP COLUMN payload;", &mut db).expect("alter drop");
1✔
3781

3782
        let pages_after = db.pager.as_ref().unwrap().header().page_count;
1✔
UNCOV
3783
        assert!(
×
3784
            pages_after < pages_before,
1✔
3785
            "ALTER TABLE DROP COLUMN should fire auto-VACUUM and reduce page_count: \
3786
             was {pages_before}, now {pages_after}"
3787
        );
3788
        assert_eq!(db.pager.as_ref().unwrap().header().freelist_head, 0);
2✔
3789

3790
        cleanup(&path);
1✔
3791
    }
3792

3793
    /// A high threshold (0.99) suppresses the trigger when the freelist
3794
    /// ratio is well below it — the file stays at HWM.
3795
    #[test]
3796
    fn auto_vacuum_skips_below_threshold() {
3✔
3797
        let path = tmp_path("av_below_threshold");
1✔
3798
        let mut db = auto_vacuum_setup(&path);
2✔
3799
        db.set_auto_vacuum_threshold(Some(0.99)).expect("set");
2✔
3800

3801
        let pages_before = db.pager.as_ref().unwrap().header().page_count;
1✔
3802

3803
        process_command("DROP TABLE bloat;", &mut db).expect("drop");
1✔
3804

3805
        let pages_after = db.pager.as_ref().unwrap().header().page_count;
1✔
3806
        assert_eq!(
1✔
3807
            pages_after, pages_before,
3808
            "freelist ratio after a single drop is far below 0.99 — \
3809
             page_count must stay at the HWM"
3810
        );
UNCOV
3811
        assert!(
×
3812
            db.pager.as_ref().unwrap().header().freelist_head != 0,
2✔
3813
            "drop must still populate the freelist"
3814
        );
3815

3816
        cleanup(&path);
2✔
3817
    }
3818

3819
    /// Inside an explicit transaction, the page-releasing DDL doesn't
3820
    /// flush to disk yet — the freelist isn't accurate, so the trigger
3821
    /// must skip. The compact would also publish in-flight work out of
3822
    /// band, which is exactly what the manual `VACUUM;` rejection
3823
    /// inside a txn already prevents.
3824
    #[test]
3825
    fn auto_vacuum_skips_inside_transaction() {
3✔
3826
        let path = tmp_path("av_in_txn");
1✔
3827
        let mut db = auto_vacuum_setup(&path);
2✔
3828
        let pages_before = db.pager.as_ref().unwrap().header().page_count;
2✔
3829

3830
        process_command("BEGIN;", &mut db).expect("begin");
1✔
3831
        process_command("DROP TABLE bloat;", &mut db).expect("drop in txn");
1✔
3832
        // Mid-transaction: no save has occurred, so the on-disk
3833
        // freelist_head must be unchanged and page_count must not have
3834
        // shifted from a sneaky compact.
3835
        let pages_mid = db.pager.as_ref().unwrap().header().page_count;
1✔
3836
        assert_eq!(
1✔
3837
            pages_mid, pages_before,
3838
            "auto-VACUUM must not fire mid-transaction"
3839
        );
3840

3841
        process_command("ROLLBACK;", &mut db).expect("rollback");
2✔
3842
        cleanup(&path);
1✔
3843
    }
3844

3845
    /// Tiny databases (under `MIN_PAGES_FOR_AUTO_VACUUM`) skip the
3846
    /// trigger even if the ratio would otherwise qualify — the cost of
3847
    /// rewriting a 64 KiB file isn't worth the few bytes reclaimed.
3848
    #[test]
3849
    fn auto_vacuum_skips_under_min_pages_floor() {
3✔
3850
        let path = tmp_path("av_under_floor");
1✔
3851
        let mut db = seed_db(); // small: just users + notes, ~5 pages
1✔
3852
        db.source_path = Some(path.clone());
2✔
3853
        save_database(&mut db, &path).expect("save");
1✔
3854
        // Confirm we're below the floor so the test is meaningful.
3855
        let pages_before = db.pager.as_ref().unwrap().header().page_count;
1✔
UNCOV
3856
        assert!(
×
3857
            pages_before < MIN_PAGES_FOR_AUTO_VACUUM,
1✔
3858
            "test setup is too large: floor would not apply (got {pages_before} pages, \
3859
             floor is {MIN_PAGES_FOR_AUTO_VACUUM})"
3860
        );
3861

3862
        process_command("DROP TABLE users;", &mut db).expect("drop");
2✔
3863

3864
        let pages_after = db.pager.as_ref().unwrap().header().page_count;
1✔
3865
        assert_eq!(
1✔
3866
            pages_after, pages_before,
3867
            "below MIN_PAGES_FOR_AUTO_VACUUM, drop must not trigger compaction"
3868
        );
UNCOV
3869
        assert!(
×
3870
            db.pager.as_ref().unwrap().header().freelist_head != 0,
2✔
3871
            "drop must still populate the freelist normally"
3872
        );
3873

3874
        cleanup(&path);
2✔
3875
    }
3876

3877
    /// Setter rejects NaN, infinities, and values outside `0.0..=1.0`
3878
    /// rather than silently saturating.
3879
    #[test]
3880
    fn set_auto_vacuum_threshold_rejects_out_of_range() {
3✔
3881
        let mut db = Database::new("t".to_string());
1✔
3882
        for bad in [-0.01_f32, 1.01, f32::NAN, f32::INFINITY, f32::NEG_INFINITY] {
3✔
3883
            let err = db.set_auto_vacuum_threshold(Some(bad)).unwrap_err();
2✔
UNCOV
3884
            assert!(
×
3885
                format!("{err}").contains("auto_vacuum_threshold"),
3✔
3886
                "expected a typed range error for {bad}, got: {err}"
3887
            );
3888
        }
3889
        // The default survives the rejected sets unchanged.
3890
        assert_eq!(db.auto_vacuum_threshold(), Some(0.25));
1✔
3891
        // And valid values land.
3892
        db.set_auto_vacuum_threshold(Some(0.0)).unwrap();
1✔
3893
        assert_eq!(db.auto_vacuum_threshold(), Some(0.0));
1✔
3894
        db.set_auto_vacuum_threshold(Some(1.0)).unwrap();
1✔
3895
        assert_eq!(db.auto_vacuum_threshold(), Some(1.0));
1✔
3896
        db.set_auto_vacuum_threshold(None).unwrap();
1✔
3897
        assert_eq!(db.auto_vacuum_threshold(), None);
1✔
3898
    }
3899

3900
    // ---------------------------------------------------------------
3901
    // SQLR-13 — `PRAGMA auto_vacuum` SQL-level coverage. Mirrors the
3902
    // SQLR-10 setter tests above, but routed through SQL so SDK / FFI
3903
    // / MCP consumers (which can't reach the Rust setter directly)
3904
    // get the same guarantees.
3905
    // ---------------------------------------------------------------
3906

3907
    /// `PRAGMA auto_vacuum = N;` set + `PRAGMA auto_vacuum;` read
3908
    /// round-trip the threshold, observable via `auto_vacuum_threshold`.
3909
    #[test]
3910
    fn pragma_auto_vacuum_set_and_read_via_sql() {
3✔
3911
        let mut db = Database::new("t".to_string());
1✔
3912

3913
        let resp = process_command("PRAGMA auto_vacuum = 0.5;", &mut db).expect("set");
2✔
UNCOV
3914
        assert!(
×
3915
            resp.contains("PRAGMA"),
2✔
3916
            "set form should produce a PRAGMA status, got: {resp}"
3917
        );
3918
        assert_eq!(db.auto_vacuum_threshold(), Some(0.5));
2✔
3919

3920
        // Read form — status mentions a returned row.
3921
        let resp = process_command("PRAGMA auto_vacuum;", &mut db).expect("read");
1✔
3922
        assert!(resp.contains("1 row"), "expected a 1-row read, got: {resp}");
2✔
3923
    }
3924

3925
    /// `PRAGMA auto_vacuum = OFF;` (bare identifier — sqlparser's own
3926
    /// pragma-value parser would reject this, the SQLR-13 dispatcher
3927
    /// must accept it) and `= NONE;` both disable the trigger. So does
3928
    /// the quoted form `'OFF'`.
3929
    #[test]
3930
    fn pragma_auto_vacuum_off_disables_trigger() {
3✔
3931
        for raw in ["OFF", "off", "NONE", "none", "'OFF'", "'NONE'"] {
2✔
3932
            let mut db = Database::new("t".to_string());
2✔
3933
            assert_eq!(db.auto_vacuum_threshold(), Some(0.25));
2✔
3934

3935
            let stmt = format!("PRAGMA auto_vacuum = {raw};");
1✔
3936
            process_command(&stmt, &mut db)
2✔
3937
                .unwrap_or_else(|e| panic!("`{stmt}` should disable: {e}"));
1✔
3938
            assert_eq!(
1✔
3939
                db.auto_vacuum_threshold(),
1✔
3940
                None,
3941
                "`{stmt}` should clear the threshold"
3942
            );
3943
        }
3944
    }
3945

3946
    /// Out-of-range numeric values surface as a typed error via the
3947
    /// shared `set_auto_vacuum_threshold` validator — no silent
3948
    /// saturation. Mirrors the SQLR-10 setter coverage.
3949
    #[test]
3950
    fn pragma_auto_vacuum_rejects_out_of_range_via_sql() {
3✔
3951
        let mut db = Database::new("t".to_string());
1✔
3952
        for bad in ["-0.01", "1.01", "1.5"] {
3✔
3953
            let stmt = format!("PRAGMA auto_vacuum = {bad};");
2✔
3954
            let err = process_command(&stmt, &mut db).unwrap_err();
2✔
UNCOV
3955
            assert!(
×
3956
                format!("{err}").contains("auto_vacuum_threshold"),
3✔
3957
                "expected range error for `{stmt}`, got: {err}"
3958
            );
3959
        }
3960
        // Default survives all the rejected sets.
3961
        assert_eq!(db.auto_vacuum_threshold(), Some(0.25));
1✔
3962
    }
3963

3964
    /// Junk strings (anything that isn't a number or `OFF`/`NONE`) are
3965
    /// rejected at parse time with a typed error, not silently treated
3966
    /// as "disable".
3967
    #[test]
3968
    fn pragma_auto_vacuum_rejects_unknown_strings_via_sql() {
3✔
3969
        let mut db = Database::new("t".to_string());
1✔
3970
        let err = process_command("PRAGMA auto_vacuum = WAL;", &mut db).unwrap_err();
2✔
UNCOV
3971
        assert!(
×
3972
            format!("{err}").contains("OFF/NONE"),
3✔
3973
            "expected OFF/NONE-style error, got: {err}"
3974
        );
3975
        // Default unaffected.
3976
        assert_eq!(db.auto_vacuum_threshold(), Some(0.25));
1✔
3977
    }
3978

3979
    /// Pragmas SQLRite doesn't know about return `NotImplemented` —
3980
    /// not a generic parser error. Future pragmas plug in here.
3981
    /// (Phase 11.3 made `journal_mode` a recognised pragma; this
3982
    /// test uses a name that's still unsupported.)
3983
    #[test]
3984
    fn pragma_unknown_returns_not_implemented() {
3✔
3985
        let mut db = Database::new("t".to_string());
1✔
3986
        let err = process_command("PRAGMA synchronous = NORMAL;", &mut db).unwrap_err();
2✔
UNCOV
3987
        assert!(
×
3988
            matches!(err, SQLRiteError::NotImplemented(_)),
1✔
3989
            "unknown pragma must surface NotImplemented, got: {err:?}"
3990
        );
3991
    }
3992

3993
    /// Setting the threshold via SQL must produce identical behavior to
3994
    /// the Rust setter on the actual auto-VACUUM trigger: `= 0.99`
3995
    /// suppresses, `= OFF` disables, default fires. Sanity-checks that
3996
    /// `process_command_with_render`'s pre-parse step doesn't desync
3997
    /// the in-memory state from the file.
3998
    #[test]
    fn pragma_auto_vacuum_drives_real_trigger() {
        // Sub-cases A and B have the same shape: apply one PRAGMA, check
        // the threshold the database now reports, drop `bloat`, and
        // require that the header's `page_count` did not move. Each row
        // is (tmp-file tag, PRAGMA text, message if the PRAGMA fails,
        // expected threshold, message if `page_count` changed).
        let hold_cases = [
            // Sub-case A — `PRAGMA auto_vacuum = OFF;` keeps file at HWM.
            (
                "av_pragma_off",
                "PRAGMA auto_vacuum = OFF;",
                "disable via PRAGMA",
                None,
                "PRAGMA-driven OFF must keep page_count at the HWM",
            ),
            // Sub-case B — a high threshold set via PRAGMA suppresses
            // the trigger on a single drop.
            (
                "av_pragma_high",
                "PRAGMA auto_vacuum = 0.99;",
                "set high",
                Some(0.99),
                "high PRAGMA threshold must suppress the trigger",
            ),
        ];
        for (tag, pragma, pragma_failure, want_threshold, hold_reason) in hold_cases {
            let db_file = tmp_path(tag);
            let mut db = auto_vacuum_setup(&db_file);
            process_command(pragma, &mut db).expect(pragma_failure);
            assert_eq!(db.auto_vacuum_threshold(), want_threshold);

            // Snapshot the page count on either side of the drop.
            let hwm = db.pager.as_ref().unwrap().header().page_count;
            process_command("DROP TABLE bloat;", &mut db).expect("drop");
            let settled = db.pager.as_ref().unwrap().header().page_count;
            assert_eq!(settled, hwm, "{hold_reason}");
            cleanup(&db_file);
        }

        // Sub-case C — disable, drop, then re-arm via PRAGMA: the next
        // page-releasing DDL must shrink the file again.
        let db_file = tmp_path("av_pragma_rearm");
        let mut db = auto_vacuum_setup(&db_file);
        process_command("PRAGMA auto_vacuum = OFF;", &mut db).unwrap();

        // With the trigger off, the drop is expected to leave pages on
        // the freelist (non-zero head) while the file stays at its HWM.
        process_command("DROP TABLE bloat;", &mut db).unwrap();
        let pages_after_off_drop = db.pager.as_ref().unwrap().header().page_count;
        let freelist_head_while_off = db.pager.as_ref().unwrap().header().freelist_head;
        assert!(freelist_head_while_off != 0);

        // Re-arm at 0.25, then create and drop an index so one more DDL
        // statement releases pages; the freelist built up above is
        // expected to be over the threshold already, so auto-VACUUM
        // should fire here.
        process_command("PRAGMA auto_vacuum = 0.25;", &mut db).expect("re-arm");
        process_command("CREATE INDEX idx_keep_n ON keep (n);", &mut db).unwrap();
        process_command("DROP INDEX idx_keep_n;", &mut db).expect("drop index");

        let pages_after_rearm = db.pager.as_ref().unwrap().header().page_count;
        assert!(
            pages_after_rearm < pages_after_off_drop,
            "re-armed PRAGMA must let auto-VACUUM fire: was {pages_after_off_drop}, \
             now {pages_after_rearm}"
        );
        // After the compaction the freelist head must read zero.
        assert_eq!(db.pager.as_ref().unwrap().header().freelist_head, 0);
        cleanup(&db_file);
    }
4064

4065
    /// Any `VACUUM` statement carrying a modifier or a target (here
    /// `VACUUM FULL;` and `VACUUM users;`) must fail, and the rendered
    /// error must mention "VACUUM modifiers".
    #[test]
    fn vacuum_modifiers_are_rejected() {
        // Statements that go beyond a bare `VACUUM;`.
        let rejected = ["VACUUM FULL;", "VACUUM users;"];

        // Save a seeded database to disk and record that file as the
        // database's source path before issuing any statement.
        let db_file = tmp_path("vacuum_modifiers");
        let mut db = seed_db();
        db.source_path = Some(db_file.clone());
        save_database(&mut db, &db_file).expect("save");

        for stmt in rejected {
            let err = process_command(stmt, &mut db).unwrap_err();
            // Render once via Display and search the rendered text.
            let rendered = err.to_string();
            assert!(
                rendered.contains("VACUUM modifiers"),
                "expected modifier rejection for `{stmt}`, got: {err}"
            );
        }

        cleanup(&db_file);
    }
4082
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc