• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

jzombie / rust-triplets / 23176875807

17 Mar 2026 03:21AM UTC coverage: 94.18% (-0.5%) from 94.685%
23176875807

Pull #21

github

web-flow
Merge 891e810d3 into 8d9051cd6
Pull Request #21: Arbitrary save paths; improve HF integration

3468 of 3851 new or added lines in 11 files covered. (90.05%)

36 existing lines in 2 files now uncovered.

18902 of 20070 relevant lines covered (94.18%)

109322.68 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

98.87
/src/splits.rs
1
use serde::{Deserialize, Serialize};
2
use simd_r_drive::storage_engine::DataStore;
3
use simd_r_drive::storage_engine::traits::{DataStoreReader, DataStoreWriter};
4
use std::collections::HashMap;
5
use std::fmt;
6
use std::fs;
7
use std::hash::{Hash, Hasher};
8
use std::io;
9
use std::path::{Path, PathBuf};
10
use std::sync::RwLock;
11
use tempfile::TempDir;
12

13
use crate::constants::splits::{
14
    ALL_SPLITS, BITCODE_PREFIX, EPOCH_HASH_RECORD_VERSION, EPOCH_HASHES_PREFIX, EPOCH_META_PREFIX,
15
    EPOCH_META_RECORD_VERSION, EPOCH_RECORD_TOMBSTONE, META_KEY, SAMPLER_STATE_KEY,
16
    SAMPLER_STATE_RECORD_VERSION, SPLIT_PREFIX, STORE_VERSION,
17
};
18
use crate::data::RecordId;
19
use crate::errors::SamplerError;
20
use crate::types::SourceId;
21

22
/// Logical dataset partitions used during sampling.
23
#[derive(
24
    Clone,
25
    Copy,
26
    Debug,
27
    PartialEq,
28
    Eq,
29
    Hash,
30
    Serialize,
31
    Deserialize,
32
    bitcode::Encode,
33
    bitcode::Decode,
×
34
)]
35
pub enum SplitLabel {
36
    /// Training split.
37
    Train,
38
    /// Validation split.
39
    Validation,
40
    /// Test split.
41
    Test,
42
}
43

44
/// Ratio configuration for train/validation/test assignment.
45
#[derive(Clone, Copy, Debug, Serialize, Deserialize, bitcode::Encode, bitcode::Decode)]
×
46
pub struct SplitRatios {
47
    /// Fraction assigned to train.
48
    pub train: f32,
49
    /// Fraction assigned to validation.
50
    pub validation: f32,
51
    /// Fraction assigned to test.
52
    pub test: f32,
53
}
54

55
impl Default for SplitRatios {
56
    fn default() -> Self {
588✔
57
        Self {
588✔
58
            train: 0.8,
588✔
59
            validation: 0.1,
588✔
60
            test: 0.1,
588✔
61
        }
588✔
62
    }
588✔
63
}
64

65
impl SplitRatios {
66
    /// Validate that ratios sum to `1.0` (within epsilon).
67
    pub fn normalized(self) -> Result<Self, SamplerError> {
224✔
68
        let sum = self.train + self.validation + self.test;
224✔
69
        if (sum - 1.0).abs() > 1e-6 {
224✔
70
            return Err(SamplerError::Configuration(
2✔
71
                "split ratios must sum to 1.0".to_string(),
2✔
72
            ));
2✔
73
        }
222✔
74
        Ok(self)
222✔
75
    }
224✔
76
}
77

78
pub use crate::constants::splits::EPOCH_STATE_VERSION;
79

80
/// Persisted epoch cursor metadata for one split.
81
#[derive(Clone, Debug, bitcode::Encode, bitcode::Decode)]
×
82
pub struct PersistedSplitMeta {
83
    /// Current epoch for this split.
84
    pub epoch: u64,
85
    /// Cursor offset within the epoch hash list.
86
    pub offset: u64,
87
    /// Checksum of the persisted hash list.
88
    pub hashes_checksum: u64,
89
}
90

91
/// Persisted deterministic epoch hash ordering for one split.
92
#[derive(Clone, Debug, bitcode::Encode, bitcode::Decode)]
×
93
pub struct PersistedSplitHashes {
94
    /// Checksum of `hashes`.
95
    pub checksum: u64,
96
    /// Deterministic per-epoch hash ordering.
97
    pub hashes: Vec<u64>,
98
}
99

100
/// Persisted sampler runtime state (cursors, recipe indices, RNG).
101
#[derive(Clone, Debug, bitcode::Encode, bitcode::Decode)]
×
102
pub struct PersistedSamplerState {
103
    /// Source-cycle round-robin index.
104
    pub source_cycle_idx: u64,
105
    /// Per-source record cursors.
106
    pub source_record_cursors: Vec<(SourceId, u64)>,
107
    /// Current source epoch used for deterministic reshuffle.
108
    pub source_epoch: u64,
109
    /// Deterministic RNG internal state.
110
    pub rng_state: u64,
111
    /// Round-robin index for triplet recipes.
112
    pub triplet_recipe_rr_idx: u64,
113
    /// Round-robin index for text recipes.
114
    pub text_recipe_rr_idx: u64,
115
    /// Persisted source stream refresh cursors.
116
    pub source_stream_cursors: Vec<(SourceId, u64)>,
117
}
118

119
/// Split assignment backend.
120
///
121
/// Implementations map `RecordId` values to split labels deterministically.
122
pub trait SplitStore: Send + Sync {
123
    /// Return split label for `id` if known/derivable.
124
    fn label_for(&self, id: &RecordId) -> Option<SplitLabel>;
125
    /// Persist an explicit split assignment for `id`.
126
    fn upsert(&self, id: RecordId, label: SplitLabel) -> Result<(), SamplerError>;
127
    /// Return configured split ratios.
128
    fn ratios(&self) -> SplitRatios;
129
    /// Return the split label for `id`, creating/deriving one when needed.
130
    fn ensure(&self, id: RecordId) -> Result<SplitLabel, SamplerError>;
131
}
132

133
/// Persistence backend for epoch metadata and epoch hash orderings.
134
pub trait EpochStateStore: Send + Sync {
135
    /// Load split→epoch metadata map.
136
    fn load_epoch_meta(&self) -> Result<HashMap<SplitLabel, PersistedSplitMeta>, SamplerError>;
137
    /// Load persisted epoch hashes for one split, if available.
138
    fn load_epoch_hashes(
139
        &self,
140
        label: SplitLabel,
141
    ) -> Result<Option<PersistedSplitHashes>, SamplerError>;
142
    /// Persist split→epoch metadata map.
143
    fn save_epoch_meta(
144
        &self,
145
        meta: &HashMap<SplitLabel, PersistedSplitMeta>,
146
    ) -> Result<(), SamplerError>;
147
    /// Persist epoch hash list for one split.
148
    fn save_epoch_hashes(
149
        &self,
150
        label: SplitLabel,
151
        hashes: &PersistedSplitHashes,
152
    ) -> Result<(), SamplerError>;
153
}
154

155
/// Persistence backend for sampler runtime state.
156
pub trait SamplerStateStore: Send + Sync {
157
    /// Load persisted sampler runtime state, if present.
158
    fn load_sampler_state(&self) -> Result<Option<PersistedSamplerState>, SamplerError>;
159
    /// Save sampler runtime state, optionally mirroring to `save_path`.
160
    fn save_sampler_state(
161
        &self,
162
        state: &PersistedSamplerState,
163
        save_path: Option<&Path>,
164
    ) -> Result<(), SamplerError>;
165
}
166

167
/// In-memory split store with deterministic assignment derivation.
168
pub struct DeterministicSplitStore {
169
    ratios: SplitRatios,
170
    assignments: RwLock<HashMap<RecordId, SplitLabel>>,
171
    seed: u64,
172
    epoch_meta: RwLock<HashMap<SplitLabel, PersistedSplitMeta>>,
173
    epoch_hashes: RwLock<HashMap<SplitLabel, PersistedSplitHashes>>,
174
    sampler_state: RwLock<Option<PersistedSamplerState>>,
175
}
176

177
impl DeterministicSplitStore {
178
    /// Create an in-memory split store configured with `ratios` and `seed`.
179
    pub fn new(ratios: SplitRatios, seed: u64) -> Result<Self, SamplerError> {
144✔
180
        ratios.normalized()?;
144✔
181
        Ok(Self {
143✔
182
            ratios,
143✔
183
            assignments: RwLock::new(HashMap::new()),
143✔
184
            seed,
143✔
185
            epoch_meta: RwLock::new(HashMap::new()),
143✔
186
            epoch_hashes: RwLock::new(HashMap::new()),
143✔
187
            sampler_state: RwLock::new(None),
143✔
188
        })
143✔
189
    }
144✔
190

191
    fn derive_label(&self, id: &RecordId) -> SplitLabel {
943,656✔
192
        derive_label_for_id(id, self.seed, self.ratios)
943,656✔
193
    }
943,656✔
194
}
195

196
impl SplitStore for DeterministicSplitStore {
197
    fn label_for(&self, id: &RecordId) -> Option<SplitLabel> {
943,347✔
198
        if let Some(label) = self.assignments.read().ok()?.get(id).copied() {
943,347✔
199
            return Some(label);
25✔
200
        }
943,322✔
201
        Some(self.derive_label(id))
943,322✔
202
    }
943,347✔
203

204
    fn upsert(&self, id: RecordId, label: SplitLabel) -> Result<(), SamplerError> {
3✔
205
        let mut guard = self
3✔
206
            .assignments
3✔
207
            .write()
3✔
208
            .map_err(|_| SamplerError::SplitStore("lock poisoned".into()))?;
3✔
209
        guard.insert(id, label);
3✔
210
        Ok(())
3✔
211
    }
3✔
212

213
    fn ratios(&self) -> SplitRatios {
1✔
214
        self.ratios
1✔
215
    }
1✔
216

217
    fn ensure(&self, id: RecordId) -> Result<SplitLabel, SamplerError> {
334✔
218
        Ok(self.derive_label(&id))
334✔
219
    }
334✔
220
}
221

222
impl EpochStateStore for DeterministicSplitStore {
223
    fn load_epoch_meta(&self) -> Result<HashMap<SplitLabel, PersistedSplitMeta>, SamplerError> {
129✔
224
        self.epoch_meta
129✔
225
            .read()
129✔
226
            .map_err(|_| SamplerError::SplitStore("epoch meta lock poisoned".into()))
129✔
227
            .map(|guard| guard.clone())
129✔
228
    }
129✔
229

230
    fn load_epoch_hashes(
2✔
231
        &self,
2✔
232
        label: SplitLabel,
2✔
233
    ) -> Result<Option<PersistedSplitHashes>, SamplerError> {
2✔
234
        Ok(self
2✔
235
            .epoch_hashes
2✔
236
            .read()
2✔
237
            .map_err(|_| SamplerError::SplitStore("epoch hashes lock poisoned".into()))?
2✔
238
            .get(&label)
2✔
239
            .cloned())
2✔
240
    }
2✔
241

242
    fn save_epoch_meta(
5✔
243
        &self,
5✔
244
        meta: &HashMap<SplitLabel, PersistedSplitMeta>,
5✔
245
    ) -> Result<(), SamplerError> {
5✔
246
        *self
5✔
247
            .epoch_meta
5✔
248
            .write()
5✔
249
            .map_err(|_| SamplerError::SplitStore("epoch meta lock poisoned".into()))? =
5✔
250
            meta.clone();
5✔
251
        Ok(())
5✔
252
    }
5✔
253

254
    fn save_epoch_hashes(
1✔
255
        &self,
1✔
256
        label: SplitLabel,
1✔
257
        hashes: &PersistedSplitHashes,
1✔
258
    ) -> Result<(), SamplerError> {
1✔
259
        self.epoch_hashes
1✔
260
            .write()
1✔
261
            .map_err(|_| SamplerError::SplitStore("epoch hashes lock poisoned".into()))?
1✔
262
            .insert(label, hashes.clone());
1✔
263
        Ok(())
1✔
264
    }
1✔
265
}
266

267
impl SamplerStateStore for DeterministicSplitStore {
268
    fn load_sampler_state(&self) -> Result<Option<PersistedSamplerState>, SamplerError> {
240✔
269
        self.sampler_state
240✔
270
            .read()
240✔
271
            .map_err(|_| SamplerError::SplitStore("sampler state lock poisoned".into()))
240✔
272
            .map(|guard| guard.clone())
240✔
273
    }
240✔
274

275
    fn save_sampler_state(
1✔
276
        &self,
1✔
277
        state: &PersistedSamplerState,
1✔
278
        _save_path: Option<&Path>,
1✔
279
    ) -> Result<(), SamplerError> {
1✔
280
        *self
1✔
281
            .sampler_state
1✔
282
            .write()
1✔
283
            .map_err(|_| SamplerError::SplitStore("sampler state lock poisoned".into()))? =
1✔
284
            Some(state.clone());
1✔
285
        Ok(())
1✔
286
    }
1✔
287
}
288

289
#[derive(Clone, Copy, Debug, bitcode::Encode, bitcode::Decode)]
×
290
/// Versioned metadata header stored in file-backed split stores.
291
struct StoreMeta {
292
    version: u8,
293
    seed: u64,
294
    ratios: SplitRatios,
295
}
296

297
fn encode_store_meta(meta: &StoreMeta) -> Vec<u8> {
49✔
298
    encode_bitcode_payload(&bitcode::encode(meta))
49✔
299
}
49✔
300

301
fn decode_store_meta(bytes: &[u8]) -> Result<StoreMeta, SamplerError> {
34✔
302
    let raw = decode_bitcode_payload(bytes)?;
34✔
303
    bitcode::decode(&raw).map_err(|err| {
33✔
304
        SamplerError::SplitStore(format!("failed to decode split store metadata: {err}"))
1✔
305
    })
1✔
306
}
34✔
307

308
/// File-backed split store for persistent runs.
309
///
310
/// Persists assignment metadata, epoch state, and sampler runtime state.
311
///
312
/// The store **always** works against a private temporary copy of the source
313
/// snapshot.  All reads and mutations accumulate in the temp file.
314
/// State is published to permanent storage only when
315
/// [`SamplerStateStore::save_sampler_state`] is called:
316
///
317
/// * `save_to == None`  → publish temp to `save_path` (may overwrite).
318
/// * `save_to == Some(p)` → publish temp to `p` only; `save_path` is left
319
///   untouched.
320
///
321
/// This guarantees that the original source file is never modified and that
322
/// no partial state leaks to the target before an explicit save.
323
pub struct FileSplitStore {
324
    store: DataStore,
325
    /// Working path: always a private temp file; all reads and writes go here.
326
    path: PathBuf,
327
    /// Declared save destination; published to on `save_sampler_state(None)`.
328
    save_path: PathBuf,
329
    ratios: SplitRatios,
330
    seed: u64,
331
    /// Keeps the temporary directory alive for the lifetime of this store.
332
    _temp_dir: TempDir,
333
}
334

335
impl fmt::Debug for FileSplitStore {
336
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1✔
337
        f.debug_struct("FileSplitStore")
1✔
338
            .field("path", &self.save_path)
1✔
339
            .field("ratios", &self.ratios)
1✔
340
            .field("seed", &self.seed)
1✔
341
            .finish()
1✔
342
    }
1✔
343
}
344

345
impl FileSplitStore {
346
    /// Open (or create) a file-backed split store at `path`.
347
    pub fn open<P: Into<PathBuf>>(
73✔
348
        path: P,
73✔
349
        ratios: SplitRatios,
73✔
350
        seed: u64,
73✔
351
    ) -> Result<Self, SamplerError> {
73✔
352
        Self::open_with_load_path(None::<PathBuf>, path, ratios, seed)
73✔
353
    }
73✔
354

355
    /// Open (or create) a file-backed split store at `save_path`, optionally
356
    /// bootstrapping initial state from `load_path`.
357
    ///
358
    /// Always stages through a **private temporary file**.  The source used to
359
    /// seed the temp is chosen as follows:
360
    ///
361
    /// 1. `load_path` if supplied and it exists.
362
    /// 2. `save_path` if it already exists.
363
    /// 3. Nothing — a fresh empty store is created in the temp.
364
    ///
365
    /// All mutations accumulate in the temp.  State is published to permanent
366
    /// storage only when [`SamplerStateStore::save_sampler_state`] is called.
367
    /// The original source file is never modified.
368
    pub fn open_with_load_path<LP: Into<PathBuf>, SP: Into<PathBuf>>(
80✔
369
        load_path: Option<LP>,
80✔
370
        save_path: SP,
80✔
371
        ratios: SplitRatios,
80✔
372
        seed: u64,
80✔
373
    ) -> Result<Self, SamplerError> {
80✔
374
        let ratios = ratios.normalized()?;
80✔
375
        let save_path = coerce_store_path(save_path.into());
79✔
376
        ensure_parent_dir(&save_path)?;
79✔
377

378
        // Determine source to seed the working temp from.
379
        // Priority: explicit load_path > existing save_path > fresh (nothing).
380
        let source: Option<PathBuf> = if let Some(lp) = load_path {
79✔
381
            let lp = coerce_store_path(lp.into());
7✔
382
            if lp.exists() { Some(lp) } else { None }
7✔
383
        } else if save_path.exists() {
72✔
384
            Some(save_path.clone())
29✔
385
        } else {
386
            None
43✔
387
        };
388

389
        let temp_dir = tempfile::tempdir().map_err(|err| {
79✔
NEW
390
            SamplerError::SplitStore(format!("failed to create temp dir for split store: {err}"))
×
NEW
391
        })?;
×
392
        let working_path = temp_dir.path().join("working_store.bin");
79✔
393
        if let Some(src) = &source {
79✔
394
            fs::copy(src, &working_path).map_err(|err| {
33✔
395
                SamplerError::SplitStore(format!(
1✔
396
                    "failed to copy split store from '{}' to temp: {err}",
1✔
397
                    src.display()
1✔
398
                ))
1✔
399
            })?;
1✔
400
        }
46✔
401

402
        let raw_store = DataStore::open(&working_path).map_err(map_store_err)?;
78✔
403
        let store = Self {
78✔
404
            store: raw_store,
78✔
405
            path: working_path,
78✔
406
            save_path,
78✔
407
            ratios,
78✔
408
            seed,
78✔
409
            _temp_dir: temp_dir,
78✔
410
        };
78✔
411
        store.verify_metadata()?;
78✔
412
        Ok(store)
75✔
413
    }
80✔
414

415
    fn verify_metadata(&self) -> Result<(), SamplerError> {
78✔
416
        match read_bytes(&self.store, META_KEY)? {
78✔
417
            Some(bytes) => {
31✔
418
                let meta = decode_store_meta(&bytes)?;
31✔
419
                if meta.version != STORE_VERSION {
31✔
420
                    return Err(SamplerError::SplitStore(format!(
1✔
421
                        "split store version mismatch (expected {}, found {})",
1✔
422
                        STORE_VERSION, meta.version
1✔
423
                    )));
1✔
424
                }
30✔
425
                if meta.seed != self.seed {
30✔
426
                    return Err(SamplerError::SplitStore(format!(
1✔
427
                        "split store seed mismatch (expected {}, found {})",
1✔
428
                        self.seed, meta.seed
1✔
429
                    )));
1✔
430
                }
29✔
431
                if !ratios_close(meta.ratios, self.ratios) {
29✔
432
                    return Err(SamplerError::SplitStore(
1✔
433
                        "split store ratios mismatch".into(),
1✔
434
                    ));
1✔
435
                }
28✔
436
            }
437
            None => {
438
                let blob = StoreMeta {
47✔
439
                    version: STORE_VERSION,
47✔
440
                    seed: self.seed,
47✔
441
                    ratios: self.ratios,
47✔
442
                };
47✔
443
                let payload = encode_store_meta(&blob);
47✔
444
                write_bytes(&self.store, META_KEY, &payload)?;
47✔
445
            }
446
        }
447
        Ok(())
75✔
448
    }
78✔
449

450
    fn read_epoch_meta_entry(
108✔
451
        &self,
108✔
452
        label: SplitLabel,
108✔
453
    ) -> Result<Option<PersistedSplitMeta>, SamplerError> {
108✔
454
        let key = epoch_meta_key(label);
108✔
455
        let entry = self.store.read(&key).map_err(map_store_err)?;
108✔
456
        match entry {
108✔
457
            None => Ok(None),
63✔
458
            Some(bytes) => decode_epoch_meta(bytes.as_ref()),
45✔
459
        }
460
    }
108✔
461

462
    fn write_epoch_meta_entry(
75✔
463
        &self,
75✔
464
        label: SplitLabel,
75✔
465
        meta: Option<&PersistedSplitMeta>,
75✔
466
    ) -> Result<(), SamplerError> {
75✔
467
        let key = epoch_meta_key(label);
75✔
468
        let payload = encode_epoch_meta(meta);
75✔
469
        self.store
75✔
470
            .write(&key, payload.as_slice())
75✔
471
            .map_err(map_store_err)?;
75✔
472
        Ok(())
75✔
473
    }
75✔
474

475
    fn read_epoch_hashes_entry(
3✔
476
        &self,
3✔
477
        label: SplitLabel,
3✔
478
    ) -> Result<Option<PersistedSplitHashes>, SamplerError> {
3✔
479
        let key = epoch_hashes_key(label);
3✔
480
        let entry = self.store.read(&key).map_err(map_store_err)?;
3✔
481
        match entry {
3✔
482
            None => Ok(None),
1✔
483
            Some(bytes) => decode_epoch_hashes(bytes.as_ref()),
2✔
484
        }
485
    }
3✔
486

487
    fn write_epoch_hashes_entry(
1✔
488
        &self,
1✔
489
        label: SplitLabel,
1✔
490
        hashes: &PersistedSplitHashes,
1✔
491
    ) -> Result<(), SamplerError> {
1✔
492
        let key = epoch_hashes_key(label);
1✔
493
        let payload = encode_epoch_hashes(hashes);
1✔
494
        self.store
1✔
495
            .write(&key, payload.as_slice())
1✔
496
            .map_err(map_store_err)?;
1✔
497
        Ok(())
1✔
498
    }
1✔
499
}
500

501
impl SplitStore for FileSplitStore {
502
    fn label_for(&self, id: &RecordId) -> Option<SplitLabel> {
3,635✔
503
        let key = split_key(id);
3,635✔
504
        if let Ok(Some(value)) = self.store.read(&key)
3,635✔
505
            && let Ok(label) = decode_label(value.as_ref())
5✔
506
        {
507
            return Some(label);
4✔
508
        }
3,631✔
509
        Some(derive_label_for_id(id, self.seed, self.ratios))
3,631✔
510
    }
3,635✔
511

512
    fn upsert(&self, id: RecordId, label: SplitLabel) -> Result<(), SamplerError> {
1✔
513
        let _ = (id, label);
1✔
514
        Ok(())
1✔
515
    }
1✔
516

517
    fn ratios(&self) -> SplitRatios {
1✔
518
        self.ratios
1✔
519
    }
1✔
520

521
    fn ensure(&self, id: RecordId) -> Result<SplitLabel, SamplerError> {
16✔
522
        Ok(derive_label_for_id(&id, self.seed, self.ratios))
16✔
523
    }
16✔
524
}
525

526
impl EpochStateStore for FileSplitStore {
527
    fn load_epoch_meta(&self) -> Result<HashMap<SplitLabel, PersistedSplitMeta>, SamplerError> {
36✔
528
        let mut meta = HashMap::new();
36✔
529
        for label in ALL_SPLITS {
108✔
530
            if let Some(entry) = self.read_epoch_meta_entry(label)? {
108✔
531
                meta.insert(label, entry);
15✔
532
            }
93✔
533
        }
534
        Ok(meta)
36✔
535
    }
36✔
536

537
    fn load_epoch_hashes(
3✔
538
        &self,
3✔
539
        label: SplitLabel,
3✔
540
    ) -> Result<Option<PersistedSplitHashes>, SamplerError> {
3✔
541
        self.read_epoch_hashes_entry(label)
3✔
542
    }
3✔
543

544
    fn save_epoch_meta(
25✔
545
        &self,
25✔
546
        meta: &HashMap<SplitLabel, PersistedSplitMeta>,
25✔
547
    ) -> Result<(), SamplerError> {
25✔
548
        for label in ALL_SPLITS {
75✔
549
            self.write_epoch_meta_entry(label, meta.get(&label))?;
75✔
550
        }
551
        Ok(())
25✔
552
    }
25✔
553

554
    fn save_epoch_hashes(
1✔
555
        &self,
1✔
556
        label: SplitLabel,
1✔
557
        hashes: &PersistedSplitHashes,
1✔
558
    ) -> Result<(), SamplerError> {
1✔
559
        self.write_epoch_hashes_entry(label, hashes)
1✔
560
    }
1✔
561
}
562

563
impl SamplerStateStore for FileSplitStore {
564
    fn load_sampler_state(&self) -> Result<Option<PersistedSamplerState>, SamplerError> {
78✔
565
        match read_bytes(&self.store, SAMPLER_STATE_KEY)? {
78✔
566
            Some(bytes) => decode_sampler_state(bytes.as_ref()),
36✔
567
            None => Ok(None),
42✔
568
        }
569
    }
78✔
570

571
    /// Persist `state` to the working temp store and publish to the destination.
572
    ///
573
    /// * `save_to == None`  → publish temp to `save_path` (may overwrite).
574
    /// * `save_to == Some(p)` → publish temp to `p` only; `save_path` is left
575
    ///   untouched.  `p` must not already exist.
576
    fn save_sampler_state(
43✔
577
        &self,
43✔
578
        state: &PersistedSamplerState,
43✔
579
        save_to: Option<&Path>,
43✔
580
    ) -> Result<(), SamplerError> {
43✔
581
        // Determine publish destination before writing anything.
582
        let dest = if let Some(p) = save_to {
43✔
583
            coerce_store_path(p.to_path_buf())
8✔
584
        } else {
585
            self.save_path.clone()
35✔
586
        };
587

588
        // Refuse to overwrite an explicitly-named destination that already exists.
589
        // Saving back to the canonical save_path (None) is always allowed.
590
        if save_to.is_some() && dest.exists() {
43✔
591
            return Err(SamplerError::SplitStore(format!(
1✔
592
                "refusing to overwrite existing split store '{}'; choose a new path",
1✔
593
                dest.display()
1✔
594
            )));
1✔
595
        }
42✔
596

597
        // Write state into the working temp store.
598
        let payload = encode_sampler_state(state);
42✔
599
        write_bytes(&self.store, SAMPLER_STATE_KEY, &payload)?;
42✔
600

601
        // Publish: copy the temp store to the destination.
602
        ensure_parent_dir(&dest)?;
42✔
603
        fs::copy(&self.path, &dest).map_err(|err| {
42✔
NEW
604
            SamplerError::SplitStore(format!(
×
NEW
605
                "failed to publish split store to '{}': {err}",
×
NEW
606
                dest.display()
×
NEW
607
            ))
×
NEW
608
        })?;
×
609

610
        Ok(())
42✔
611
    }
43✔
612
}
613

614
fn decode_label(bytes: &[u8]) -> Result<SplitLabel, SamplerError> {
9✔
615
    match bytes.first() {
9✔
616
        Some(b'0') => Ok(SplitLabel::Train),
1✔
617
        Some(b'1') => Ok(SplitLabel::Validation),
5✔
618
        Some(b'2') => Ok(SplitLabel::Test),
1✔
619
        _ => Err(SamplerError::SplitStore("invalid split label".into())),
2✔
620
    }
621
}
9✔
622

623
fn derive_label_for_id(id: &RecordId, seed: u64, ratios: SplitRatios) -> SplitLabel {
947,304✔
624
    let mut hasher = std::collections::hash_map::DefaultHasher::new();
947,304✔
625
    id.hash(&mut hasher);
947,304✔
626
    seed.hash(&mut hasher);
947,304✔
627
    let value = hasher.finish() as f64 / u64::MAX as f64;
947,304✔
628
    let train_cut = ratios.train as f64;
947,304✔
629
    let val_cut = train_cut + ratios.validation as f64;
947,304✔
630
    if value < train_cut {
947,304✔
631
        SplitLabel::Train
682,201✔
632
    } else if value < val_cut {
265,103✔
633
        SplitLabel::Validation
132,064✔
634
    } else {
635
        SplitLabel::Test
133,039✔
636
    }
637
}
947,304✔
638

639
/// `true` when the L1 distance between `a` and `b` is below `1e-5`.
fn ratios_close(a: SplitRatios, b: SplitRatios) -> bool {
    let d_train = (a.train - b.train).abs();
    let d_validation = (a.validation - b.validation).abs();
    let d_test = (a.test - b.test).abs();
    d_train + d_validation + d_test < 1e-5
}
29✔
643

644
fn split_key(id: &RecordId) -> Vec<u8> {
3,638✔
645
    let mut key = Vec::with_capacity(SPLIT_PREFIX.len() + id.len());
3,638✔
646
    key.extend_from_slice(SPLIT_PREFIX);
3,638✔
647
    key.extend_from_slice(id.as_bytes());
3,638✔
648
    key
3,638✔
649
}
3,638✔
650

651
fn read_bytes(store: &DataStore, key: &[u8]) -> Result<Option<Vec<u8>>, SamplerError> {
156✔
652
    store
156✔
653
        .read(key)
156✔
654
        .map_err(map_store_err)?
156✔
655
        .map(|entry| Ok(entry.as_ref().to_vec()))
156✔
656
        .transpose()
156✔
657
}
156✔
658

659
fn write_bytes(store: &DataStore, key: &[u8], payload: &[u8]) -> Result<(), SamplerError> {
89✔
660
    store.write(key, payload).map_err(map_store_err)?;
89✔
661
    Ok(())
89✔
662
}
89✔
663

664
fn epoch_meta_key(label: SplitLabel) -> Vec<u8> {
185✔
665
    let mut key = Vec::with_capacity(EPOCH_META_PREFIX.len() + 12);
185✔
666
    key.extend_from_slice(EPOCH_META_PREFIX);
185✔
667
    let suffix = match label {
185✔
668
        SplitLabel::Train => b"train".as_ref(),
62✔
669
        SplitLabel::Validation => b"validation".as_ref(),
62✔
670
        SplitLabel::Test => b"test".as_ref(),
61✔
671
    };
672
    key.extend_from_slice(suffix);
185✔
673
    key
185✔
674
}
185✔
675

676
fn epoch_hashes_key(label: SplitLabel) -> Vec<u8> {
7✔
677
    let mut key = Vec::with_capacity(EPOCH_HASHES_PREFIX.len() + 12);
7✔
678
    key.extend_from_slice(EPOCH_HASHES_PREFIX);
7✔
679
    let suffix = match label {
7✔
680
        SplitLabel::Train => b"train".as_ref(),
1✔
681
        SplitLabel::Validation => b"validation".as_ref(),
4✔
682
        SplitLabel::Test => b"test".as_ref(),
2✔
683
    };
684
    key.extend_from_slice(suffix);
7✔
685
    key
7✔
686
}
7✔
687

688
fn encode_epoch_meta(meta: Option<&PersistedSplitMeta>) -> Vec<u8> {
76✔
689
    match meta {
76✔
690
        None => vec![EPOCH_RECORD_TOMBSTONE],
50✔
691
        Some(meta) => {
26✔
692
            let payload = encode_bitcode_payload(&bitcode::encode(meta));
26✔
693
            let mut buf = Vec::with_capacity(1 + payload.len());
26✔
694
            buf.push(EPOCH_META_RECORD_VERSION);
26✔
695
            buf.extend_from_slice(&payload);
26✔
696
            buf
26✔
697
        }
698
    }
699
}
76✔
700

701
fn decode_epoch_meta(bytes: &[u8]) -> Result<Option<PersistedSplitMeta>, SamplerError> {
50✔
702
    if bytes.is_empty() || bytes[0] == EPOCH_RECORD_TOMBSTONE {
50✔
703
        return Ok(None);
32✔
704
    }
18✔
705
    if bytes[0] != EPOCH_META_RECORD_VERSION {
18✔
706
        return Err(SamplerError::SplitStore(
1✔
707
            "epoch meta record version mismatch".into(),
1✔
708
        ));
1✔
709
    }
17✔
710
    let raw = decode_bitcode_payload(&bytes[1..])?;
17✔
711
    bitcode::decode(&raw)
17✔
712
        .map(Some)
17✔
713
        .map_err(|err| SamplerError::SplitStore(format!("corrupt epoch meta record: {err}")))
17✔
714
}
50✔
715

716
fn encode_epoch_hashes(hashes: &PersistedSplitHashes) -> Vec<u8> {
2✔
717
    let payload = encode_bitcode_payload(&bitcode::encode(hashes));
2✔
718
    let mut buf = Vec::with_capacity(1 + payload.len());
2✔
719
    buf.push(EPOCH_HASH_RECORD_VERSION);
2✔
720
    buf.extend_from_slice(&payload);
2✔
721
    buf
2✔
722
}
2✔
723

724
fn decode_epoch_hashes(bytes: &[u8]) -> Result<Option<PersistedSplitHashes>, SamplerError> {
7✔
725
    if bytes.is_empty() || bytes[0] == EPOCH_RECORD_TOMBSTONE {
7✔
726
        return Ok(None);
2✔
727
    }
5✔
728
    if bytes[0] != EPOCH_HASH_RECORD_VERSION {
5✔
729
        return Err(SamplerError::SplitStore(
1✔
730
            "epoch hashes record version mismatch".into(),
1✔
731
        ));
1✔
732
    }
4✔
733
    let raw = decode_bitcode_payload(&bytes[1..])?;
4✔
734
    bitcode::decode(&raw)
4✔
735
        .map(Some)
4✔
736
        .map_err(|err| SamplerError::SplitStore(format!("corrupt epoch hashes record: {err}")))
4✔
737
}
7✔
738

739
fn encode_sampler_state(state: &PersistedSamplerState) -> Vec<u8> {
43✔
740
    let payload = encode_bitcode_payload(&bitcode::encode(state));
43✔
741
    let mut buf = Vec::with_capacity(1 + payload.len());
43✔
742
    buf.push(SAMPLER_STATE_RECORD_VERSION);
43✔
743
    buf.extend_from_slice(&payload);
43✔
744
    buf
43✔
745
}
43✔
746

747
fn decode_sampler_state(bytes: &[u8]) -> Result<Option<PersistedSamplerState>, SamplerError> {
40✔
748
    if bytes.is_empty() {
40✔
749
        return Ok(None);
1✔
750
    }
39✔
751
    if bytes[0] != SAMPLER_STATE_RECORD_VERSION {
39✔
752
        return Err(SamplerError::SplitStore(
1✔
753
            "sampler state record version mismatch".into(),
1✔
754
        ));
1✔
755
    }
38✔
756
    let raw = decode_bitcode_payload(&bytes[1..])?;
38✔
757
    bitcode::decode(&raw)
38✔
758
        .map(Some)
38✔
759
        .map_err(|err| SamplerError::SplitStore(format!("corrupt sampler state record: {err}")))
38✔
760
}
40✔
761

762
fn encode_bitcode_payload(bytes: &[u8]) -> Vec<u8> {
120✔
763
    let mut out = Vec::with_capacity(1 + bytes.len());
120✔
764
    out.push(BITCODE_PREFIX);
120✔
765
    out.extend_from_slice(bytes);
120✔
766
    out
120✔
767
}
120✔
768

769
fn decode_bitcode_payload(bytes: &[u8]) -> Result<Vec<u8>, SamplerError> {
94✔
770
    if bytes.first().copied() != Some(BITCODE_PREFIX) {
94✔
771
        return Err(SamplerError::SplitStore(
2✔
772
            "bitcode payload missing expected prefix".into(),
2✔
773
        ));
2✔
774
    }
92✔
775
    Ok(bytes[1..].to_vec())
92✔
776
}
94✔
777

778
/// Normalization hook for store paths. It currently returns `path` unchanged.
fn coerce_store_path(path: PathBuf) -> PathBuf {
    std::convert::identity(path)
}
96✔
781

782
fn ensure_parent_dir(path: &Path) -> Result<(), SamplerError> {
123✔
783
    if let Some(parent) = path.parent()
123✔
784
        && !parent.as_os_str().is_empty()
123✔
785
    {
786
        fs::create_dir_all(parent)?;
122✔
787
    }
1✔
788
    Ok(())
123✔
789
}
123✔
790

791
fn map_store_err(err: io::Error) -> SamplerError {
1✔
792
    SamplerError::SplitStore(err.to_string())
1✔
793
}
1✔
794

795
#[cfg(test)]
796
mod tests {
797
    use super::*;
798
    use std::collections::HashMap;
799
    use tempfile::tempdir;
800

801
    /// Ratios summing to 1.2 must be rejected by both `DeterministicSplitStore::new`
    /// and `FileSplitStore::open` with the same configuration error.
    #[test]
    fn split_ratios_reject_non_unit_sum() {
        let invalid = SplitRatios {
            train: 0.6,
            validation: 0.3,
            test: 0.3,
        };
        let is_sum_error = |err: &SamplerError| {
            matches!(
                err,
                SamplerError::Configuration(msg) if msg.contains("split ratios must sum to 1.0")
            )
        };

        let deterministic_err = DeterministicSplitStore::new(invalid, 1)
            .err()
            .expect("expected non-unit split ratios to fail");
        assert!(is_sum_error(&deterministic_err));

        let dir = tempdir().unwrap();
        let path = dir.path().join("split_store.bin");
        let file_err = FileSplitStore::open(&path, invalid, 1).unwrap_err();
        assert!(is_sum_error(&file_err));
    }
1✔
825

826
    /// With `test: 0.0` no id may land in the test split. Both train and validation
    /// must still show up within the first 20k ids.
    #[test]
    fn zero_test_ratio_never_assigns_test_labels() {
        let ratios = SplitRatios {
            train: 0.5,
            validation: 0.5,
            test: 0.0,
        };
        let store = DeterministicSplitStore::new(ratios, 7).unwrap();

        let (mut saw_train, mut saw_validation) = (false, false);
        // Lazy map: stops calling `ensure` as soon as both labels have been observed.
        let labels = (0..20_000).map(|idx| store.ensure(format!("record_{idx}")).unwrap());
        for label in labels {
            assert_ne!(label, SplitLabel::Test);
            match label {
                SplitLabel::Train => saw_train = true,
                SplitLabel::Validation => saw_validation = true,
                SplitLabel::Test => {}
            }
            if saw_train && saw_validation {
                break;
            }
        }

        assert!(saw_train);
        assert!(saw_validation);
    }
1✔
851

852
    /// `save_sampler_state(&state, None)` must publish the working store to its save
    /// path. The file is absent before the call and present after it. A fresh `open`
    /// of that path then sees everything written before the save.
    #[test]
    fn save_none_publishes_to_save_path_and_reloads_cleanly() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("persist.bin");
        let ratios = SplitRatios::default();

        // The writer lives in its own scope so it is dropped before the reopen below.
        {
            let writer = FileSplitStore::open(&path, ratios, 123).unwrap();
            let train_meta = PersistedSplitMeta {
                epoch: 3,
                offset: 7,
                hashes_checksum: 42,
            };
            writer
                .save_epoch_meta(&HashMap::from([(SplitLabel::Train, train_meta)]))
                .unwrap();

            let state = PersistedSamplerState {
                source_cycle_idx: 11,
                source_record_cursors: vec![("s".to_string(), 1)],
                source_epoch: 5,
                rng_state: 99,
                triplet_recipe_rr_idx: 2,
                text_recipe_rr_idx: 3,
                source_stream_cursors: vec![],
            };
            assert!(!path.exists(), "save_path must not exist before save(None)");
            writer.save_sampler_state(&state, None).unwrap();
            assert!(path.exists(), "save(None) must publish to save_path on disk");
        }

        // A fresh open must read everything back from disk.
        let reader = FileSplitStore::open(&path, ratios, 123).unwrap();
        let restored = reader.load_sampler_state().unwrap().unwrap();
        assert_eq!(restored.source_cycle_idx, 11);
        assert_eq!(restored.source_epoch, 5);
        assert_eq!(restored.rng_state, 99);
        let train_epoch = reader
            .load_epoch_meta()
            .unwrap()
            .get(&SplitLabel::Train)
            .map(|meta| meta.epoch);
        assert_eq!(train_epoch, Some(3));
    }
1✔
898

899
    /// Reopening a published store with a different seed must fail with an error
    /// that mentions the seed.
    #[test]
    fn file_store_rejects_seed_mismatch() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("splits.json");
        let ratios = SplitRatios::default();

        let store = FileSplitStore::open(&path, ratios, 123).unwrap();
        store.ensure("abc".to_string()).unwrap();

        // Publish to disk so the seed is committed before the next open.
        let blank_state = PersistedSamplerState {
            source_cycle_idx: 0,
            source_record_cursors: vec![],
            source_epoch: 0,
            rng_state: 0,
            triplet_recipe_rr_idx: 0,
            text_recipe_rr_idx: 0,
            source_stream_cursors: vec![],
        };
        store.save_sampler_state(&blank_state, None).unwrap();
        drop(store);

        let reopen = FileSplitStore::open(&path, ratios, 999);
        assert!(matches!(
            reopen,
            Err(SamplerError::SplitStore(msg)) if msg.contains("seed")
        ));
    }
1✔
929

930
    /// Despite its name, this test pins that passing an existing directory as the
    /// store path is *rejected* by `FileSplitStore::open` with a `SplitStore` error.
    #[test]
    fn file_store_accepts_directory_path() {
        let dir = tempdir().unwrap();
        let outcome = FileSplitStore::open(dir.path(), SplitRatios::default(), 777);
        assert!(matches!(outcome, Err(SamplerError::SplitStore(_))));
    }
1✔
937

938
    /// A payload whose first byte is not the bitcode marker must be rejected.
    #[test]
    fn bitcode_payload_requires_prefix() {
        let unprefixed: &[u8] = &[0x00, 0x01];
        let rejected = matches!(
            decode_bitcode_payload(unprefixed),
            Err(SamplerError::SplitStore(msg)) if msg.contains("missing expected prefix")
        );
        assert!(rejected);
    }
1✔
945

946
    /// Epoch meta, epoch hashes and sampler state must round-trip through a live
    /// `FileSplitStore`. Once published with a `None` save, they must also survive a
    /// drop and a fresh reopen of the same path.
    #[test]
    fn file_store_round_trips_epoch_and_sampler_state() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("epoch_sampler_state.bin");
        let store = FileSplitStore::open(&path, SplitRatios::default(), 222).unwrap();

        // Nothing has been saved for the test split yet.
        assert!(store.load_epoch_hashes(SplitLabel::Test).unwrap().is_none());

        // --- epoch meta ---
        let train_meta = PersistedSplitMeta {
            epoch: 3,
            offset: 7,
            hashes_checksum: 42,
        };
        store
            .save_epoch_meta(&HashMap::from([(SplitLabel::Train, train_meta)]))
            .unwrap();
        let meta_map = store.load_epoch_meta().unwrap();
        let train = meta_map.get(&SplitLabel::Train).unwrap();
        assert_eq!(train.epoch, 3);
        assert_eq!(train.offset, 7);
        assert_eq!(train.hashes_checksum, 42);

        // --- epoch hashes ---
        let validation_hashes = PersistedSplitHashes {
            checksum: 99,
            hashes: vec![10, 20, 30],
        };
        store
            .save_epoch_hashes(SplitLabel::Validation, &validation_hashes)
            .unwrap();
        let fetched = store
            .load_epoch_hashes(SplitLabel::Validation)
            .unwrap()
            .unwrap();
        assert_eq!(fetched.checksum, 99);
        assert_eq!(fetched.hashes, vec![10, 20, 30]);

        // --- sampler state ---
        let state = PersistedSamplerState {
            source_cycle_idx: 11,
            source_record_cursors: vec![("source_a".to_string(), 1)],
            source_epoch: 8,
            rng_state: 1234,
            triplet_recipe_rr_idx: 2,
            text_recipe_rr_idx: 5,
            source_stream_cursors: vec![("source_a".to_string(), 9)],
        };
        store.save_sampler_state(&state, None).unwrap();
        let live = store.load_sampler_state().unwrap().unwrap();
        assert_eq!(live.source_cycle_idx, 11);
        assert_eq!(live.source_epoch, 8);
        assert_eq!(live.rng_state, 1234);
        assert_eq!(live.triplet_recipe_rr_idx, 2);
        assert_eq!(live.text_recipe_rr_idx, 5);
        assert_eq!(live.source_record_cursors, vec![("source_a".to_string(), 1)]);
        assert_eq!(live.source_stream_cursors, vec![("source_a".to_string(), 9)]);

        // The same data must survive a drop + fresh open from disk.
        drop(store);
        let reopened = FileSplitStore::open(&path, SplitRatios::default(), 222).unwrap();
        let persisted = reopened.load_sampler_state().unwrap().unwrap();
        assert_eq!(persisted.source_cycle_idx, 11);
        assert_eq!(persisted.source_epoch, 8);
        assert_eq!(persisted.rng_state, 1234);
        let persisted_train_epoch = reopened
            .load_epoch_meta()
            .unwrap()
            .get(&SplitLabel::Train)
            .map(|meta| meta.epoch);
        assert_eq!(persisted_train_epoch, Some(3));
        let persisted_hashes = reopened
            .load_epoch_hashes(SplitLabel::Validation)
            .unwrap()
            .unwrap();
        assert_eq!(persisted_hashes.checksum, 99);
    }
1✔
1025

1026
    /// Key builders must start with their prefix constants. `decode_label` must map
    /// `b"0"`/`b"1"`/`b"2"` to train/validation/test and reject anything else.
    #[test]
    fn split_keys_and_labels_cover_helper_paths() {
        assert!(split_key(&"abc".to_string()).starts_with(SPLIT_PREFIX));

        let known: [(&[u8], SplitLabel); 3] = [
            (b"0", SplitLabel::Train),
            (b"1", SplitLabel::Validation),
            (b"2", SplitLabel::Test),
        ];
        for (raw, expected) in known {
            assert!(matches!(decode_label(raw), Ok(label) if label == expected));
        }
        assert!(decode_label(b"x").is_err());

        assert!(epoch_meta_key(SplitLabel::Train).starts_with(EPOCH_META_PREFIX));
        for label in [SplitLabel::Train, SplitLabel::Test] {
            assert!(epoch_hashes_key(label).starts_with(EPOCH_HASHES_PREFIX));
        }
    }
1✔
1043

1044
    /// Store metadata must round-trip through encode/decode. A payload that does not
    /// start with the expected prefix must be rejected.
    #[test]
    fn encode_decode_store_meta_roundtrip_and_corrupt_prefix_error() {
        let original = StoreMeta {
            version: STORE_VERSION,
            seed: 55,
            ratios: SplitRatios::default(),
        };
        let decoded = decode_store_meta(&encode_store_meta(&original)).unwrap();
        assert_eq!(decoded.version, STORE_VERSION);
        assert_eq!(decoded.seed, 55);

        let unprefixed = decode_store_meta(&[0x00, 0x01]);
        assert!(matches!(
            unprefixed,
            Err(SamplerError::SplitStore(msg)) if msg.contains("missing expected prefix")
        ));
    }
1✔
1062

1063
    /// Empty records decode as absent for every record type. A lone tombstone byte
    /// decodes as absent for epoch records. A bumped version byte is rejected.
    #[test]
    fn epoch_and_sampler_decoders_cover_tombstone_and_version_mismatch() {
        // Empty input => no record.
        assert!(decode_epoch_meta(&[]).unwrap().is_none());
        assert!(decode_epoch_hashes(&[]).unwrap().is_none());
        assert!(decode_sampler_state(&[]).unwrap().is_none());

        // Tombstone => no record (epoch record types only).
        let tombstone = [EPOCH_RECORD_TOMBSTONE];
        assert!(decode_epoch_meta(&tombstone).unwrap().is_none());
        assert!(decode_epoch_hashes(&tombstone).unwrap().is_none());

        // Version byte off by one => "version mismatch" error.
        assert!(matches!(
            decode_epoch_meta(&[EPOCH_META_RECORD_VERSION.wrapping_add(1), 1]),
            Err(SamplerError::SplitStore(msg)) if msg.contains("version mismatch")
        ));
        assert!(matches!(
            decode_epoch_hashes(&[EPOCH_HASH_RECORD_VERSION.wrapping_add(1), 1]),
            Err(SamplerError::SplitStore(msg)) if msg.contains("version mismatch")
        ));
        assert!(matches!(
            decode_sampler_state(&[SAMPLER_STATE_RECORD_VERSION.wrapping_add(1), 1]),
            Err(SamplerError::SplitStore(msg)) if msg.contains("version mismatch")
        ));
    }
1✔
1096

1097
    /// Exercises `ensure_parent_dir`, `coerce_store_path` and `map_store_err`, plus
    /// the `ratios`/`upsert`/`ensure` methods on a `FileSplitStore`.
    #[test]
    fn split_store_trait_methods_and_path_helpers_are_exercised() {
        let dir = tempdir().unwrap();
        let file_path = dir.path().join("nested").join("store.bin");
        ensure_parent_dir(&file_path).unwrap();
        assert!(file_path.parent().unwrap().exists());

        let existing_dir_path = coerce_store_path(dir.path().to_path_buf());
        assert_eq!(existing_dir_path, dir.path().to_path_buf());

        let ratios = SplitRatios::default();
        let store = FileSplitStore::open(&file_path, ratios, 444).unwrap();
        assert!((store.ratios().train - ratios.train).abs() < 1e-6);
        store
            .upsert("record_1".to_string(), SplitLabel::Validation)
            .unwrap();
        // An explicit assignment must win over derivation, so `ensure` has to hand
        // back the label that was just upserted, not merely *some* label.
        let ensured = store.ensure("record_1".to_string()).unwrap();
        assert_eq!(ensured, SplitLabel::Validation);

        let mapped = map_store_err(io::Error::other("boom"));
        assert!(matches!(mapped, SamplerError::SplitStore(msg) if msg.contains("boom")));
    }
1✔
1122

1123
    /// Each record encoder must round-trip through its matching decoder.
    #[test]
    fn epoch_and_sampler_encode_decode_roundtrips() {
        let meta_bytes = encode_epoch_meta(Some(&PersistedSplitMeta {
            epoch: 4,
            offset: 9,
            hashes_checksum: 21,
        }));
        let meta = decode_epoch_meta(&meta_bytes).unwrap().unwrap();
        assert_eq!(meta.epoch, 4);
        assert_eq!(meta.offset, 9);

        let hashes_bytes = encode_epoch_hashes(&PersistedSplitHashes {
            checksum: 7,
            hashes: vec![1, 2, 3],
        });
        let hashes = decode_epoch_hashes(&hashes_bytes).unwrap().unwrap();
        assert_eq!(hashes.checksum, 7);
        assert_eq!(hashes.hashes, vec![1, 2, 3]);

        let state_bytes = encode_sampler_state(&PersistedSamplerState {
            source_cycle_idx: 1,
            source_record_cursors: vec![("s".to_string(), 2)],
            source_epoch: 3,
            rng_state: 4,
            triplet_recipe_rr_idx: 5,
            text_recipe_rr_idx: 6,
            source_stream_cursors: vec![("s".to_string(), 7)],
        });
        let state = decode_sampler_state(&state_bytes).unwrap().unwrap();
        assert_eq!(state.source_cycle_idx, 1);
        assert_eq!(state.source_epoch, 3);
        assert_eq!(state.rng_state, 4);
    }
1✔
1159

1160
    /// Exercises the store methods on `DeterministicSplitStore`: ratios, derived
    /// labels, upsert overrides, epoch meta/hashes and sampler state.
    #[test]
    fn deterministic_store_trait_methods_work() {
        let ratios = SplitRatios::default();
        let store = DeterministicSplitStore::new(ratios, 999).unwrap();

        assert_eq!(store.ratios().train, ratios.train);

        let id = "source::record".to_string();
        let derived = store.label_for(&id).unwrap();
        // Derivation must depend only on (id, seed, ratios): an independent store
        // built with the same inputs has to agree on the label for this id.
        let twin = DeterministicSplitStore::new(ratios, 999).unwrap();
        assert_eq!(twin.label_for(&id), Some(derived));

        // An explicit upsert overrides the derived label.
        store.upsert(id.clone(), SplitLabel::Validation).unwrap();
        assert_eq!(store.label_for(&id), Some(SplitLabel::Validation));

        let mut meta = HashMap::new();
        meta.insert(
            SplitLabel::Test,
            PersistedSplitMeta {
                epoch: 1,
                offset: 2,
                hashes_checksum: 3,
            },
        );
        store.save_epoch_meta(&meta).unwrap();
        let loaded_meta = store.load_epoch_meta().unwrap();
        assert_eq!(loaded_meta.get(&SplitLabel::Test).unwrap().offset, 2);

        assert!(
            store
                .load_epoch_hashes(SplitLabel::Train)
                .unwrap()
                .is_none()
        );
        store
            .save_epoch_hashes(
                SplitLabel::Train,
                &PersistedSplitHashes {
                    checksum: 11,
                    hashes: vec![4, 5],
                },
            )
            .unwrap();
        assert_eq!(
            store
                .load_epoch_hashes(SplitLabel::Train)
                .unwrap()
                .unwrap()
                .checksum,
            11
        );

        assert!(store.load_sampler_state().unwrap().is_none());
        let sampler_state = PersistedSamplerState {
            source_cycle_idx: 1,
            source_record_cursors: vec![("s1".to_string(), 2)],
            source_epoch: 3,
            rng_state: 4,
            triplet_recipe_rr_idx: 5,
            text_recipe_rr_idx: 6,
            source_stream_cursors: vec![("s1".to_string(), 7)],
        };
        store.save_sampler_state(&sampler_state, None).unwrap();
        assert_eq!(
            store.load_sampler_state().unwrap().unwrap().source_epoch,
            sampler_state.source_epoch
        );
    }
1✔
1229

1230
    /// `open_with_load_path(Some(a), b, ..)` must bootstrap the new store from the
    /// snapshot published at `a`. Reopening `a` afterwards must still yield the
    /// same contents.
    #[test]
    fn open_with_load_path_bootstraps_state_explicitly() {
        let dir = tempdir().unwrap();
        let path_a = dir.path().join("snapshot_a.bin");
        let path_b = dir.path().join("snapshot_b.bin");
        let ratios = SplitRatios::default();

        let store_a = FileSplitStore::open(&path_a, ratios, 42).unwrap();
        let train_meta = PersistedSplitMeta {
            epoch: 5,
            offset: 3,
            hashes_checksum: 999,
        };
        store_a
            .save_epoch_meta(&HashMap::from([(SplitLabel::Train, train_meta)]))
            .unwrap();
        let sampler_state = PersistedSamplerState {
            source_cycle_idx: 1,
            source_record_cursors: vec![("s1".to_string(), 2)],
            source_epoch: 7,
            rng_state: 123,
            triplet_recipe_rr_idx: 4,
            text_recipe_rr_idx: 6,
            source_stream_cursors: vec![("s1".to_string(), 8)],
        };
        store_a.save_sampler_state(&sampler_state, None).unwrap();
        drop(store_a);

        // (train epoch, sampler source_epoch) as seen through one store handle.
        let observed = |store: &FileSplitStore| {
            let train_epoch = store
                .load_epoch_meta()
                .unwrap()
                .get(&SplitLabel::Train)
                .unwrap()
                .epoch;
            let source_epoch = store.load_sampler_state().unwrap().unwrap().source_epoch;
            (train_epoch, source_epoch)
        };

        let store_b =
            FileSplitStore::open_with_load_path(Some(path_a.clone()), &path_b, ratios, 42).unwrap();
        assert_eq!(observed(&store_b), (5, 7));

        let store_a_again = FileSplitStore::open(&path_a, ratios, 42).unwrap();
        assert_eq!(observed(&store_a_again), (5, 7));
    }
1✔
1297

1298
    /// Saving with `Some(path_b)` must write `path_b` as a full copy of the store:
    /// existing split assignments plus the new sampler state. It must not publish to
    /// the store's own save path (`path_a`).
    #[test]
    fn save_sampler_state_to_new_path_copies_existing_store_first() {
        let dir = tempdir().unwrap();
        let path_a = dir.path().join("source_store.bin");
        let path_b = dir.path().join("mirror_store.bin");
        let ratios = SplitRatios::default();

        let store_a = FileSplitStore::open(&path_a, ratios, 42).unwrap();

        // Plant a raw assignment directly in the engine; b"1" decodes to Validation.
        let assigned_id = "record_with_assignment".to_string();
        store_a
            .store
            .write(&split_key(&assigned_id), b"1")
            .unwrap();

        let snapshot = PersistedSamplerState {
            source_cycle_idx: 1,
            source_record_cursors: vec![("s1".to_string(), 2)],
            source_epoch: 9,
            rng_state: 123,
            triplet_recipe_rr_idx: 4,
            text_recipe_rr_idx: 6,
            source_stream_cursors: vec![("s1".to_string(), 8)],
        };
        store_a
            .save_sampler_state(&snapshot, Some(path_b.as_path()))
            .unwrap();

        // Destination gets the existing store data AND the new sampler state.
        let mirror = FileSplitStore::open(&path_b, ratios, 42).unwrap();
        assert_eq!(mirror.label_for(&assigned_id), Some(SplitLabel::Validation));
        let mirrored_epoch = mirror.load_sampler_state().unwrap().unwrap().source_epoch;
        assert_eq!(mirrored_epoch, 9);

        // save_to=Some(path_b) must not publish to path_a (the canonical save_path).
        assert!(
            !path_a.exists(),
            "save_to=Some(...) must not publish to the canonical save_path"
        );
    }
1✔
1342

1343
    /// Saving to a custom path on a plain `open` store must not mutate the working
    /// store. After reopening, `path_a` still holds the state last published with
    /// `None`, while `path_b` holds the checkpointed state.
    #[test]
    fn save_some_on_regular_open_does_not_modify_working_store() {
        let dir = tempdir().unwrap();
        let path_a = dir.path().join("working_store.bin");
        let path_b = dir.path().join("checkpoint_store.bin");
        let ratios = SplitRatios::default();

        let store_a = FileSplitStore::open(&path_a, ratios, 42).unwrap();

        // Baseline: publish epoch 1 to the working store's own save path.
        let initial_state = PersistedSamplerState {
            source_cycle_idx: 1,
            source_record_cursors: vec![],
            source_epoch: 1,
            rng_state: 0,
            triplet_recipe_rr_idx: 0,
            text_recipe_rr_idx: 0,
            source_stream_cursors: vec![],
        };
        store_a.save_sampler_state(&initial_state, None).unwrap();

        // Checkpoint: a newer state (cycle/epoch 99, rng 42) written only to path_b.
        let checkpoint_state = PersistedSamplerState {
            source_cycle_idx: 99,
            source_epoch: 99,
            rng_state: 42,
            ..initial_state
        };
        store_a
            .save_sampler_state(&checkpoint_state, Some(path_b.as_path()))
            .unwrap();
        drop(store_a);

        // path_a was last published by the `None` save above and must be unchanged.
        let working = FileSplitStore::open(&path_a, ratios, 42).unwrap();
        let working_epoch = working.load_sampler_state().unwrap().unwrap().source_epoch;
        assert_eq!(
            working_epoch, 1,
            "save_to=Some(...) must not overwrite the on-disk save_path"
        );

        // The checkpoint must hold the new state.
        let checkpoint = FileSplitStore::open(&path_b, ratios, 42).unwrap();
        let checkpoint_epoch = checkpoint.load_sampler_state().unwrap().unwrap().source_epoch;
        assert_eq!(
            checkpoint_epoch, 99,
            "checkpoint store must hold the snapshotted state"
        );
    }
1✔
1402

1403
    /// Covers the `Debug` impl and the version and ratio checks `open` performs
    /// against published store metadata.
    #[test]
    fn file_store_metadata_mismatch_and_debug_paths_are_covered() {
        let dir = tempdir().unwrap();
        let ratios = SplitRatios::default();
        // Zeroed sampler state, used only to trigger a `None` publish of each store.
        let blank_state = PersistedSamplerState {
            source_cycle_idx: 0,
            source_record_cursors: vec![],
            source_epoch: 0,
            rng_state: 0,
            triplet_recipe_rr_idx: 0,
            text_recipe_rr_idx: 0,
            source_stream_cursors: vec![],
        };

        // --- version mismatch ---
        let path = dir.path().join("meta_mismatch.bin");
        let store = FileSplitStore::open(&path, ratios, 123).unwrap();
        assert!(format!("{store:?}").contains("FileSplitStore"));

        let bumped_version = encode_store_meta(&StoreMeta {
            version: STORE_VERSION.wrapping_add(1),
            seed: 123,
            ratios,
        });
        store.store.write(META_KEY, &bumped_version).unwrap();
        // Publish the corrupted temp to disk so the next open reads the bad version.
        store.save_sampler_state(&blank_state, None).unwrap();
        drop(store);

        let version_result = FileSplitStore::open(&path, ratios, 123);
        assert!(matches!(
            version_result,
            Err(SamplerError::SplitStore(msg)) if msg.contains("version mismatch")
        ));

        // --- ratio mismatch ---
        let ratio_path = dir.path().join("ratio_mismatch.bin");
        let baseline = FileSplitStore::open(&ratio_path, ratios, 777).unwrap();
        // Publish so ratio_path exists on disk before reopening with different ratios.
        baseline.save_sampler_state(&blank_state, None).unwrap();
        drop(baseline);

        let different_ratios = SplitRatios {
            train: 0.7,
            validation: 0.2,
            test: 0.1,
        };
        let ratio_result = FileSplitStore::open(&ratio_path, different_ratios, 777);
        assert!(matches!(
            ratio_result,
            Err(SamplerError::SplitStore(msg)) if msg.contains("ratios mismatch")
        ));
    }
1✔
1467

1468
    /// With a valid header but a garbage bitcode body, each decoder must report its
    /// own decode-error message.
    #[test]
    fn split_decode_helpers_reject_corrupt_bitcode_payloads() {
        assert!(matches!(
            decode_store_meta(&[BITCODE_PREFIX, 0xFF, 0xEE]),
            Err(SamplerError::SplitStore(msg))
                if msg.contains("failed to decode split store metadata")
        ));
        assert!(matches!(
            decode_epoch_meta(&[EPOCH_META_RECORD_VERSION, BITCODE_PREFIX, 0xFF]),
            Err(SamplerError::SplitStore(msg)) if msg.contains("corrupt epoch meta record")
        ));
        assert!(matches!(
            decode_epoch_hashes(&[EPOCH_HASH_RECORD_VERSION, BITCODE_PREFIX, 0xFF]),
            Err(SamplerError::SplitStore(msg)) if msg.contains("corrupt epoch hashes record")
        ));
        assert!(matches!(
            decode_sampler_state(&[SAMPLER_STATE_RECORD_VERSION, BITCODE_PREFIX, 0xFF]),
            Err(SamplerError::SplitStore(msg)) if msg.contains("corrupt sampler state record")
        ));
    }
1✔
1497

1498
    /// `FileSplitStore::label_for` must handle two cases:
    /// - an unrecognised stored byte falls back to the seed-derived label;
    /// - a valid stored byte is returned as-is.
    ///
    /// Validation key builders must also end with `b"validation"`.
    #[test]
    fn file_store_label_fallback_and_validation_keys_are_covered() {
        let dir = tempdir().unwrap();
        let ratios = SplitRatios::default();
        let path = dir.path().join("labels.bin");
        let store = FileSplitStore::open(&path, ratios, 42).unwrap();

        let id = "bad_label_record".to_string();
        let key = split_key(&id);

        // Unrecognised payload: fall back to the derived label.
        store.store.write(&key, b"x").unwrap();
        assert_eq!(store.label_for(&id), Some(derive_label_for_id(&id, 42, ratios)));

        // Recognised payload: the stored label wins.
        store.store.write(&key, b"1").unwrap();
        assert_eq!(store.label_for(&id), Some(SplitLabel::Validation));

        let meta_key = epoch_meta_key(SplitLabel::Validation);
        let hashes_key = epoch_hashes_key(SplitLabel::Validation);
        assert!(meta_key.starts_with(EPOCH_META_PREFIX) && meta_key.ends_with(b"validation"));
        assert!(
            hashes_key.starts_with(EPOCH_HASHES_PREFIX) && hashes_key.ends_with(b"validation")
        );
    }
1✔
1521

1522
    /// A bare file name has an empty parent, which `ensure_parent_dir` must accept.
    /// `coerce_store_path` must return such a name unchanged.
    #[test]
    fn ensure_parent_dir_allows_plain_file_names() {
        assert!(ensure_parent_dir(Path::new("split_store_local.bin")).is_ok());
        let name = PathBuf::from("explicit_store.bin");
        assert_eq!(coerce_store_path(name.clone()), name);
    }
1✔
1528

1529
    // -----------------------------------------------------------------------
1530
    // Temp-dir bootstrap contract
1531
    // -----------------------------------------------------------------------
1532

1533
    /// `open_with_load_path` must NOT modify the source file while the store is open.
    ///
    /// Seeds `source` with a known sampler state and bootstraps a second store from
    /// it that saves different state. It then checks two things: the source file's
    /// bytes are unchanged, and the source still decodes to the seeded state.
    #[test]
    fn load_path_source_is_never_modified_while_open() {
        let dir = tempdir().unwrap();
        let source = dir.path().join("source.bin");
        let dest = dir.path().join("dest.bin");
        let ratios = SplitRatios::default();

        // Seed the source with known state.
        let seeded = FileSplitStore::open(&source, ratios, 77).unwrap();
        let state = PersistedSamplerState {
            source_cycle_idx: 5,
            source_record_cursors: vec![("s".to_string(), 3)],
            source_epoch: 9,
            rng_state: 42,
            triplet_recipe_rr_idx: 1,
            text_recipe_rr_idx: 2,
            source_stream_cursors: vec![("s".to_string(), 4)],
        };
        seeded.save_sampler_state(&state, None).unwrap();
        drop(seeded);

        // Snapshot the full contents, not just the length: an in-place rewrite
        // that keeps the file size would otherwise go unnoticed.
        let source_bytes_before = std::fs::read(&source).unwrap();

        // Open bootstrapped store and write mutations to it.
        let bootstrapped =
            FileSplitStore::open_with_load_path(Some(&source), &dest, ratios, 77).unwrap();
        let new_state = PersistedSamplerState {
            source_cycle_idx: 99,
            source_record_cursors: vec![("s".to_string(), 77)],
            source_epoch: 100,
            rng_state: 0,
            triplet_recipe_rr_idx: 0,
            text_recipe_rr_idx: 0,
            source_stream_cursors: vec![],
        };
        bootstrapped.save_sampler_state(&new_state, None).unwrap();
        drop(bootstrapped);

        // Source must be byte-identical to before bootstrap.
        let source_bytes_after = std::fs::read(&source).unwrap();
        assert_eq!(
            source_bytes_before.len(),
            source_bytes_after.len(),
            "source file was modified during bootstrapped open"
        );
        // Plain `assert!` so a mismatch does not dump both buffers into the output.
        assert!(
            source_bytes_before == source_bytes_after,
            "source file was modified during bootstrapped open"
        );

        // Verify source still holds the original state.
        let verify_source = FileSplitStore::open(&source, ratios, 77).unwrap();
        let loaded = verify_source.load_sampler_state().unwrap().unwrap();
        assert_eq!(loaded.source_cycle_idx, 5);
        assert_eq!(loaded.source_epoch, 9);
    }
1585

1586
    /// `save_sampler_state(None)` on a bootstrapped store must publish to the
    /// declared `save_path` only, not to the source and not to a temp path.
    ///
    /// The temp-path part of the contract is not observable from this test. What it
    /// checks is that `save_path` receives the state and the load source does not.
    #[test]
    fn save_none_on_bootstrapped_store_publishes_to_save_path() {
        let dir = tempdir().unwrap();
        let source = dir.path().join("load.bin");
        let dest = dir.path().join("save.bin");
        let ratios = SplitRatios::default();

        // Create the load source (no sampler state saved); the handle drops immediately.
        let _ = FileSplitStore::open(&source, ratios, 11).unwrap();

        assert!(!dest.exists(), "dest must not exist before first save");

        let store = FileSplitStore::open_with_load_path(Some(&source), &dest, ratios, 11).unwrap();
        let state = PersistedSamplerState {
            source_cycle_idx: 7,
            source_record_cursors: vec![],
            source_epoch: 2,
            rng_state: 1,
            triplet_recipe_rr_idx: 0,
            text_recipe_rr_idx: 0,
            source_stream_cursors: vec![],
        };
        store.save_sampler_state(&state, None).unwrap();
        drop(store);

        assert!(dest.exists(), "dest must exist after save(None)");

        let loaded_dest = FileSplitStore::open(&dest, ratios, 11).unwrap();
        assert_eq!(
            loaded_dest
                .load_sampler_state()
                .unwrap()
                .unwrap()
                .source_cycle_idx,
            7
        );

        // The load source never had state saved into it, so it must still have none.
        let loaded_source = FileSplitStore::open(&source, ratios, 11).unwrap();
        assert!(
            loaded_source.load_sampler_state().unwrap().is_none(),
            "save(None) must not publish sampler state to the load source"
        );
    }
1624

1625
    /// Saving with an explicit `Some(path)` on a bootstrapped store writes only to
    /// that path; the store's declared `save_path` is never created.
    #[test]
    fn save_some_on_bootstrapped_store_publishes_to_explicit_path_only() {
        let tmp = tempdir().unwrap();
        let ratios = SplitRatios::default();
        let load_path = tmp.path().join("load.bin");
        let canonical_save = tmp.path().join("save.bin"); // declared save_path; must stay absent
        let explicit_target = tmp.path().join("other.bin"); // destination passed via Some(..)

        // Create the load source and release the handle straight away.
        drop(FileSplitStore::open(&load_path, ratios, 22).unwrap());

        let store =
            FileSplitStore::open_with_load_path(Some(&load_path), &canonical_save, ratios, 22)
                .unwrap();
        let snapshot = PersistedSamplerState {
            source_cycle_idx: 3,
            source_record_cursors: vec![],
            source_epoch: 1,
            rng_state: 0,
            triplet_recipe_rr_idx: 0,
            text_recipe_rr_idx: 0,
            source_stream_cursors: vec![],
        };
        store
            .save_sampler_state(&snapshot, Some(explicit_target.as_path()))
            .unwrap();
        drop(store);

        assert!(
            !canonical_save.exists(),
            "canonical save_path must not be created when saving to explicit path"
        );
        assert!(explicit_target.exists(), "explicit target must be created");

        let reopened = FileSplitStore::open(&explicit_target, ratios, 22).unwrap();
        let persisted = reopened.load_sampler_state().unwrap().unwrap();
        assert_eq!(persisted.source_cycle_idx, 3);
    }
1668

1669
    /// Calling `save(None)` several times on a bootstrapped store keeps
    /// overwriting the canonical save_path, so the last write wins.
    #[test]
    fn repeated_save_none_on_bootstrapped_store_is_idempotent() {
        let tmp = tempdir().unwrap();
        let ratios = SplitRatios::default();
        let load_path = tmp.path().join("load.bin");
        let save_path = tmp.path().join("save.bin");

        // Create the load source and release the handle straight away.
        drop(FileSplitStore::open(&load_path, ratios, 33).unwrap());

        let store =
            FileSplitStore::open_with_load_path(Some(&load_path), &save_path, ratios, 33)
                .unwrap();

        (1_u64..=3).for_each(|cycle_idx| {
            let snapshot = PersistedSamplerState {
                source_cycle_idx: cycle_idx,
                source_record_cursors: vec![],
                source_epoch: 0,
                rng_state: 0,
                triplet_recipe_rr_idx: 0,
                text_recipe_rr_idx: 0,
                source_stream_cursors: vec![],
            };
            store.save_sampler_state(&snapshot, None).unwrap();
        });
        drop(store);

        let reopened = FileSplitStore::open(&save_path, ratios, 33).unwrap();
        let last_saved = reopened.load_sampler_state().unwrap().unwrap();
        assert_eq!(
            last_saved.source_cycle_idx, 3,
            "dest should hold the last-saved state"
        );
    }
1707
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc