• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

jzombie / rust-triplets / 22361174421

24 Feb 2026 04:57PM UTC coverage: 93.296% (+0.6%) from 92.675%
22361174421

push

github

web-flow
Add HF source (#7)

5314 of 5790 new or added lines in 8 files covered. (91.78%)

1 existing line in 1 file now uncovered.

14751 of 15811 relevant lines covered (93.3%)

2502.34 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

94.7
/src/source/mod.rs
1
//! Data source interfaces and paging helpers.
2
//!
3
//! Ownership model:
4
//! - `DataSource` is the sampler-facing interface that produces batches.
5
//! - `IndexableSource` exposes stable, index-based access into a corpus.
6
//! - `IndexablePager` owns the deterministic pseudo-random paging logic and
7
//!   can page any indexable source without retaining per-record state.
8

9
use chrono::{DateTime, Utc};
10
use std::hash::Hash;
11
use std::sync::Arc;
12
use std::time::{Duration, Instant};
13

14
use crate::config::{SamplerConfig, TripletRecipe};
15
use crate::data::DataRecord;
16
use crate::errors::SamplerError;
17
use crate::hash::stable_hash_with;
18
use crate::types::SourceId;
19

20
/// Source implementation modules.
21
pub mod backends;
22
/// Utility helpers used by source implementations.
23
pub mod indexing;
24
pub use backends::file_source::{
25
    FileSource, FileSourceConfig, SectionBuilder, TaxonomyBuilder, anchor_context_sections,
26
    taxonomy_from_path,
27
};
28
#[cfg(feature = "huggingface")]
29
pub use backends::huggingface_source::{HuggingFaceRowSource, HuggingFaceRowsConfig};
30

31
/// Source-owned incremental refresh position.
32
///
33
/// The sampler stores and returns this value between refresh calls.
34
/// `revision` is opaque to the sampler and interpreted only by the source.
35
#[derive(Clone, Debug)]
36
pub struct SourceCursor {
37
    /// Most recent observation timestamp produced by the source.
38
    pub last_seen: DateTime<Utc>,
39
    /// Opaque paging position token used to continue incremental refresh.
40
    pub revision: u64,
41
}
42

43
/// Result of a single source refresh call.
44
///
45
/// Pass the returned `cursor` back into the next refresh to continue paging.
46
#[derive(Clone, Debug)]
47
pub struct SourceSnapshot {
48
    /// Records returned by the refresh operation.
49
    pub records: Vec<DataRecord>,
50
    /// Next cursor to pass into a future refresh call.
51
    pub cursor: SourceCursor,
52
}
53

54
/// Sampler-facing data source interface.
55
///
56
/// Implementations may be streaming or index-backed. For a fixed dataset state
57
/// and cursor, refresh output should be deterministic.
58
pub trait DataSource: Send + Sync {
59
    /// Stable source identifier used in records, metrics, and persistence state.
60
    fn id(&self) -> &str;
61
    /// Fetch up to `limit` records starting from `cursor` state.
62
    ///
63
    /// Return the next cursor position in `SourceSnapshot.cursor`.
64
    fn refresh(
65
        &self,
66
        config: &SamplerConfig,
67
        cursor: Option<&SourceCursor>,
68
        limit: Option<usize>,
69
    ) -> Result<SourceSnapshot, SamplerError>;
70

71
    /// Exact metadata record count reported by the source.
72
    ///
73
    /// This is intended for estimators that must avoid iterating records.
74
    /// Implementations should return `Ok(count)` only when the count is
75
    /// exact for the source scope. Return `Err` when exact counting is not
76
    /// possible or the source is unavailable.
77
    ///
78
    /// Keep this consistent with `refresh` by using the same backend scope,
79
    /// filtering, and logical corpus definition.
80
    fn reported_record_count(&self, config: &SamplerConfig) -> Result<u128, SamplerError>;
81

82
    /// Optional source-provided default triplet recipes.
83
    ///
84
    /// Used when sampler config does not provide explicit recipes.
85
    fn default_triplet_recipes(&self) -> Vec<TripletRecipe> {
25✔
86
        Vec::new()
25✔
87
    }
25✔
88
}
89

90
/// Index-addressable source interface used by deterministic pagers.
91
///
92
/// `len_hint` must be stable within an epoch, and `record_at` must return the
93
/// record corresponding to the same index across runs.
94
///
95
/// Dense indexing is strongly recommended: implement indices as `0..len_hint`
96
/// with minimal gaps. Sparse indexes (returning `None` for many positions)
97
/// still work but waste paging capacity and reduce batch fill rates.
98
pub trait IndexableSource: Send + Sync {
99
    /// Stable source identifier.
100
    fn id(&self) -> &str;
101
    /// Current index domain size, typically `Some(total_records)`.
102
    fn len_hint(&self) -> Option<usize>;
103
    /// Return the record at index `idx`, or `None` for sparse/missing positions.
104
    fn record_at(&self, idx: usize) -> Result<Option<DataRecord>, SamplerError>;
105
}
106

107
/// Deterministic pager for `IndexableSource`.
108
///
109
/// Encapsulates shuffle seed and cursor math so callers can reuse a stable
110
/// paging algorithm without implementing permutation logic themselves.
111
pub struct IndexablePager {
112
    source_id: SourceId,
113
}
114

115
impl IndexablePager {
116
    /// Create a new deterministic pager for `source_id`.
117
    pub fn new(source_id: impl Into<SourceId>) -> Self {
9✔
118
        Self {
9✔
119
            source_id: source_id.into(),
9✔
120
        }
9✔
121
    }
9✔
122

123
    /// Page records from an `IndexableSource` using the provided cursor.
124
    pub fn refresh(
6✔
125
        &self,
6✔
126
        source: &dyn IndexableSource,
6✔
127
        cursor: Option<&SourceCursor>,
6✔
128
        limit: Option<usize>,
6✔
129
    ) -> Result<SourceSnapshot, SamplerError> {
6✔
130
        let total = source
6✔
131
            .len_hint()
6✔
132
            .ok_or_else(|| SamplerError::SourceInconsistent {
6✔
133
                source_id: source.id().to_string(),
1✔
134
                details: "indexable source did not provide len_hint".into(),
1✔
135
            })?;
1✔
136
        self.refresh_with(total, cursor, limit, |idx| source.record_at(idx))
76✔
137
    }
6✔
138

139
    /// Page records using a custom index fetcher.
140
    ///
141
    /// Useful when records are indexable but not exposed through `IndexableSource`
142
    /// (for example, temporary index stores or precomputed path lists).
143
    pub fn refresh_with(
8✔
144
        &self,
8✔
145
        total: usize,
8✔
146
        cursor: Option<&SourceCursor>,
8✔
147
        limit: Option<usize>,
8✔
148
        mut fetch: impl FnMut(usize) -> Result<Option<DataRecord>, SamplerError>,
8✔
149
    ) -> Result<SourceSnapshot, SamplerError> {
8✔
150
        if total == 0 {
8✔
151
            return Ok(SourceSnapshot {
1✔
152
                records: Vec::new(),
1✔
153
                cursor: SourceCursor {
1✔
154
                    last_seen: Utc::now(),
1✔
155
                    revision: 0,
1✔
156
                },
1✔
157
            });
1✔
158
        }
7✔
159
        let mut start = cursor.map(|cursor| cursor.revision as usize).unwrap_or(0);
7✔
160
        if start >= total {
7✔
161
            start = 0;
1✔
162
        }
6✔
163
        let max = limit.unwrap_or(total);
7✔
164
        let mut records = Vec::new();
7✔
165
        let seed = Self::seed_for(&self.source_id, total);
7✔
166
        let mut permutation = IndexPermutation::new(total, seed, start as u64);
7✔
167
        let report_every = Duration::from_millis(750);
7✔
168
        let refresh_start = Instant::now();
7✔
169
        let mut last_report = refresh_start;
7✔
170
        let mut attempts = 0usize;
7✔
171
        let should_report = total >= 10_000 || max >= 1_024;
7✔
172
        if should_report {
7✔
173
            eprintln!(
1✔
174
                "[triplets:source] refresh start source='{}' total={} target={}",
1✔
175
                self.source_id, total, max
1✔
176
            );
1✔
177
        }
6✔
178
        for _ in 0..total {
7✔
179
            attempts += 1;
86✔
180
            if records.len() >= max {
86✔
181
                break;
5✔
182
            }
81✔
183
            let idx = permutation.next();
81✔
184
            if let Some(record) = fetch(idx)? {
81✔
185
                records.push(record);
80✔
186
            }
80✔
187
            if should_report && last_report.elapsed() >= report_every {
80✔
NEW
188
                eprintln!(
×
NEW
189
                    "[triplets:source] refresh progress source='{}' attempted={}/{} fetched={}/{} elapsed={:.1}s",
×
NEW
190
                    self.source_id,
×
NEW
191
                    attempts,
×
NEW
192
                    total,
×
NEW
193
                    records.len(),
×
NEW
194
                    max,
×
NEW
195
                    refresh_start.elapsed().as_secs_f64()
×
NEW
196
                );
×
NEW
197
                last_report = Instant::now();
×
198
            }
80✔
199
        }
200
        if should_report {
6✔
201
            eprintln!(
1✔
202
                "[triplets:source] refresh done source='{}' attempted={} fetched={} elapsed={:.2}s",
1✔
203
                self.source_id,
1✔
204
                attempts,
1✔
205
                records.len(),
1✔
206
                refresh_start.elapsed().as_secs_f64()
1✔
207
            );
1✔
208
        }
5✔
209
        let last_seen = records
6✔
210
            .iter()
6✔
211
            .map(|record| record.updated_at)
6✔
212
            .max()
6✔
213
            .unwrap_or_else(Utc::now);
6✔
214
        let next_start = permutation.cursor();
6✔
215
        Ok(SourceSnapshot {
6✔
216
            records,
6✔
217
            cursor: SourceCursor {
6✔
218
                last_seen,
6✔
219
                revision: next_start as u64,
6✔
220
            },
6✔
221
        })
6✔
222
    }
8✔
223

224
    /// Build a deterministic seed for a source and total size.
225
    pub(crate) fn seed_for(source_id: &SourceId, total: usize) -> u64 {
28✔
226
        Self::stable_index_shuffle_key(source_id, 0)
28✔
227
            ^ Self::stable_index_shuffle_key(source_id, total)
28✔
228
    }
28✔
229

230
    /// Build a deterministic seed for a source/total pair with explicit sampler seed.
231
    #[cfg(any(test, feature = "huggingface"))]
232
    pub(crate) fn seed_for_sampler(source_id: &SourceId, total: usize, sampler_seed: u64) -> u64 {
18✔
233
        Self::seed_for(source_id, total)
18✔
234
            ^ stable_hash_with(|hasher| {
18✔
235
                "triplets_sampler_seed".hash(hasher);
18✔
236
                source_id.hash(hasher);
18✔
237
                total.hash(hasher);
18✔
238
                sampler_seed.hash(hasher);
18✔
239
            })
18✔
240
    }
18✔
241

242
    fn stable_index_shuffle_key(source_id: &SourceId, idx: usize) -> u64 {
56✔
243
        stable_hash_with(|hasher| {
56✔
244
            source_id.hash(hasher);
56✔
245
            idx.hash(hasher);
56✔
246
        })
56✔
247
    }
56✔
248
}
249

250
/// DataSource adapter that pages an `IndexableSource` via `IndexablePager`.
251
pub struct IndexableAdapter<T: IndexableSource> {
252
    inner: T,
253
}
254

255
impl<T: IndexableSource> IndexableAdapter<T> {
256
    /// Wrap an `IndexableSource` so it can be registered as a `DataSource`.
257
    pub fn new(inner: T) -> Self {
3✔
258
        Self { inner }
3✔
259
    }
3✔
260
}
261

262
impl<T: IndexableSource> DataSource for IndexableAdapter<T> {
263
    fn id(&self) -> &str {
×
264
        self.inner.id()
×
265
    }
×
266

267
    fn refresh(
5✔
268
        &self,
5✔
269
        _config: &SamplerConfig,
5✔
270
        cursor: Option<&SourceCursor>,
5✔
271
        limit: Option<usize>,
5✔
272
    ) -> Result<SourceSnapshot, SamplerError> {
5✔
273
        let pager = IndexablePager::new(self.inner.id());
5✔
274
        pager.refresh(&self.inner, cursor, limit)
5✔
275
    }
5✔
276

277
    fn reported_record_count(&self, _config: &SamplerConfig) -> Result<u128, SamplerError> {
1✔
278
        self.inner
1✔
279
            .len_hint()
1✔
280
            .map(|value| value as u128)
1✔
281
            .ok_or_else(|| SamplerError::SourceInconsistent {
1✔
282
                source_id: self.inner.id().to_string(),
1✔
283
                details: "indexable source did not provide len_hint".into(),
1✔
284
            })
1✔
285
    }
1✔
286
}
287

288
/// Internal permutation used by `IndexablePager`.
///
/// Indices in `0..total` are visited in the order given by an affine map over
/// the smallest power-of-two domain (at least two slots) that covers `total`.
/// Slots whose image falls outside `0..total` are skipped.
///
/// The cursor is a *rank*: the number of in-range indices already emitted in
/// the current pass. It is always `< total`. A slot position cannot be used
/// for this: the domain can be larger than `total`, so reducing a slot
/// position modulo `total` resumes at the wrong slot and can replay or
/// permanently skip indices.
pub(crate) struct IndexPermutation {
    /// Number of valid indices.
    total: u64,
    /// Width of the power-of-two domain in bits.
    domain_bits: u32,
    /// Number of slots in the domain (`1 << domain_bits`).
    domain_size: u64,
    /// Seed from which the affine multiplier and offset are derived.
    seed: u64,
    /// Next domain slot to evaluate (reduced modulo `domain_size` on use).
    counter: u64,
    /// In-range indices emitted so far, counted from rank 0 of the first pass.
    emitted: u64,
}

impl IndexPermutation {
    /// Create a permutation over `0..total` positioned at rank `counter`.
    ///
    /// `counter` is the value returned by an earlier `cursor()` call (or `0`
    /// to start from the beginning); values `>= total` are reduced modulo
    /// `total`. `total` must be non-zero.
    ///
    /// Positioning scans skipped slots, so it costs up to `domain_size` cheap
    /// arithmetic steps unless `total` is a power of two.
    fn new(total: usize, seed: u64, counter: u64) -> Self {
        debug_assert!(total > 0, "IndexPermutation requires a non-empty index space");
        let total_u64 = total as u64;
        let domain_bits = (64 - total_u64.saturating_sub(1).leading_zeros()).max(1);
        let domain_size = 1u64 << domain_bits;
        let mut permutation = Self {
            total: total_u64,
            domain_bits,
            domain_size,
            seed,
            counter: 0,
            emitted: 0,
        };
        if total_u64 > 0 {
            permutation.skip_to_rank(counter % total_u64);
        }
        permutation
    }

    /// Advance from the start of the domain until `rank` in-range indices have
    /// been passed, without returning them.
    fn skip_to_rank(&mut self, rank: u64) {
        if self.domain_size == self.total {
            // Every slot is in range, so rank and slot position coincide.
            self.counter = rank;
            self.emitted = rank;
            return;
        }
        // Terminates: the map is a bijection on the domain, so exactly `total`
        // slots are in range and `rank < total`.
        while self.emitted < rank {
            let v = Self::permute_bits(self.counter, self.domain_bits, self.seed);
            self.counter += 1;
            if v < self.total {
                self.emitted += 1;
            }
        }
    }

    /// Return the next index in `0..total`, wrapping around after a full pass.
    fn next(&mut self) -> usize {
        loop {
            let v =
                Self::permute_bits(self.counter % self.domain_size, self.domain_bits, self.seed);
            self.counter = self.counter.wrapping_add(1);
            if v < self.total {
                self.emitted = self.emitted.wrapping_add(1);
                return v as usize;
            }
        }
    }

    /// Rank to resume from: pass it as `counter` to `new` to continue exactly
    /// after the last index returned by `next`. Always `< total`.
    fn cursor(&self) -> usize {
        (self.emitted % self.total) as usize
    }

    /// Affine map `value -> a * value + b` modulo `2^bits`.
    ///
    /// `a` is forced odd, which makes the map a bijection on `0..2^bits`;
    /// `b` is taken from the seed shifted right by one. Returns `0` when
    /// `bits == 0`.
    fn permute_bits(value: u64, bits: u32, seed: u64) -> u64 {
        if bits == 0 {
            return 0;
        }
        let mask = if bits == 64 {
            u64::MAX
        } else {
            (1u64 << bits) - 1
        };
        // Bit 0 is always set and always inside the mask, so `a` is odd and
        // can never be zero.
        let a = (seed | 1) & mask;
        let b = (seed >> 1) & mask;
        a.wrapping_mul(value).wrapping_add(b) & mask
    }
}
342

343
/// In-memory data source for tests and small datasets.
344
pub struct InMemorySource {
345
    id: SourceId,
346
    records: Arc<Vec<DataRecord>>,
347
}
348

349
impl InMemorySource {
350
    /// Create an in-memory source from prebuilt records.
351
    pub fn new(id: impl Into<SourceId>, records: Vec<DataRecord>) -> Self {
193✔
352
        Self {
193✔
353
            id: id.into(),
193✔
354
            records: Arc::new(records),
193✔
355
        }
193✔
356
    }
193✔
357
}
358

359
impl DataSource for InMemorySource {
360
    fn id(&self) -> &str {
3,733✔
361
        &self.id
3,733✔
362
    }
3,733✔
363

364
    fn refresh(
1,366✔
365
        &self,
1,366✔
366
        _config: &SamplerConfig,
1,366✔
367
        cursor: Option<&SourceCursor>,
1,366✔
368
        limit: Option<usize>,
1,366✔
369
    ) -> Result<SourceSnapshot, SamplerError> {
1,366✔
370
        let records = &*self.records;
1,366✔
371
        let total = records.len();
1,366✔
372
        let mut start = cursor.map(|cursor| cursor.revision as usize).unwrap_or(0);
1,366✔
373
        if total > 0 && start >= total {
1,366✔
374
            start = 0;
1✔
375
        }
1,365✔
376
        let max = limit.unwrap_or(total);
1,366✔
377
        let mut filtered = Vec::new();
1,366✔
378
        for idx in 0..total {
9,468✔
379
            if filtered.len() >= max {
9,468✔
380
                break;
1,154✔
381
            }
8,314✔
382
            let pos = (start + idx) % total;
8,314✔
383
            filtered.push(records[pos].clone());
8,314✔
384
        }
385
        let last_seen = filtered
1,366✔
386
            .iter()
1,366✔
387
            .map(|record| record.updated_at)
1,366✔
388
            .max()
1,366✔
389
            .unwrap_or_else(Utc::now);
1,366✔
390
        let next_start = if total == 0 {
1,366✔
391
            0
×
392
        } else {
393
            (start + filtered.len()) % total
1,366✔
394
        };
395
        Ok(SourceSnapshot {
1,366✔
396
            records: filtered,
1,366✔
397
            cursor: SourceCursor {
1,366✔
398
                last_seen,
1,366✔
399
                revision: next_start as u64,
1,366✔
400
            },
1,366✔
401
        })
1,366✔
402
    }
1,366✔
403

NEW
404
    fn reported_record_count(&self, _config: &SamplerConfig) -> Result<u128, SamplerError> {
×
NEW
405
        Ok(self.records.len() as u128)
×
NEW
406
    }
×
407
}
408

409
#[cfg(test)]
mod tests {
    use super::*;
    use crate::data::{QualityScore, RecordSection, SectionRole};
    use crate::types::RecordId;
    use chrono::Duration;

    /// Build a record with one anchor section whose text and only sentence
    /// are both `text`.
    fn anchor_record(
        id: String,
        source: &str,
        created_at: chrono::DateTime<Utc>,
        updated_at: chrono::DateTime<Utc>,
        text: &str,
    ) -> DataRecord {
        DataRecord {
            id,
            source: source.to_string(),
            created_at,
            updated_at,
            quality: QualityScore { trust: 1.0 },
            taxonomy: Vec::new(),
            sections: vec![RecordSection {
                role: SectionRole::Anchor,
                heading: None,
                text: text.into(),
                sentences: vec![text.into()],
            }],
            meta_prefix: None,
        }
    }

    /// Minimal `IndexableSource` test fixture serving `count` dense records.
    struct IndexableStub {
        id: SourceId,
        count: usize,
    }

    /// `IndexableSource` fixture that never reports a length.
    struct NoLenHintStub {
        id: SourceId,
    }

    impl IndexableStub {
        fn new(id: &str, count: usize) -> Self {
            let id = id.to_string();
            Self { id, count }
        }
    }

    impl NoLenHintStub {
        fn new(id: &str) -> Self {
            Self { id: id.to_string() }
        }
    }

    impl IndexableSource for IndexableStub {
        fn id(&self) -> &str {
            &self.id
        }

        fn len_hint(&self) -> Option<usize> {
            Some(self.count)
        }

        fn record_at(&self, idx: usize) -> Result<Option<DataRecord>, SamplerError> {
            if idx >= self.count {
                return Ok(None);
            }
            let now = Utc::now();
            Ok(Some(anchor_record(
                format!("record_{idx}"),
                &self.id,
                now,
                now,
                "stub",
            )))
        }
    }

    impl IndexableSource for NoLenHintStub {
        fn id(&self) -> &str {
            &self.id
        }

        fn len_hint(&self) -> Option<usize> {
            None
        }

        fn record_at(&self, _idx: usize) -> Result<Option<DataRecord>, SamplerError> {
            Ok(None)
        }
    }

    #[test]
    fn indexable_adapter_pages_in_stable_order() {
        let adapter = IndexableAdapter::new(IndexableStub::new("stub", 6));
        let config = SamplerConfig::default();
        let full_ids: Vec<RecordId> = adapter
            .refresh(&config, None, None)
            .unwrap()
            .records
            .into_iter()
            .map(|record| record.id)
            .collect();

        // Three pages of two must reproduce the single full refresh.
        let mut cursor: Option<SourceCursor> = None;
        let mut paged: Vec<RecordId> = Vec::new();
        for _ in 0..3 {
            let SourceSnapshot {
                records,
                cursor: next,
            } = adapter.refresh(&config, cursor.as_ref(), Some(2)).unwrap();
            cursor = Some(next);
            paged.extend(records.into_iter().map(|record| record.id));
        }
        assert_eq!(paged, full_ids);
    }

    #[test]
    fn indexable_paging_spans_multiple_regimes() {
        // Use a source id whose permutation step is not 1 or -1 mod 2^k,
        // otherwise the sequence would be a simple rotation/reversal.
        let total = 256usize;
        let mask = (1u64 << (64 - (total as u64 - 1).leading_zeros())) - 1;
        let source_id = (0..512)
            .map(|idx| format!("regime_test_{idx}"))
            .find(|candidate| {
                let step = (IndexablePager::seed_for(candidate, total) | 1) & mask;
                step != 1 && step != mask
            })
            .unwrap();

        // Pull a single page and ensure the indices are spread across the space,
        // which indicates the permutation isn't stuck in a narrow regime.
        let adapter = IndexableAdapter::new(IndexableStub::new(&source_id, total));
        let snapshot = adapter
            .refresh(&SamplerConfig::default(), None, Some(64))
            .unwrap();
        let indices: Vec<usize> = snapshot
            .records
            .into_iter()
            .map(|record| {
                record
                    .id
                    .strip_prefix("record_")
                    .unwrap()
                    .parse::<usize>()
                    .unwrap()
            })
            .collect();
        let min_idx = *indices.iter().min().unwrap();
        let max_idx = *indices.iter().max().unwrap();
        assert!(
            max_idx - min_idx >= total / 2,
            "expected spread across the index space, got min={min_idx} max={max_idx}"
        );
    }

    #[test]
    fn indexable_pager_errors_when_len_hint_missing() {
        let source = NoLenHintStub::new("no_len_hint");
        let outcome = IndexablePager::new("no_len_hint").refresh(&source, None, Some(3));
        assert!(outcome.is_err());
    }

    #[test]
    fn indexable_adapter_reported_count_errors_when_len_hint_missing() {
        let adapter = IndexableAdapter::new(NoLenHintStub::new("no_len_hint"));
        assert!(
            adapter
                .reported_record_count(&SamplerConfig::default())
                .is_err()
        );
    }

    #[test]
    fn indexable_pager_refresh_with_zero_total_returns_empty_snapshot() {
        let snapshot = IndexablePager::new("empty")
            .refresh_with(0, None, Some(4), |_idx| Ok(None))
            .unwrap();
        assert!(snapshot.records.is_empty());
        assert_eq!(snapshot.cursor.revision, 0);
    }

    #[test]
    fn in_memory_source_refresh_wraps_cursor_and_uses_latest_timestamp() {
        let now = Utc::now();
        let older = now - Duration::seconds(5);
        let newer = now + Duration::seconds(5);
        let mk = |id: &str, ts: chrono::DateTime<Utc>| {
            anchor_record(id.to_string(), "mem", ts, ts, id)
        };

        let source = InMemorySource::new("mem", vec![mk("a", older), mk("b", newer)]);
        // Revision 7 is past the end of a two-record source, so paging restarts.
        let cursor = SourceCursor {
            last_seen: now,
            revision: 7,
        };

        let snapshot = source
            .refresh(&SamplerConfig::default(), Some(&cursor), Some(1))
            .unwrap();
        assert_eq!(snapshot.records.len(), 1);
        assert_eq!(snapshot.records[0].id, "a");
        assert_eq!(snapshot.cursor.revision, 1);
        assert_eq!(snapshot.cursor.last_seen, older);
    }

    #[test]
    fn index_permutation_permute_bits_handles_zero_bits_and_zero_seed_path() {
        assert_eq!(IndexPermutation::permute_bits(123, 0, 99), 0);

        let out = IndexPermutation::permute_bits(1, 1, 0);
        assert!(out <= 1);
    }

    #[test]
    fn index_permutation_next_stays_within_total_and_cursor_advances() {
        let mut perm = IndexPermutation::new(3, 7, 0);
        let seen: Vec<usize> = (0..8).map(|_| perm.next()).collect();
        assert!(seen.iter().all(|idx| *idx < 3));
        assert!(perm.cursor() < 3);
    }

    #[test]
    fn indexable_pager_large_refresh_triggers_reporting_branch_and_wraps_cursor() {
        let pager = IndexablePager::new("reporting");
        let cursor = SourceCursor {
            last_seen: Utc::now(),
            revision: 20_000,
        };
        let snapshot = pager
            .refresh_with(10_000, Some(&cursor), Some(4), |idx| {
                Ok(Some(anchor_record(
                    format!("record_{idx}"),
                    "reporting",
                    Utc::now(),
                    Utc::now(),
                    "t",
                )))
            })
            .unwrap();

        assert_eq!(snapshot.records.len(), 4);
        assert!(snapshot.cursor.revision < 10_000);
    }

    #[test]
    fn indexable_pager_refresh_with_propagates_fetch_error() {
        let err = IndexablePager::new("err")
            .refresh_with(8, None, Some(2), |_idx| {
                Err(SamplerError::SourceUnavailable {
                    source_id: "err".to_string(),
                    reason: "fetch failed".to_string(),
                })
            })
            .unwrap_err();
        assert!(matches!(
            err,
            SamplerError::SourceUnavailable { ref reason, .. } if reason.contains("fetch failed")
        ));
    }

    #[test]
    fn seed_for_sampler_depends_on_sampler_seed() {
        let source_id = "seeded".to_string();
        let base = IndexablePager::seed_for(&source_id, 17);
        let with_a = IndexablePager::seed_for_sampler(&source_id, 17, 1);
        let with_b = IndexablePager::seed_for_sampler(&source_id, 17, 2);
        assert_ne!(with_a, with_b);
        assert_ne!(with_a, base);
    }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc