jzombie / rust-triplets / 22358031159

24 Feb 2026 03:38PM UTC coverage: 92.488% (-0.2%) from 92.675%

Pull Request #7: Add HF source (github / web-flow)
Merge 843abfd29 into 980559192

4634 of 5195 new or added lines in 8 files covered. (89.2%)

1 existing line in 1 file now uncovered.

14073 of 15216 relevant lines covered (92.49%)

2599.58 hits per line

Source File: /src/source/backends/huggingface_source.rs (87.19% covered)
use hf_hub::Repo;
use hf_hub::RepoType;
use hf_hub::api::sync::ApiBuilder;
use parquet::file::reader::{FileReader, SerializedFileReader};
use parquet::record::reader::RowIter;
use rayon::prelude::*;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use std::cmp::Ordering;
use std::collections::hash_map::DefaultHasher;
use std::collections::{BTreeMap, HashMap, VecDeque};
use std::fs;
use std::fs::File;
use std::hash::{Hash, Hasher};
use std::io::{BufRead, BufReader, Read, Seek, SeekFrom, Write};
use std::path::Path;
use std::path::PathBuf;
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;
use std::time::Instant;
use tracing::{info, warn};
use walkdir::WalkDir;

use crate::SamplerError;
use crate::config::{NegativeStrategy, SamplerConfig, Selector, TripletRecipe};
use crate::data::{DataRecord, QualityScore, SectionRole};
use crate::utils::make_section;
use chrono::{DateTime, Utc};

use crate::source::{DataSource, SourceCursor, SourceSnapshot};

const REMOTE_URL_PREFIX: &str = "url::";
/// Extra row-index headroom above currently materialized rows exposed via `len_hint`.
///
/// This is not a file count. It lets sampling look slightly past the local row
/// frontier so lazy remote expansion can continue without jumping to the full
/// global row domain at once.
///
/// Multiplies the sampler ingestion base (`SamplerConfig.ingestion_max_records`)
/// to compute `len_hint` expansion headroom rows.
const REMOTE_EXPANSION_HEADROOM_MULTIPLIER: usize = 4;
/// Number of initial remote shards to materialize when bootstrapping an empty
/// local snapshot before regular lazy expansion.
const REMOTE_BOOTSTRAP_SHARDS: usize = 4;
/// Multiplies the source `refresh` limit passed by `IngestionManager`
/// (`step.unwrap_or(max_records)`) to set this source's internal row-read
/// batch target for each refresh pass.
const HUGGINGFACE_REFRESH_BATCH_MULTIPLIER: usize = 32;
const SHARD_SEQUENCE_STATE_VERSION: u32 = 1;
const SHARD_SEQUENCE_STATE_FILE: &str = "_sequence_state.json";

#[derive(Clone, Debug)]
struct RowTextField {
    name: String,
    text: String,
}

#[derive(Clone, Debug)]
struct RowView {
    row_id: Option<String>,
    timestamp: Option<DateTime<Utc>>,
    text_fields: Vec<RowTextField>,
}

/// Configuration for a bulk Hugging Face row source backed by local snapshot files.
#[derive(Clone, Debug)]
pub struct HuggingFaceRowsConfig {
    /// Stable sampler source id used in record ids and metrics.
    pub source_id: String,
    /// Hugging Face dataset id, e.g. `HuggingFaceFW/fineweb`.
    pub dataset: String,
    /// Dataset config name, e.g. `default`.
    pub config: String,
    /// Split name, e.g. `train`.
    pub split: String,
    /// Local path to a snapshot directory for this split.
    pub snapshot_dir: PathBuf,
    /// File extensions accepted as shard files.
    pub shard_extensions: Vec<String>,
    /// Number of rows between seek checkpoints while indexing a shard.
    pub checkpoint_stride: usize,
    /// Maximum number of rows cached in-memory.
    pub cache_capacity: usize,
    /// Maximum number of decoded parquet row groups cached in-memory.
    pub parquet_row_group_cache_capacity: usize,
    /// Multiplier applied to current refresh `limit` when building a read batch target.
    ///
    /// Effective target is `limit * refresh_batch_multiplier`.
    pub refresh_batch_multiplier: usize,
    /// Multiplier applied to ingestion-sized base records for `len_hint` headroom.
    ///
    /// Effective headroom is `base * remote_expansion_headroom_multiplier`, where
    /// `base` is the active sampler's `ingestion_max_records`, falling back to
    /// `cache_capacity` when no sampler config has been provided.
    pub remote_expansion_headroom_multiplier: usize,
    /// Optional maximum row cap exposed by the source.
    pub max_rows: Option<usize>,
    /// Hard cap for local manifest-shard cache bytes.
    ///
    /// When exceeded, oldest cached manifest shards are evicted.
    pub local_disk_cap_bytes: Option<u64>,
    /// Minimum number of manifest shards to keep resident during eviction.
    pub min_resident_shards: usize,
    /// Optional row id column name. Falls back to synthetic id when missing.
    pub id_column: Option<String>,
    /// Text columns to extract. Empty means auto-detect textual scalar columns.
    pub text_columns: Vec<String>,
    /// Optional column used for anchor text.
    ///
    /// When set (or when `positive_column`/`context_columns` are set), role-based
    /// extraction is used instead of `text_columns`/auto-detect mode.
    pub anchor_column: Option<String>,
    /// Optional column used for positive text.
    ///
    /// Positive text is emitted as a `SectionRole::Context` section.
    pub positive_column: Option<String>,
    /// Optional ordered context columns.
    ///
    /// Used only in role-based extraction mode.
    pub context_columns: Vec<String>,
}

impl HuggingFaceRowsConfig {
    /// Create a config with required dataset identity values and local snapshot path.
    pub fn new(
        source_id: impl Into<String>,
        dataset: impl Into<String>,
        config: impl Into<String>,
        split: impl Into<String>,
        snapshot_dir: impl Into<PathBuf>,
    ) -> Self {
        Self {
            source_id: source_id.into(),
            dataset: dataset.into(),
            config: config.into(),
            split: split.into(),
            snapshot_dir: snapshot_dir.into(),
            shard_extensions: vec![
                "parquet".to_string(),
                "jsonl".to_string(),
                "ndjson".to_string(),
            ],
            checkpoint_stride: 4096,
            cache_capacity: SamplerConfig::default().ingestion_max_records,
            parquet_row_group_cache_capacity: 8,
            refresh_batch_multiplier: HUGGINGFACE_REFRESH_BATCH_MULTIPLIER,
            remote_expansion_headroom_multiplier: REMOTE_EXPANSION_HEADROOM_MULTIPLIER,
            max_rows: None,
            local_disk_cap_bytes: Some(32 * 1024 * 1024 * 1024),
            min_resident_shards: REMOTE_BOOTSTRAP_SHARDS,
            id_column: Some("id".to_string()),
            text_columns: Vec::new(),
            anchor_column: None,
            positive_column: None,
            context_columns: Vec::new(),
        }
    }
}

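// Usage sketch (hedged; the source id and snapshot path below are illustrative,
// while the asserted defaults are the ones set by `new` above):
//
//     let cfg = HuggingFaceRowsConfig::new(
//         "hf_fineweb",                // sampler source id
//         "HuggingFaceFW/fineweb",     // dataset id
//         "default",                   // config name
//         "train",                     // split
//         "/tmp/fineweb_snapshot",     // local snapshot dir
//     );
//     assert_eq!(cfg.shard_extensions, ["parquet", "jsonl", "ndjson"]);
//     assert_eq!(cfg.checkpoint_stride, 4096);
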
#[derive(Default)]
struct ParquetCache {
    readers: HashMap<PathBuf, Arc<SerializedFileReader<File>>>,
}

impl ParquetCache {
    /// Return a cached parquet reader for `path`, opening and caching it when missing.
    fn reader_for(
        &mut self,
        source_id: &str,
        path: &Path,
    ) -> Result<Arc<SerializedFileReader<File>>, SamplerError> {
        if let Some(reader) = self.readers.get(path) {
            return Ok(reader.clone());
        }

        let file = File::open(path).map_err(|err| SamplerError::SourceUnavailable {
            source_id: source_id.to_string(),
            reason: format!("failed opening parquet shard {}: {err}", path.display()),
        })?;
        let reader =
            SerializedFileReader::new(file).map_err(|err| SamplerError::SourceUnavailable {
                source_id: source_id.to_string(),
                reason: format!("failed reading parquet shard {}: {err}", path.display()),
            })?;
        let reader = Arc::new(reader);
        self.readers.insert(path.to_path_buf(), reader.clone());
        Ok(reader)
    }
}

#[derive(Clone, Debug)]
struct ShardIndex {
    path: PathBuf,
    global_start: usize,
    row_count: usize,
    is_parquet: bool,
    parquet_row_groups: Vec<(usize, usize)>,
    checkpoints: Vec<u64>,
}

#[derive(Default)]
struct RowCache {
    rows: HashMap<usize, RowView>,
    order: VecDeque<usize>,
}

impl RowCache {
    /// Return a cloned cached row by absolute index.
    fn get(&self, idx: usize) -> Option<RowView> {
        self.rows.get(&idx).cloned()
    }

    /// Insert or refresh a cached row and evict oldest entries over `capacity`.
    fn insert(&mut self, idx: usize, row: RowView, capacity: usize) {
        if capacity == 0 {
            return;
        }
        if !self.rows.contains_key(&idx) {
            self.order.push_back(idx);
        }
        self.rows.insert(idx, row);
        while self.rows.len() > capacity {
            if let Some(old) = self.order.pop_front() {
                self.rows.remove(&old);
            } else {
                break;
            }
        }
    }
}

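// Behavior note (illustrative indices): the eviction above is FIFO by first
// insertion, not LRU. With capacity = 2, inserting rows 10, 11, 12 evicts
// row 10; re-inserting an existing index refreshes `rows` without re-queuing
// it in `order`.
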
/// Bulk-oriented Hugging Face source backed by local shard files.
pub struct HuggingFaceRowSource {
    config: HuggingFaceRowsConfig,
    sampler_config: Mutex<Option<SamplerConfig>>,
    state: Mutex<SourceState>,
    cache: Mutex<RowCache>,
    parquet_cache: Mutex<ParquetCache>,
}

#[derive(Debug)]
struct SourceState {
    materialized_rows: usize,
    total_rows: Option<usize>,
    shards: Vec<ShardIndex>,
    remote_candidates: Option<Vec<String>>,
    remote_candidate_sizes: HashMap<String, u64>,
    next_remote_idx: usize,
}

type ParquetGroupKey = (PathBuf, usize);
type ParquetGroupRequest = (usize, usize, ShardIndex);

#[derive(Clone, Debug, Serialize, Deserialize)]
struct PersistedShardSequence {
    version: u32,
    source_id: String,
    dataset: String,
    config: String,
    split: String,
    sampler_seed: u64,
    candidates: Vec<String>,
    candidate_sizes: HashMap<String, u64>,
    next_remote_idx: usize,
}

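// Illustrative on-disk shape of `PersistedShardSequence` (field values are
// hypothetical; `persist_shard_sequence_locked` below writes it with
// `serde_json::to_vec_pretty`):
//
//     {
//       "version": 1,
//       "source_id": "hf_fineweb",
//       "dataset": "HuggingFaceFW/fineweb",
//       "config": "default",
//       "split": "train",
//       "sampler_seed": 42,
//       "candidates": ["url::https://huggingface.co/.../train-00000.parquet"],
//       "candidate_sizes": {"url::https://huggingface.co/.../train-00000.parquet": 1073741824},
//       "next_remote_idx": 1
//     }
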
impl HuggingFaceRowSource {
    /// Build a new source by indexing local shard files.
    pub fn new(config: HuggingFaceRowsConfig) -> Result<Self, SamplerError> {
        let start_new = Instant::now();
        if config.checkpoint_stride == 0 {
            return Err(SamplerError::Configuration(
                "huggingface source checkpoint_stride must be > 0".to_string(),
            ));
        }

        fs::create_dir_all(&config.snapshot_dir).map_err(|err| {
            SamplerError::SourceUnavailable {
                source_id: config.source_id.clone(),
                reason: format!(
                    "failed creating snapshot_dir {}: {err}",
                    config.snapshot_dir.display()
                ),
            }
        })?;

        info!(
            "[triplets:hf] indexing local shards in {}",
            config.snapshot_dir.display()
        );
        let (shards, discovered) = Self::build_shard_index(&config).unwrap_or_default();
        if discovered == 0 {
            info!(
                "[triplets:hf] no local shards found in {} — lazy remote download enabled",
                config.snapshot_dir.display()
            );
        }

        let materialized_rows = config
            .max_rows
            .map(|cap| cap.min(discovered))
            .unwrap_or(discovered);
        let total_rows = match Self::fetch_global_row_count(&config) {
            Ok(value) => value,
            Err(err) => {
                warn!(
                    "[triplets:hf] global row count request failed; continuing with discovered rows only: {}",
                    err
                );
                None
            }
        };

        if let Some(global_total) = total_rows {
            info!(
                "[triplets:hf] global split row count reported: {} (known_local_rows={})",
                global_total, materialized_rows
            );
        }

        info!(
            "[triplets:hf] source ready in {:.2}s (rows={}, shards={})",
            start_new.elapsed().as_secs_f64(),
            materialized_rows,
            shards.len()
        );

        Ok(Self {
            config,
            sampler_config: Mutex::new(None),
            state: Mutex::new(SourceState {
                materialized_rows,
                total_rows,
                shards,
                remote_candidates: None,
                remote_candidate_sizes: HashMap::new(),
                next_remote_idx: 0,
            }),
            cache: Mutex::new(RowCache::default()),
            parquet_cache: Mutex::new(ParquetCache::default()),
        })
    }

    fn set_active_sampler_config(&self, config: &SamplerConfig) {
        if let Ok(mut slot) = self.sampler_config.lock() {
            *slot = Some(config.clone());
        }
    }

    #[cfg(test)]
    fn active_or_default_sampler_config(&self) -> SamplerConfig {
        self.sampler_config
            .lock()
            .ok()
            .and_then(|slot| slot.clone())
            .unwrap_or_default()
    }

    #[cfg(test)]
    fn configure_sampler(&self, config: &SamplerConfig) {
        self.set_active_sampler_config(config);
    }

    #[cfg(test)]
    fn refresh(
        &self,
        cursor: Option<&SourceCursor>,
        limit: Option<usize>,
    ) -> Result<SourceSnapshot, SamplerError> {
        let config = self.active_or_default_sampler_config();
        <Self as DataSource>::refresh(self, &config, cursor, limit)
    }

    #[cfg(test)]
    fn reported_record_count(&self) -> Result<u128, SamplerError> {
        let config = self.active_or_default_sampler_config();
        <Self as DataSource>::reported_record_count(self, &config)
    }

    /// Compute the effective internal row read target from refresh `limit`.
    fn effective_refresh_batch_target(&self, limit: usize) -> usize {
        let multiplier = self.config.refresh_batch_multiplier.max(1);
        limit.saturating_mul(multiplier)
    }

    /// Compute dynamic `len_hint` headroom rows based on sampler and source config.
    fn effective_expansion_headroom_rows(&self) -> usize {
        let multiplier = self.config.remote_expansion_headroom_multiplier.max(1);
        let base = self
            .sampler_config
            .lock()
            .ok()
            .and_then(|config| config.as_ref().map(|value| value.ingestion_max_records))
            .unwrap_or(self.config.cache_capacity)
            .max(1);
        base.saturating_mul(multiplier)
    }

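    // Arithmetic sketch (hedged; the inputs are illustrative): with the
    // default refresh_batch_multiplier of 32, a refresh limit of 100 rows
    // yields an internal read target of 3_200 rows. With the default
    // remote_expansion_headroom_multiplier of 4 and an ingestion_max_records
    // of 1_000, len_hint headroom is 4_000 rows past the local frontier.
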
    fn configured_sampler_seed(&self) -> Result<u64, SamplerError> {
        self.sampler_config
            .lock()
            .map_err(|_| SamplerError::SourceUnavailable {
                source_id: self.config.source_id.clone(),
                reason: "huggingface sampler-config lock poisoned".to_string(),
            })?
            .as_ref()
            .map(|config| config.seed)
            .ok_or_else(|| SamplerError::SourceInconsistent {
                source_id: self.config.source_id.clone(),
                details: "huggingface source sampler configuration not provided".to_string(),
            })
    }

    fn paging_seed(&self, total: usize) -> Result<u64, SamplerError> {
        let sampler_seed = self.configured_sampler_seed()?;
        Ok(crate::source::IndexablePager::seed_for_sampler(
            &self.config.source_id,
            total,
            sampler_seed,
        ))
    }

    fn normalized_shard_extensions(config: &HuggingFaceRowsConfig) -> Vec<String> {
        config
            .shard_extensions
            .iter()
            .map(|value| value.trim().trim_start_matches('.').to_ascii_lowercase())
            .collect::<Vec<_>>()
    }

    fn collect_candidates_from_siblings(
        config: &HuggingFaceRowsConfig,
        siblings: &[String],
        accepted: &[String],
        respect_split: bool,
    ) -> (Vec<String>, bool) {
        let mut saw_parquet = false;
        let mut candidates = Vec::new();
        for remote_path in siblings {
            if respect_split && !config.split.is_empty() {
                let split_tag = format!("{}/", config.split);
                let split_token = format!("-{}-", config.split);
                let split_prefix = format!("{}-", config.split);
                if !remote_path.contains(&split_tag)
                    && !remote_path.contains(&split_token)
                    && !Path::new(remote_path)
                        .file_name()
                        .and_then(|name| name.to_str())
                        .is_some_and(|name| name.starts_with(&split_prefix))
                {
                    continue;
                }
            }

            let ext = Path::new(remote_path)
                .extension()
                .and_then(|v| v.to_str())
                .map(|v| v.to_ascii_lowercase());
            if ext.as_deref() == Some("parquet") {
                saw_parquet = true;
            }
            if ext
                .as_deref()
                .is_some_and(|ext| accepted.iter().any(|allowed| allowed == ext))
            {
                let target = Self::candidate_target_path(config, remote_path);
                if target.exists() {
                    continue;
                }
                candidates.push(remote_path.clone());
            }
        }
        (candidates, saw_parquet)
    }

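    // Split-filter sketch (paths are hypothetical): for split = "train", a
    // sibling passes when it contains "train/" or "-train-", or its file name
    // starts with "train-", e.g. "data/train/part0.jsonl",
    // "chunk-train-0.parquet", or "train-00000-of-00042.parquet".
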
    fn resolve_remote_candidates_from_siblings(
        config: &HuggingFaceRowsConfig,
        siblings: &[String],
        accepted: &[String],
    ) -> Result<(Vec<String>, HashMap<String, u64>), SamplerError> {
        let (mut candidates, mut saw_parquet) =
            Self::collect_candidates_from_siblings(config, siblings, accepted, true);
        if candidates.is_empty() && !config.split.is_empty() {
            let (fallback_candidates, fallback_saw_parquet) =
                Self::collect_candidates_from_siblings(config, siblings, accepted, false);
            if !fallback_candidates.is_empty() {
                warn!(
                    "[triplets:hf] split filter '{}' matched no remote files; falling back to extension-only remote candidate scan",
                    config.split
                );
                candidates = fallback_candidates;
                saw_parquet = fallback_saw_parquet;
            }
        }

        candidates.sort();
        info!(
            "[triplets:hf] remote candidates matching {:?}: {}",
            config.shard_extensions,
            candidates.len()
        );
        if candidates.is_empty() {
            if saw_parquet {
                return Err(SamplerError::SourceUnavailable {
                    source_id: config.source_id.clone(),
                    reason: format!(
                        "dataset '{}' appears to be parquet-only, but shard_extensions does not include parquet ({:?}).",
                        config.dataset, config.shard_extensions
                    ),
                });
            }
            warn!(
                "[triplets:hf] no remote candidates found for dataset='{}' split='{}' extensions={:?}; source will be treated as exhausted",
                config.dataset, config.split, config.shard_extensions
            );
            return Ok((Vec::new(), HashMap::new()));
        }

        Ok((candidates, HashMap::new()))
    }

    fn candidates_from_parquet_manifest_json(
        config: &HuggingFaceRowsConfig,
        json: &Value,
    ) -> Result<(Vec<String>, HashMap<String, u64>), SamplerError> {
        let accepted = Self::normalized_shard_extensions(config);

        let mut candidates = Vec::new();
        let mut candidate_sizes = HashMap::new();
        if let Some(entries) = json.get("parquet_files").and_then(Value::as_array) {
            for entry in entries {
                let Some(url) = entry.get("url").and_then(Value::as_str) else {
                    continue;
                };

                let ext = Path::new(url)
                    .extension()
                    .and_then(|value| value.to_str())
                    .map(|value| value.to_ascii_lowercase());
                if !ext
                    .as_deref()
                    .is_some_and(|value| accepted.iter().any(|allowed| allowed == value))
                {
                    continue;
                }

                let candidate = format!("{REMOTE_URL_PREFIX}{url}");
                let expected_size = entry.get("size").and_then(Value::as_u64);
                let target = Self::candidate_target_path(config, &candidate);
                if target.exists() {
                    if Self::target_matches_expected_size(&target, expected_size) {
                        continue;
                    }
                    warn!(
                        "[triplets:hf] incomplete cached shard detected (will redownload): {}",
                        target.display()
                    );
                    if let Err(err) = fs::remove_file(&target)
                        && err.kind() != std::io::ErrorKind::NotFound
                    {
                        return Err(SamplerError::SourceUnavailable {
                            source_id: config.source_id.clone(),
                            reason: format!(
                                "failed removing incomplete shard {}: {err}",
                                target.display()
                            ),
                        });
                    }
                }
                if let Some(size) = expected_size {
                    candidate_sizes.insert(candidate.clone(), size);
                }
                candidates.push(candidate);
            }
        }

        candidates.sort();
        Ok((candidates, candidate_sizes))
    }

    /// Resolve and filter remote shard candidates from manifest or repository listing.
    fn list_remote_candidates(
        config: &HuggingFaceRowsConfig,
    ) -> Result<(Vec<String>, HashMap<String, u64>), SamplerError> {
        if let Ok((candidates, candidate_sizes)) =
            Self::list_remote_candidates_from_parquet_manifest(config)
            && !candidates.is_empty()
        {
            info!(
                "[triplets:hf] remote parquet manifest candidates matching {:?}: {}",
                config.shard_extensions,
                candidates.len()
            );
            return Ok((candidates, candidate_sizes));
        }

        let api = ApiBuilder::new()
            .with_progress(true)
            .with_retries(5)
            .with_token(None)
            .build()
            .map_err(|err| SamplerError::SourceUnavailable {
                source_id: config.source_id.clone(),
                reason: format!("failed building hf-hub client: {err}"),
            })?;

        let repo = Repo::new(config.dataset.clone(), RepoType::Dataset);
        let repo_api = api.repo(repo);
        info!(
            "[triplets:hf] reading remote file list for dataset {}",
            config.dataset
        );
        let info = repo_api
            .info()
            .map_err(|err| SamplerError::SourceUnavailable {
                source_id: config.source_id.clone(),
                reason: format!("failed reading hf-hub repository info: {err}"),
            })?;

        let accepted = Self::normalized_shard_extensions(config);

        let siblings = info
            .siblings
            .into_iter()
            .map(|entry| entry.rfilename)
            .collect::<Vec<_>>();

        Self::resolve_remote_candidates_from_siblings(config, &siblings, &accepted)
    }

    /// Return the persistence file path for shard sequence state.
    fn shard_sequence_state_path(config: &HuggingFaceRowsConfig) -> PathBuf {
        config
            .snapshot_dir
            .join("_parquet_manifest")
            .join(SHARD_SEQUENCE_STATE_FILE)
    }

    /// Load persisted shard candidate sequence when metadata and sampler seed match.
    #[cfg(test)]
    fn load_persisted_shard_sequence(
        config: &HuggingFaceRowsConfig,
        current_sampler_seed: u64,
    ) -> Result<Option<PersistedShardSequence>, SamplerError> {
        let path = Self::shard_sequence_state_path(config);
        if !path.exists() {
            return Ok(None);
        }

        let raw = fs::read_to_string(&path).map_err(|err| SamplerError::SourceUnavailable {
            source_id: config.source_id.clone(),
            reason: format!(
                "failed reading shard-sequence state {}: {err}",
                path.display()
            ),
        })?;

        let mut persisted: PersistedShardSequence =
            serde_json::from_str(&raw).map_err(|err| SamplerError::SourceUnavailable {
                source_id: config.source_id.clone(),
                reason: format!(
                    "failed parsing shard-sequence state {}: {err}",
                    path.display()
                ),
            })?;

        if persisted.version != SHARD_SEQUENCE_STATE_VERSION
            || persisted.source_id != config.source_id
            || persisted.dataset != config.dataset
            || persisted.config != config.config
            || persisted.split != config.split
            || persisted.sampler_seed != current_sampler_seed
        {
            warn!(
                "[triplets:hf] shard-sequence state mismatch for {}; rebuilding candidate order",
                path.display()
            );
            return Ok(None);
        }

        if persisted.next_remote_idx > persisted.candidates.len() {
            persisted.next_remote_idx = persisted.candidates.len();
        }

        Ok(Some(persisted))
    }

    /// Persist current shard candidate sequence and position atomically.
    fn persist_shard_sequence_locked(&self, state: &SourceState) -> Result<(), SamplerError> {
        let Some(candidates) = state.remote_candidates.as_ref() else {
            return Ok(());
        };

        let path = Self::shard_sequence_state_path(&self.config);
        if let Some(parent) = path.parent() {
            fs::create_dir_all(parent).map_err(|err| SamplerError::SourceUnavailable {
                source_id: self.config.source_id.clone(),
                reason: format!(
                    "failed creating shard-sequence state dir {}: {err}",
                    parent.display()
                ),
            })?;
        }

        let persisted = PersistedShardSequence {
            version: SHARD_SEQUENCE_STATE_VERSION,
            source_id: self.config.source_id.clone(),
            dataset: self.config.dataset.clone(),
            config: self.config.config.clone(),
            split: self.config.split.clone(),
            sampler_seed: self.configured_sampler_seed()?,
            candidates: candidates.clone(),
            candidate_sizes: state.remote_candidate_sizes.clone(),
            next_remote_idx: state.next_remote_idx.min(candidates.len()),
        };

        let raw = serde_json::to_vec_pretty(&persisted).map_err(|err| {
            SamplerError::SourceUnavailable {
                source_id: self.config.source_id.clone(),
                reason: format!(
                    "failed encoding shard-sequence state {}: {err}",
                    path.display()
                ),
            }
        })?;

        let tmp_path = path.with_extension("tmp");
        fs::write(&tmp_path, raw).map_err(|err| SamplerError::SourceUnavailable {
            source_id: self.config.source_id.clone(),
            reason: format!(
                "failed writing shard-sequence state temp {}: {err}",
                tmp_path.display()
            ),
        })?;
        fs::rename(&tmp_path, &path).map_err(|err| SamplerError::SourceUnavailable {
            source_id: self.config.source_id.clone(),
            reason: format!(
                "failed replacing shard-sequence state {}: {err}",
                path.display()
            ),
        })?;

        Ok(())
    }

    /// Rotate candidate ordering deterministically using source identity.
    fn rotate_candidates_deterministically(
        config: &HuggingFaceRowsConfig,
        candidates: &mut [String],
    ) {
        if candidates.len() <= 1 {
            return;
        }
        let mut hasher = DefaultHasher::new();
        config.source_id.hash(&mut hasher);
        config.dataset.hash(&mut hasher);
        config.config.hash(&mut hasher);
        config.split.hash(&mut hasher);
        let offset = (hasher.finish() as usize) % candidates.len();
        candidates.rotate_left(offset);
    }

    /// Build deterministic seed used to permute remote shard candidate order.
    fn shard_candidate_seed(
        config: &HuggingFaceRowsConfig,
        total_candidates: usize,
        sampler_seed: u64,
    ) -> u64 {
        let mut hasher = DefaultHasher::new();
        "hf_shard_candidate_sequence_v1".hash(&mut hasher);
        sampler_seed.hash(&mut hasher);
        config.source_id.hash(&mut hasher);
        config.dataset.hash(&mut hasher);
        config.config.hash(&mut hasher);
        config.split.hash(&mut hasher);
        total_candidates.hash(&mut hasher);
        hasher.finish()
    }

    /// Query datasets-server parquet manifest and derive shard candidates.
    fn list_remote_candidates_from_parquet_manifest(
        config: &HuggingFaceRowsConfig,
    ) -> Result<(Vec<String>, HashMap<String, u64>), SamplerError> {
        let endpoint = "https://datasets-server.huggingface.co/parquet";
        info!(
            "[triplets:hf] reading datasets-server parquet manifest for dataset {}",
            config.dataset
        );
        let response = ureq::get(endpoint)
            .query("dataset", &config.dataset)
            .query("config", &config.config)
            .query("split", &config.split)
            .call()
            .map_err(|err| SamplerError::SourceUnavailable {
                source_id: config.source_id.clone(),
                reason: format!("failed querying datasets-server parquet endpoint: {err}"),
            })?;

        let body = response.into_body().read_to_string().map_err(|err| {
            SamplerError::SourceUnavailable {
                source_id: config.source_id.clone(),
                reason: format!("failed reading datasets-server parquet response body: {err}"),
            }
        })?;

        Self::parse_parquet_manifest_response(config, &body)
    }

    fn parse_parquet_manifest_response(
        config: &HuggingFaceRowsConfig,
        body: &str,
    ) -> Result<(Vec<String>, HashMap<String, u64>), SamplerError> {
        let json: Value =
            serde_json::from_str(body).map_err(|err| SamplerError::SourceUnavailable {
                source_id: config.source_id.clone(),
                reason: format!("failed parsing datasets-server parquet response: {err}"),
            })?;

        Self::candidates_from_parquet_manifest_json(config, &json)
    }

    /// Map a candidate identifier to the local snapshot target path.
    fn candidate_target_path(config: &HuggingFaceRowsConfig, candidate: &str) -> PathBuf {
        if let Some(url) = candidate.strip_prefix(REMOTE_URL_PREFIX) {
            let suffix = url
                .split("/resolve/")
                .nth(1)
                .map(|value| value.trim_start_matches('/'))
                .filter(|value| !value.is_empty())
                .unwrap_or("parquet/unknown.parquet");
            return config.snapshot_dir.join("_parquet_manifest").join(suffix);
        }
        config.snapshot_dir.join(candidate)
    }

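    // Path-mapping sketch (the URL is hypothetical):
    //   "url::https://huggingface.co/datasets/x/resolve/main/data/file.parquet"
    //     -> <snapshot_dir>/_parquet_manifest/main/data/file.parquet
    // Plain (non-"url::") candidates map to <snapshot_dir>/<candidate>.
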
    /// Validate target file size against expected bytes when available.
    fn target_matches_expected_size(path: &Path, expected_bytes: Option<u64>) -> bool {
        if !path.exists() {
            return false;
        }
        if let Some(expected) = expected_bytes
            && expected > 0
        {
            return fs::metadata(path)
                .map(|meta| meta.len() == expected)
                .unwrap_or(false);
        }
        true
    }

    /// Return root directory used for manifest-cached remote shards.
    fn manifest_cache_root(&self) -> PathBuf {
        self.config.snapshot_dir.join("_parquet_manifest")
    }

    /// Return on-disk size for a shard path, or 0 if metadata lookup fails.
    fn shard_size_bytes(path: &Path) -> u64 {
        fs::metadata(path).map(|meta| meta.len()).unwrap_or(0)
    }

    /// Recompute shard `global_start` offsets and total materialized row count.
    fn recompute_shard_offsets(state: &mut SourceState) {
        let mut running = 0usize;
        for shard in &mut state.shards {
            shard.global_start = running;
            running = running.saturating_add(shard.row_count);
        }
        state.materialized_rows = running;
    }

    /// Enforce local disk cap by evicting oldest manifest shards when possible.
    fn enforce_disk_cap_locked(
        &self,
        state: &mut SourceState,
        protected_path: &Path,
    ) -> Result<bool, SamplerError> {
        let Some(cap_bytes) = self.config.local_disk_cap_bytes else {
            return Ok(false);
        };

        let manifest_root = self.manifest_cache_root();
        let mut usage_bytes = state
            .shards
            .iter()
            .filter(|shard| shard.path.starts_with(&manifest_root))
            .map(|shard| Self::shard_size_bytes(&shard.path))
            .sum::<u64>();

        if usage_bytes <= cap_bytes {
            return Ok(false);
        }

        let mut evicted_any = false;
        loop {
            if usage_bytes <= cap_bytes {
                break;
            }

            let resident_manifest_count = state
                .shards
                .iter()
                .filter(|shard| shard.path.starts_with(&manifest_root))
                .count();
            if resident_manifest_count <= self.config.min_resident_shards {
                break;
            }

            let evict_pos = state.shards.iter().position(|shard| {
                shard.path.starts_with(&manifest_root) && shard.path != protected_path
            });
            let Some(pos) = evict_pos else {
                break;
            };

            let shard = state.shards.remove(pos);
            let shard_size = Self::shard_size_bytes(&shard.path);
            if let Err(err) = fs::remove_file(&shard.path)
                && err.kind() != std::io::ErrorKind::NotFound
            {
                return Err(SamplerError::SourceUnavailable {
                    source_id: self.config.source_id.clone(),
                    reason: format!(
                        "failed evicting shard {} under disk cap: {err}",
                        shard.path.display()
                    ),
                });
            }

            usage_bytes = usage_bytes.saturating_sub(shard_size);
            evicted_any = true;
            warn!(
                "[triplets:hf] evicted shard for disk cap: {} (usage={:.2} GiB cap={:.2} GiB)",
                shard.path.display(),
                usage_bytes as f64 / (1024.0 * 1024.0 * 1024.0),
                cap_bytes as f64 / (1024.0 * 1024.0 * 1024.0)
            );
        }

        if usage_bytes > cap_bytes {
            if protected_path.exists() {
                let _ = fs::remove_file(protected_path);
            }
            return Err(SamplerError::SourceUnavailable {
                source_id: self.config.source_id.clone(),
                reason: format!(
                    "local disk cap exceeded and cannot evict further (usage={} bytes cap={} bytes)",
                    usage_bytes, cap_bytes
                ),
            });
        }

        if evicted_any {
            Self::recompute_shard_offsets(state);
        }
        Ok(evicted_any)
    }

    /// Return total on-disk bytes used by manifest-backed shards.
    fn manifest_usage_bytes_locked(&self, state: &SourceState) -> u64 {
        let manifest_root = self.manifest_cache_root();
        state
            .shards
            .iter()
            .filter(|shard| shard.path.starts_with(&manifest_root))
            .map(|shard| Self::shard_size_bytes(&shard.path))
            .sum::<u64>()
    }

    /// Fetch exact split row count metadata from datasets-server size endpoint.
    fn fetch_global_row_count(
        config: &HuggingFaceRowsConfig,
    ) -> Result<Option<usize>, SamplerError> {
        let endpoint = "https://datasets-server.huggingface.co/size";
        info!(
            "[triplets:hf] requesting global row count dataset='{}' config='{}' split='{}'",
            config.dataset, config.config, config.split
        );

        let response = ureq::get(endpoint)
            .query("dataset", &config.dataset)
            .query("config", &config.config)
            .query("split", &config.split)
            .call()
            .map_err(|err| SamplerError::SourceUnavailable {
                source_id: config.source_id.clone(),
                reason: format!("failed querying datasets-server size endpoint: {err}"),
            })?;

        let body = response.into_body().read_to_string().map_err(|err| {
            SamplerError::SourceUnavailable {
                source_id: config.source_id.clone(),
                reason: format!("failed reading datasets-server size response body: {err}"),
            }
        })?;

        Self::parse_global_row_count_response(config, &body)
    }

    fn parse_global_row_count_response(
        config: &HuggingFaceRowsConfig,
        body: &str,
    ) -> Result<Option<usize>, SamplerError> {
        let json: Value =
            serde_json::from_str(body).map_err(|err| SamplerError::SourceUnavailable {
                source_id: config.source_id.clone(),
                reason: format!("failed parsing datasets-server size response: {err}"),
            })?;

        let mut count =
            Self::extract_split_row_count_from_size_response(&json, &config.config, &config.split);
        if let (Some(max_rows), Some(rows)) = (config.max_rows, count) {
            count = Some(rows.min(max_rows));
        }
        Ok(count)
    }

    /// Extract split row count from datasets-server size payload variants.
    fn extract_split_row_count_from_size_response(
        json: &Value,
        config_name: &str,
        split_name: &str,
    ) -> Option<usize> {
        let to_usize = |value: &Value| value.as_u64().and_then(|raw| usize::try_from(raw).ok());

        let size = json.get("size")?;

        if let Some(splits) = size.get("splits").and_then(Value::as_array) {
            for entry in splits {
                let entry_config = entry
                    .get("config")
                    .or_else(|| entry.get("config_name"))
                    .and_then(Value::as_str)
                    .unwrap_or_default();
                let entry_split = entry
                    .get("split")
                    .or_else(|| entry.get("name"))
                    .and_then(Value::as_str)
                    .unwrap_or_default();
                if entry_config == config_name
                    && entry_split == split_name
                    && let Some(rows) = entry.get("num_rows").and_then(to_usize)
                {
                    return Some(rows);
                }
            }
        }

        if let Some(configs) = size.get("configs").and_then(Value::as_array) {
            for config_entry in configs {
                let entry_config = config_entry
                    .get("config")
                    .or_else(|| config_entry.get("config_name"))
                    .and_then(Value::as_str)
                    .unwrap_or_default();
                if entry_config != config_name {
                    continue;
                }

                if let Some(splits) = config_entry.get("splits").and_then(Value::as_array) {
                    for split_entry in splits {
                        let entry_split = split_entry
                            .get("split")
                            .or_else(|| split_entry.get("name"))
                            .and_then(Value::as_str)
                            .unwrap_or_default();
                        if entry_split == split_name
                            && let Some(rows) = split_entry.get("num_rows").and_then(to_usize)
                        {
                            return Some(rows);
                        }
                    }
                }

                if split_name.is_empty()
                    && let Some(rows) = config_entry.get("num_rows").and_then(to_usize)
                {
                    return Some(rows);
                }
            }
        }

        if split_name.is_empty() {
            return size
                .get("dataset")
                .and_then(|dataset| dataset.get("num_rows"))
                .and_then(to_usize);
        }

        None
    }

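    // Accepted /size payload shapes (illustrative; row counts hypothetical):
    //   {"size": {"splits": [{"config": "default", "split": "train", "num_rows": 123}]}}
    //   {"size": {"configs": [{"config": "default",
    //                          "splits": [{"split": "train", "num_rows": 123}]}]}}
    //   {"size": {"dataset": {"num_rows": 123}}}   // only when split_name is empty
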
    /// Download a shard (URL or hf-hub path) and materialize it under snapshot dir.
    fn download_and_materialize_shard(
        config: &HuggingFaceRowsConfig,
        remote_path: &str,
        expected_bytes: Option<u64>,
    ) -> Result<PathBuf, SamplerError> {
        if let Some(remote_url) = remote_path.strip_prefix(REMOTE_URL_PREFIX) {
            let target = Self::candidate_target_path(config, remote_path);
            if target.exists() {
                if Self::target_matches_expected_size(&target, expected_bytes) {
                    return Ok(target);
                }
                warn!(
                    "[triplets:hf] replacing incomplete shard before retry: {}",
                    target.display()
                );
                fs::remove_file(&target).map_err(|err| SamplerError::SourceUnavailable {
                    source_id: config.source_id.clone(),
                    reason: format!(
                        "failed removing incomplete shard {}: {err}",
                        target.display()
                    ),
                })?;
            }

            if let Some(parent) = target.parent() {
                fs::create_dir_all(parent).map_err(|err| SamplerError::SourceUnavailable {
                    source_id: config.source_id.clone(),
                    reason: format!(
                        "failed creating snapshot subdir {}: {err}",
                        parent.display()
                    ),
                })?;
            }

            let temp_target = target.with_extension("part");
            if temp_target.exists() {
                let _ = fs::remove_file(&temp_target);
            }

            let response =
                ureq::get(remote_url)
                    .call()
                    .map_err(|err| SamplerError::SourceUnavailable {
                        source_id: config.source_id.clone(),
                        reason: format!("failed downloading shard URL '{}': {err}", remote_url),
                    })?;
            let mut reader = response.into_body().into_reader();
            let mut file =
                File::create(&temp_target).map_err(|err| SamplerError::SourceUnavailable {
                    source_id: config.source_id.clone(),
                    reason: format!(
                        "failed creating target shard {}: {err}",
                        temp_target.display()
                    ),
                })?;
            info!(
                "[triplets:hf] downloading shard payload -> {}",
                target.display()
            );
            let started = Instant::now();
            let mut total_bytes = 0u64;
            let mut buffer = vec![0u8; 8 * 1024 * 1024];
            let mut last_report = Instant::now();
            loop {
                let read =
                    reader
                        .read(&mut buffer)
                        .map_err(|err| SamplerError::SourceUnavailable {
                            source_id: config.source_id.clone(),
                            reason: format!("failed reading shard stream '{}': {err}", remote_url),
                        })?;
                if read == 0 {
                    break;
                }
                file.write_all(&buffer[..read])
                    .map_err(|err| SamplerError::SourceUnavailable {
                        source_id: config.source_id.clone(),
                        reason: format!(
                            "failed writing target shard {}: {err}",
                            temp_target.display()
                        ),
                    })?;
                total_bytes = total_bytes.saturating_add(read as u64);
                if last_report.elapsed() >= Duration::from_secs(2) {
                    let elapsed = started.elapsed().as_secs_f64();
                    if let Some(expected) = expected_bytes
                        && expected > 0
                    {
                        let pct =
                            ((total_bytes as f64 / expected as f64) * 100.0).clamp(0.0, 100.0);
                        let rate = if elapsed > 0.0 {
                            total_bytes as f64 / elapsed
                        } else {
                            0.0
                        };
                        let eta_secs = if rate > 0.0 && total_bytes < expected {
                            (expected.saturating_sub(total_bytes) as f64) / rate
                        } else {
                            0.0
                        };
                        info!(
                            "[triplets:hf] download progress {}: {:.1}/{:.1} MiB ({:.1}%, {:.1}s elapsed, ETA {:.1}s)",
                            target.display(),
                            total_bytes as f64 / (1024.0 * 1024.0),
                            expected as f64 / (1024.0 * 1024.0),
                            pct,
                            elapsed,
                            eta_secs.max(0.0)
                        );
                    } else {
                        info!(
                            "[triplets:hf] download progress {}: {:.1} MiB ({:.1}s)",
                            target.display(),
                            total_bytes as f64 / (1024.0 * 1024.0),
                            elapsed
                        );
                    }
                    last_report = Instant::now();
                }
            }
            let elapsed = started.elapsed().as_secs_f64();
            if let Some(expected) = expected_bytes
                && expected > 0
            {
                let pct = ((total_bytes as f64 / expected as f64) * 100.0).clamp(0.0, 100.0);
                info!(
                    "[triplets:hf] download complete {}: {:.1}/{:.1} MiB ({:.1}%) in {:.1}s",
                    target.display(),
                    total_bytes as f64 / (1024.0 * 1024.0),
                    expected as f64 / (1024.0 * 1024.0),
                    pct,
                    elapsed
                );
            } else {
                info!(
                    "[triplets:hf] download complete {}: {:.1} MiB in {:.1}s",
                    target.display(),
                    total_bytes as f64 / (1024.0 * 1024.0),
                    elapsed
                );
            }

            fs::rename(&temp_target, &target).map_err(|err| SamplerError::SourceUnavailable {
                source_id: config.source_id.clone(),
                reason: format!(
                    "failed moving downloaded shard {} -> {}: {err}",
                    temp_target.display(),
                    target.display()
                ),
            })?;
            return Ok(target);
        }

        let api = ApiBuilder::new()
            .with_progress(true)
            .with_retries(5)
            .with_token(None)
            .build()
            .map_err(|err| SamplerError::SourceUnavailable {
                source_id: config.source_id.clone(),
                reason: format!("failed building hf-hub client: {err}"),
            })?;

        let repo = Repo::new(config.dataset.clone(), RepoType::Dataset);
        let repo_api = api.repo(repo);

        let mut local_cached =
            repo_api
                .get(remote_path)
                .map_err(|err| SamplerError::SourceUnavailable {
                    source_id: config.source_id.clone(),
                    reason: format!("failed downloading '{}' from hf-hub: {err}", remote_path),
                })?;
        if !local_cached.exists() {
            for _ in 0..5 {
                local_cached = repo_api.download(remote_path).map_err(|err| {
                    SamplerError::SourceUnavailable {
                        source_id: config.source_id.clone(),
                        reason: format!(
                            "hf-hub returned missing cache path for '{}', and forced download failed: {err}",
                            remote_path
                        ),
                    }
                })?;
                if local_cached.exists() {
                    break;
                }
                thread::sleep(Duration::from_millis(400));
            }
        }
        if !local_cached.exists() {
            return Err(SamplerError::SourceUnavailable {
                source_id: config.source_id.clone(),
                reason: format!(
                    "hf-hub returned non-existent cache file for '{}' at {}",
                    remote_path,
                    local_cached.display()
                ),
            });
        }

        let target = Self::candidate_target_path(config, remote_path);
        Self::materialize_local_file(config, &local_cached, &target)?;
        Ok(target)
    }

1297
    /// Build shard metadata for a single local file.
1298
    fn index_single_shard(
28✔
1299
        config: &HuggingFaceRowsConfig,
28✔
1300
        path: &Path,
28✔
1301
        global_start: usize,
28✔
1302
    ) -> Result<Option<ShardIndex>, SamplerError> {
28✔
1303
        let is_parquet = path
28✔
1304
            .extension()
28✔
1305
            .and_then(|v| v.to_str())
28✔
1306
            .is_some_and(|ext| ext.eq_ignore_ascii_case("parquet"));
28✔
1307

1308
        let (rows, parquet_row_groups, checkpoints) = if is_parquet {
28✔
1309
            let (rows, parquet_row_groups) = Self::parquet_row_group_map(config, path)?;
2✔
1310
            (rows, parquet_row_groups, Vec::new())
2✔
1311
        } else {
1312
            let file = File::open(path).map_err(|err| SamplerError::SourceUnavailable {
26✔
NEW
1313
                source_id: config.source_id.clone(),
×
NEW
1314
                reason: format!("failed opening shard {}: {err}", path.display()),
×
NEW
1315
            })?;
×
1316
            let mut reader = BufReader::new(file);
26✔
1317
            let mut checkpoints = Vec::new();
26✔
1318
            let mut line = String::new();
26✔
1319
            let mut offset = 0u64;
26✔
1320
            let mut rows = 0usize;
26✔
1321

1322
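            // Record a byte-offset checkpoint every `checkpoint_stride` rows while
            // counting lines; `read_line_at` later seeks to the nearest checkpoint
            // instead of rescanning the shard from the start.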
            loop {
                if rows.is_multiple_of(config.checkpoint_stride) {
                    checkpoints.push(offset);
                }
                line.clear();
                let bytes =
                    reader
                        .read_line(&mut line)
                        .map_err(|err| SamplerError::SourceUnavailable {
                            source_id: config.source_id.clone(),
                            reason: format!("failed reading shard {}: {err}", path.display()),
                        })?;
                if bytes == 0 {
                    break;
                }
                rows += 1;
                offset = offset.saturating_add(bytes as u64);
            }

            (rows, Vec::new(), checkpoints)
        };

        if rows == 0 {
            return Ok(None);
        }

        Ok(Some(ShardIndex {
            path: path.to_path_buf(),
            global_start,
            row_count: rows,
            is_parquet,
            parquet_row_groups,
            checkpoints,
        }))
    }

    /// Build parquet row-group map for random-access row reads.
    fn parquet_row_group_map(
        config: &HuggingFaceRowsConfig,
        path: &Path,
    ) -> Result<(usize, Vec<(usize, usize)>), SamplerError> {
        let file = File::open(path).map_err(|err| SamplerError::SourceUnavailable {
            source_id: config.source_id.clone(),
            reason: format!("failed opening parquet shard {}: {err}", path.display()),
        })?;
        let reader =
            SerializedFileReader::new(file).map_err(|err| SamplerError::SourceUnavailable {
                source_id: config.source_id.clone(),
                reason: format!("failed reading parquet metadata {}: {err}", path.display()),
            })?;

        let mut row_groups = Vec::new();
        let mut running = 0usize;
        for meta in reader.metadata().row_groups() {
            let group_rows =
                usize::try_from(meta.num_rows()).map_err(|_| SamplerError::SourceUnavailable {
                    source_id: config.source_id.clone(),
                    reason: format!("parquet row group size overflow in {}", path.display()),
                })?;
            if group_rows == 0 {
                continue;
            }
            row_groups.push((running, group_rows));
            running = running.saturating_add(group_rows);
        }
        if running > 0 {
            return Ok((running, row_groups));
        }

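        // Defensive fallback: the per-group metadata yielded no rows, so derive the
        // count from file-level metadata and expose the file as one logical group.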
        let total_rows =
            usize::try_from(reader.metadata().file_metadata().num_rows()).map_err(|_| {
                SamplerError::SourceUnavailable {
                    source_id: config.source_id.clone(),
                    reason: format!("parquet row count overflow in {}", path.display()),
                }
            })?;
        if total_rows == 0 {
            return Ok((0, Vec::new()));
        }
        Ok((total_rows, vec![(0, total_rows)]))
    }

    /// Ensure row index is available, expanding remote shard set lazily if needed.
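    ///
    /// Fast path returns `true` when `idx` is below the materialized-row frontier.
    /// Otherwise the remote candidate list is built once (deterministically
    /// rotated), a few shards are bootstrapped for an empty snapshot, and further
    /// shards are downloaded until `idx` is covered, `max_rows` is hit, or the
    /// candidates run out.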
    fn ensure_row_available(&self, idx: usize) -> Result<bool, SamplerError> {
        loop {
            {
                let state = self
                    .state
                    .lock()
                    .map_err(|_| SamplerError::SourceUnavailable {
                        source_id: self.config.source_id.clone(),
                        reason: "huggingface source state lock poisoned".to_string(),
                    })?;

                if idx < state.materialized_rows {
                    return Ok(true);
                }

                if self.config.max_rows.is_some_and(|max_rows| idx >= max_rows) {
                    return Ok(false);
                }

                if let Some(candidates) = &state.remote_candidates
                    && state.next_remote_idx >= candidates.len()
                {
                    return Ok(false);
                }
            }

            let need_candidates = {
                let state = self
                    .state
                    .lock()
                    .map_err(|_| SamplerError::SourceUnavailable {
                        source_id: self.config.source_id.clone(),
                        reason: "huggingface source state lock poisoned".to_string(),
                    })?;
                state.remote_candidates.is_none()
            };

            if need_candidates {
                let mut state = self
                    .state
                    .lock()
                    .map_err(|_| SamplerError::SourceUnavailable {
                        source_id: self.config.source_id.clone(),
                        reason: "huggingface source state lock poisoned".to_string(),
                    })?;
                if state.remote_candidates.is_none() {
                    let (mut candidates, candidate_sizes) =
                        Self::list_remote_candidates(&self.config)?;
                    Self::rotate_candidates_deterministically(&self.config, &mut candidates);
                    state.remote_candidates = Some(candidates);
                    state.remote_candidate_sizes = candidate_sizes;
                    state.next_remote_idx = 0;

                    self.persist_shard_sequence_locked(&state)?;

                    let candidate_count = state
                        .remote_candidates
                        .as_ref()
                        .map(|values| values.len())
                        .unwrap_or(0);
                    let bootstrap_needed = state.materialized_rows == 0
                        && candidate_count > 0
                        && state.next_remote_idx == 0;
                    let known_rows = state.materialized_rows;
                    let shard_count = state.shards.len();
                    info!(
                        "[triplets:hf] state: candidates={} known_rows={} active_shards={} disk_cap={} min_resident_shards={}",
                        candidate_count,
                        known_rows,
                        shard_count,
                        self.config
                            .local_disk_cap_bytes
                            .map(|bytes| format!(
                                "{:.2} GiB",
                                bytes as f64 / (1024.0 * 1024.0 * 1024.0)
                            ))
                            .unwrap_or_else(|| "disabled".to_string()),
                        self.config.min_resident_shards,
                    );
                    drop(state);

                    if bootstrap_needed {
                        let bootstrap_target = REMOTE_BOOTSTRAP_SHARDS.min(candidate_count);
                        info!(
                            "[triplets:hf] bootstrapping remote shard diversity: target={} shard(s)",
                            bootstrap_target
                        );
                        for step in 0..bootstrap_target {
                            info!(
                                "[triplets:hf] bootstrap progress: {}/{}",
                                step + 1,
                                bootstrap_target
                            );
                            if !self.download_next_remote_shard()? {
                                break;
                            }
                        }
                        info!("[triplets:hf] bootstrap complete");
                    }
                } else {
                    drop(state);
                }
                continue;
            }
            if !self.download_next_remote_shard()? {
                return Ok(false);
            }
        }
    }

    /// Download and register the next remote shard candidate.
    fn download_next_remote_shard(&self) -> Result<bool, SamplerError> {
        let (remote_ordinal, remote_total, remote_path, expected_bytes) = {
            let mut state = self
                .state
                .lock()
                .map_err(|_| SamplerError::SourceUnavailable {
                    source_id: self.config.source_id.clone(),
                    reason: "huggingface source state lock poisoned".to_string(),
                })?;
            let Some(candidates) = &state.remote_candidates else {
                return Ok(false);
            };
            if state.next_remote_idx >= candidates.len() {
                return Ok(false);
            }
            let sequence_pos = state.next_remote_idx;
            let remote_ordinal = sequence_pos + 1;
            let remote_total = candidates.len();
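            // Map this sequence position to a candidate through a seeded index
            // permutation, so the shard download order is deterministic for a given
            // config and sampler seed and can be replayed after a restart.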
            let sampler_seed = self.configured_sampler_seed()?;
            let seed = Self::shard_candidate_seed(&self.config, remote_total, sampler_seed);
            let mut permutation =
                crate::source::IndexPermutation::new(remote_total, seed, sequence_pos as u64);
            let candidate_idx = permutation.next();
            let remote_path = candidates[candidate_idx].clone();
            let expected_bytes = state.remote_candidate_sizes.get(&remote_path).copied();
            state.next_remote_idx += 1;
            (remote_ordinal, remote_total, remote_path, expected_bytes)
        };

        info!(
            "[triplets:hf] lazy downloading shard {}/{}: {}",
            remote_ordinal,
            remote_total,
            remote_path.as_str()
        );
        let local_path =
            Self::download_and_materialize_shard(&self.config, &remote_path, expected_bytes)?;

        let global_start = {
            let state = self
                .state
                .lock()
                .map_err(|_| SamplerError::SourceUnavailable {
                    source_id: self.config.source_id.clone(),
                    reason: "huggingface source state lock poisoned".to_string(),
                })?;
            state.materialized_rows
        };

        let Some(shard) = Self::index_single_shard(&self.config, &local_path, global_start)? else {
            warn!(
                "[triplets:hf] downloaded shard had zero rows and was skipped: {}",
                local_path.display()
            );
            return Ok(true);
        };

        let mut state = self
            .state
            .lock()
            .map_err(|_| SamplerError::SourceUnavailable {
                source_id: self.config.source_id.clone(),
                reason: "huggingface source state lock poisoned".to_string(),
            })?;

        if self
            .config
            .max_rows
            .is_some_and(|max_rows| state.materialized_rows >= max_rows)
        {
            return Ok(true);
        }

        let mut rows_to_add = shard.row_count;
        if let Some(max_rows) = self.config.max_rows {
            rows_to_add = rows_to_add.min(max_rows.saturating_sub(state.materialized_rows));
        }
        if rows_to_add == 0 {
            return Ok(true);
        }

        let mut shard = shard;
        shard.global_start = state.materialized_rows;
        shard.row_count = rows_to_add;
        if shard.is_parquet {
            shard
                .parquet_row_groups
                .retain(|(start, _)| *start < rows_to_add);
            if let Some((start, count)) = shard.parquet_row_groups.last_mut() {
                let allowed = rows_to_add.saturating_sub(*start);
                *count = (*count).min(allowed);
            }
        }
        state.materialized_rows += rows_to_add;
        state.shards.push(shard);

        let evicted_any = self.enforce_disk_cap_locked(&mut state, &local_path)?;
        self.persist_shard_sequence_locked(&state)?;
        let materialized_rows = state.materialized_rows;
        let shard_count = state.shards.len();
        let remaining_candidates = state
            .remote_candidates
            .as_ref()
            .map(|candidates| candidates.len().saturating_sub(state.next_remote_idx))
            .unwrap_or(0);
        let usage_bytes = self.manifest_usage_bytes_locked(&state);
        let usage_gib = usage_bytes as f64 / (1024.0 * 1024.0 * 1024.0);
        let cap_str = self
            .config
            .local_disk_cap_bytes
            .map(|bytes| format!("{:.2} GiB", bytes as f64 / (1024.0 * 1024.0 * 1024.0)))
            .unwrap_or_else(|| "disabled".to_string());
        drop(state);

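        // Eviction may have deleted files that cached rows and open parquet readers
        // still reference, so both caches are cleared wholesale below rather than
        // tracking which entries belonged to the evicted shards.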
        if evicted_any {
            if let Ok(mut cache) = self.cache.lock() {
                cache.rows.clear();
                cache.order.clear();
            }
            if let Ok(mut parquet_cache) = self.parquet_cache.lock() {
                parquet_cache.readers.clear();
            }
        }

        info!(
            "[triplets:hf] state: rows={} shards={} remaining_candidates={} disk_usage={:.2} GiB cap={}",
            materialized_rows, shard_count, remaining_candidates, usage_gib, cap_str,
        );

        Ok(true)
    }

    /// Copy cached/downloaded source file into snapshot tree.
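    ///
    /// Idempotent by file size: an existing target with the same byte length is
    /// kept, otherwise it is deleted and replaced with a fresh copy.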
    fn materialize_local_file(
        config: &HuggingFaceRowsConfig,
        source_path: &Path,
        target_path: &Path,
    ) -> Result<(), SamplerError> {
        let resolved_source =
            fs::canonicalize(source_path).unwrap_or_else(|_| source_path.to_path_buf());

        if let Some(parent) = target_path.parent() {
            fs::create_dir_all(parent).map_err(|err| SamplerError::SourceUnavailable {
                source_id: config.source_id.clone(),
                reason: format!(
                    "failed creating snapshot subdir {}: {err}",
                    parent.display()
                ),
            })?;
        }

        if target_path.exists() {
            let src_meta =
                fs::metadata(&resolved_source).map_err(|err| SamplerError::SourceUnavailable {
                    source_id: config.source_id.clone(),
                    reason: format!(
                        "failed reading source metadata {}: {err}",
                        resolved_source.display()
                    ),
                })?;
            let dst_meta =
                fs::metadata(target_path).map_err(|err| SamplerError::SourceUnavailable {
                    source_id: config.source_id.clone(),
                    reason: format!(
                        "failed reading target metadata {}: {err}",
                        target_path.display()
                    ),
                })?;
            if src_meta.len() == dst_meta.len() {
                return Ok(());
            }
            fs::remove_file(target_path).map_err(|err| SamplerError::SourceUnavailable {
                source_id: config.source_id.clone(),
                reason: format!(
                    "failed replacing target file {}: {err}",
                    target_path.display()
                ),
            })?;
        }

        fs::copy(&resolved_source, target_path).map_err(|err| SamplerError::SourceUnavailable {
            source_id: config.source_id.clone(),
            reason: format!(
                "failed copying synced file {} -> {}: {err}",
                resolved_source.display(),
                target_path.display()
            ),
        })?;
        Ok(())
    }

    /// Build deterministic local shard index for accepted extensions.
    fn build_shard_index(
        config: &HuggingFaceRowsConfig,
    ) -> Result<(Vec<ShardIndex>, usize), SamplerError> {
        let start_index = Instant::now();
        let mut shard_paths = Vec::new();
        let manifest_root = config.snapshot_dir.join("_parquet_manifest");
        let accepted = config
            .shard_extensions
            .iter()
            .map(|ext| ext.trim().trim_start_matches('.').to_ascii_lowercase())
            .collect::<Vec<_>>();

        let mut saw_parquet = false;
        for entry in WalkDir::new(&config.snapshot_dir)
            .follow_links(true)
            .into_iter()
            .filter_map(Result::ok)
        {
            if !entry.file_type().is_file() {
                continue;
            }
            if entry.path().starts_with(&manifest_root) {
                continue;
            }
            let Some(ext) = entry.path().extension().and_then(|v| v.to_str()) else {
                continue;
            };
            if ext.eq_ignore_ascii_case("parquet") {
                saw_parquet = true;
            }
            if accepted
                .iter()
                .any(|allowed| allowed == &ext.to_ascii_lowercase())
            {
                shard_paths.push(entry.path().to_path_buf());
            }
        }

        shard_paths.sort();
        if shard_paths.is_empty() {
            if saw_parquet && !accepted.iter().any(|value| value == "parquet") {
                return Err(SamplerError::SourceUnavailable {
                    source_id: config.source_id.clone(),
                    reason: format!(
                        "found parquet files under {}, but shard_extensions does not include parquet.",
                        config.snapshot_dir.display()
                    ),
                });
            }
            return Err(SamplerError::SourceUnavailable {
                source_id: config.source_id.clone(),
                reason: format!(
                    "no shard files found under {} with extensions {:?}",
                    config.snapshot_dir.display(),
                    config.shard_extensions
                ),
            });
        }

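        // Index shards in parallel with rayon, then restore the sorted-path order
        // via the captured ordinal so global row offsets stay deterministic.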
        let mut indexed_shards = shard_paths
            .into_par_iter()
            .enumerate()
            .map(|(ordinal, path)| {
                info!(
                    "[triplets:hf] indexing shard {}: {}",
                    ordinal + 1,
                    path.display()
                );
                let shard = Self::index_single_shard(config, &path, 0)?;
                Ok::<_, SamplerError>((ordinal, shard))
            })
            .collect::<Result<Vec<_>, _>>()?;

        indexed_shards.sort_by_key(|(ordinal, _)| *ordinal);

        let mut shards = Vec::new();
        let mut running_total = 0usize;
        for (_, maybe_shard) in indexed_shards {
            let Some(mut shard) = maybe_shard else {
                continue;
            };

            if let Some(max_rows) = config.max_rows {
                if running_total >= max_rows {
                    break;
                }
                let allowed = max_rows.saturating_sub(running_total);
                if shard.row_count > allowed {
                    shard.row_count = allowed;
                    if shard.is_parquet {
                        shard
                            .parquet_row_groups
                            .retain(|(start, _)| *start < shard.row_count);
                        if let Some((start, count)) = shard.parquet_row_groups.last_mut() {
                            let group_allowed = shard.row_count.saturating_sub(*start);
                            *count = (*count).min(group_allowed);
                        }
                    }
                }
            }

            if shard.row_count == 0 {
                continue;
            }

            shard.global_start = running_total;
            running_total = running_total.saturating_add(shard.row_count);
            shards.push(shard);
        }

        info!(
            "[triplets:hf] indexing complete in {:.2}s (rows={}, shards={})",
            start_index.elapsed().as_secs_f64(),
            running_total,
            shards.len()
        );

        Ok((shards, running_total))
    }

    /// Locate containing shard and local offset for a global row index.
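    ///
    /// Binary search over the half-open ranges `[global_start, global_start +
    /// row_count)`; e.g. with three shards of 100 rows each, `idx = 250` resolves
    /// to the third shard at local offset 50.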
    fn locate_shard(shards: &[ShardIndex], idx: usize) -> Option<(&ShardIndex, usize)> {
        let pos = shards
            .binary_search_by(|shard| {
                if idx < shard.global_start {
                    Ordering::Greater
                } else if idx >= shard.global_start + shard.row_count {
                    Ordering::Less
                } else {
                    Ordering::Equal
                }
            })
            .ok()?;
        let shard = shards.get(pos)?;
        Some((shard, idx - shard.global_start))
    }

    /// Read one JSONL/NDJSON line at a local row offset using checkpoints.
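    ///
    /// With `checkpoint_stride = 100`, `local_idx = 250` seeks to `checkpoints[2]`
    /// (the byte offset of row 200) and scans 50 lines forward before reading the
    /// target line.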
    fn read_line_at(&self, shard: &ShardIndex, local_idx: usize) -> Result<String, SamplerError> {
        let checkpoint_idx = local_idx / self.config.checkpoint_stride;
        let checkpoint_line = checkpoint_idx * self.config.checkpoint_stride;
        let seek_offset = *shard.checkpoints.get(checkpoint_idx).ok_or_else(|| {
            SamplerError::SourceUnavailable {
                source_id: self.config.source_id.clone(),
                reason: format!(
                    "missing checkpoint for shard {} line {}",
                    shard.path.display(),
                    local_idx
                ),
            }
        })?;

        let mut file = File::open(&shard.path).map_err(|err| SamplerError::SourceUnavailable {
            source_id: self.config.source_id.clone(),
            reason: format!("failed opening shard {}: {err}", shard.path.display()),
        })?;
        file.seek(SeekFrom::Start(seek_offset))
            .map_err(|err| SamplerError::SourceUnavailable {
                source_id: self.config.source_id.clone(),
                reason: format!("failed seeking shard {}: {err}", shard.path.display()),
            })?;

        let mut reader = BufReader::new(file);
        let mut line = String::new();
        for _ in checkpoint_line..local_idx {
            line.clear();
            let bytes =
                reader
                    .read_line(&mut line)
                    .map_err(|err| SamplerError::SourceUnavailable {
                        source_id: self.config.source_id.clone(),
                        reason: format!("failed scanning shard {}: {err}", shard.path.display()),
                    })?;
            if bytes == 0 {
                return Err(SamplerError::SourceUnavailable {
                    source_id: self.config.source_id.clone(),
                    reason: format!(
                        "unexpected EOF while scanning shard {} at row {}",
                        shard.path.display(),
                        local_idx
                    ),
                });
            }
        }

        line.clear();
        let bytes = reader
            .read_line(&mut line)
            .map_err(|err| SamplerError::SourceUnavailable {
                source_id: self.config.source_id.clone(),
                reason: format!("failed reading shard {}: {err}", shard.path.display()),
            })?;
        if bytes == 0 {
            return Err(SamplerError::SourceUnavailable {
                source_id: self.config.source_id.clone(),
                reason: format!(
                    "unexpected EOF while reading shard {} row {}",
                    shard.path.display(),
                    local_idx
                ),
            });
        }
        Ok(line)
    }

    /// Locate parquet row-group and in-group row offset for a local row index.
    fn locate_parquet_group(
        &self,
        shard: &ShardIndex,
        local_idx: usize,
    ) -> Result<(usize, usize), SamplerError> {
        let group_pos = shard
            .parquet_row_groups
            .binary_search_by(|(start, count)| {
                if local_idx < *start {
                    Ordering::Greater
                } else if local_idx >= start.saturating_add(*count) {
                    Ordering::Less
                } else {
                    Ordering::Equal
                }
            })
            .map_err(|_| SamplerError::SourceUnavailable {
                source_id: self.config.source_id.clone(),
                reason: format!(
                    "parquet row {} could not be mapped to a row group in {}",
                    local_idx,
                    shard.path.display()
                ),
            })?;
        let (group_start, _) = shard.parquet_row_groups[group_pos];
        Ok((group_pos, local_idx.saturating_sub(group_start)))
    }

    /// Convert a serde JSON value into non-empty text when possible.
    fn value_to_text(value: &Value) -> Option<String> {
        match value {
            Value::Null => None,
            Value::String(s) => {
                if s.trim().is_empty() {
                    None
                } else {
                    Some(s.clone())
                }
            }
            Value::Bool(b) => Some(b.to_string()),
            Value::Number(n) => Some(n.to_string()),
            Value::Array(_) | Value::Object(_) => Some(value.to_string()),
        }
    }

    /// Parse a raw row payload into normalized `RowView` fields.
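    ///
    /// Column precedence: explicit anchor/positive/context columns win when any is
    /// configured; otherwise the configured `text_columns` are used; otherwise
    /// every non-id column that yields text is taken in object order.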
    fn parse_row(&self, absolute_idx: usize, row_value: &Value) -> Result<RowView, SamplerError> {
        let row_payload = row_value.get("row").unwrap_or(row_value);
        let row_obj = row_payload
            .as_object()
            .ok_or_else(|| SamplerError::SourceUnavailable {
                source_id: self.config.source_id.clone(),
                reason: "snapshot row entry missing JSON object payload".to_string(),
            })?;

        let row_id = self
            .config
            .id_column
            .as_ref()
            .and_then(|col| row_obj.get(col))
            .and_then(Self::value_to_text)
            .unwrap_or_else(|| {
                format!(
                    "{}:{}:{}",
                    self.config.dataset, self.config.split, absolute_idx
                )
            });

        let mut text_fields = Vec::new();
        let use_role_columns = self.config.anchor_column.is_some()
            || self.config.positive_column.is_some()
            || !self.config.context_columns.is_empty();

        if use_role_columns {
            if let Some(name) = &self.config.anchor_column {
                let value = row_obj
                    .get(name)
                    .ok_or_else(|| SamplerError::SourceInconsistent {
                        source_id: self.config.source_id.clone(),
                        details: format!("missing configured anchor column '{name}'"),
                    })?;
                let text =
                    Self::value_to_text(value).ok_or_else(|| SamplerError::SourceInconsistent {
                        source_id: self.config.source_id.clone(),
                        details: format!("configured anchor column '{name}' has null/empty value"),
                    })?;
                text_fields.push(RowTextField {
                    name: name.clone(),
                    text,
                });
            }

            if let Some(name) = &self.config.positive_column {
                let value = row_obj
                    .get(name)
                    .ok_or_else(|| SamplerError::SourceInconsistent {
                        source_id: self.config.source_id.clone(),
                        details: format!("missing configured positive column '{name}'"),
                    })?;
                let text =
                    Self::value_to_text(value).ok_or_else(|| SamplerError::SourceInconsistent {
                        source_id: self.config.source_id.clone(),
                        details: format!(
                            "configured positive column '{name}' has null/empty value"
                        ),
                    })?;
                text_fields.push(RowTextField {
                    name: name.clone(),
                    text,
                });
            }

            for name in &self.config.context_columns {
                let value = row_obj
                    .get(name)
                    .ok_or_else(|| SamplerError::SourceInconsistent {
                        source_id: self.config.source_id.clone(),
                        details: format!("missing configured context column '{name}'"),
                    })?;
                let text =
                    Self::value_to_text(value).ok_or_else(|| SamplerError::SourceInconsistent {
                        source_id: self.config.source_id.clone(),
                        details: format!("configured context column '{name}' has null/empty value"),
                    })?;
                text_fields.push(RowTextField {
                    name: name.clone(),
                    text,
                });
            }
        } else if self.config.text_columns.is_empty() {
            for (name, value) in row_obj {
                if self.config.id_column.as_ref().is_some_and(|id| id == name) {
                    continue;
                }
                if let Some(text) = Self::value_to_text(value) {
                    text_fields.push(RowTextField {
                        name: name.clone(),
                        text,
                    });
                }
            }
        } else {
            for name in &self.config.text_columns {
                let value = row_obj
                    .get(name)
                    .ok_or_else(|| SamplerError::SourceInconsistent {
                        source_id: self.config.source_id.clone(),
                        details: format!("missing configured text column '{name}'"),
                    })?;
                let text =
                    Self::value_to_text(value).ok_or_else(|| SamplerError::SourceInconsistent {
                        source_id: self.config.source_id.clone(),
                        details: format!("configured text column '{name}' has null/empty value"),
                    })?;
                text_fields.push(RowTextField {
                    name: name.clone(),
                    text,
                });
            }
        }

        if text_fields.is_empty() {
            return Err(SamplerError::SourceInconsistent {
                source_id: self.config.source_id.clone(),
                details: "row resolved to zero text fields".to_string(),
            });
        }

        Ok(RowView {
            row_id: Some(row_id),
            timestamp: None,
            text_fields,
        })
    }

    /// Convert a `RowView` into a sampler `DataRecord`.
    fn row_to_record(
        &self,
        row: &RowView,
        row_index: u64,
    ) -> Result<Option<DataRecord>, SamplerError> {
        if row.text_fields.is_empty() {
            return Ok(None);
        }

        let record_id = row
            .row_id
            .as_ref()
            .cloned()
            .unwrap_or_else(|| format!("row_{row_index}"));
        let id = format!("{}::{}", self.config.source_id, record_id);

        // The first text field becomes the anchor section; the second (or, for
        // single-field rows, the anchor itself) plus any remaining fields become
        // context sections.
        let mut sections = Vec::new();
        let anchor = &row.text_fields[0];
        sections.push(make_section(
            SectionRole::Anchor,
            Some(anchor.name.as_str()),
            anchor.text.as_str(),
        ));

        let positive = row.text_fields.get(1).unwrap_or(anchor);
        sections.push(make_section(
            SectionRole::Context,
            Some(positive.name.as_str()),
            positive.text.as_str(),
        ));

        for field in row.text_fields.iter().skip(2) {
            sections.push(make_section(
                SectionRole::Context,
                Some(field.name.as_str()),
                field.text.as_str(),
            ));
        }

        let timestamp = row.timestamp.unwrap_or(DateTime::<Utc>::UNIX_EPOCH);
        Ok(Some(DataRecord {
            id,
            source: self.config.source_id.clone(),
            created_at: timestamp,
            updated_at: timestamp,
            quality: QualityScore::default(),
            taxonomy: vec![
                format!("dataset={}", self.config.dataset),
                format!("config={}", self.config.config),
                format!("split={}", self.config.split),
            ],
            sections,
            meta_prefix: None,
        }))
    }

    /// Materialize records for requested indices into output buffer.
17✔
2148
        &self,
17✔
2149
        indices: &[usize],
17✔
2150
        out: &mut Vec<DataRecord>,
17✔
2151
        limit: Option<usize>,
17✔
2152
    ) -> Result<(), SamplerError> {
17✔
2153
        let mut sorted = indices.to_vec();
17✔
2154
        sorted.sort_unstable();
17✔
2155

2156
        let mut fetched = HashMap::with_capacity(sorted.len());
17✔
2157
        let mut pending = Vec::new();
17✔
2158
        for idx in &sorted {
46✔
2159
            if !self.ensure_row_available(*idx)? {
46✔
2160
                fetched.insert(*idx, None);
2✔
2161
                continue;
2✔
2162
            }
44✔
2163

2164
            if let Some(row) = self
44✔
2165
                .cache
44✔
2166
                .lock()
44✔
2167
                .map_err(|_| SamplerError::SourceUnavailable {
44✔
NEW
2168
                    source_id: self.config.source_id.clone(),
×
NEW
2169
                    reason: "huggingface row cache lock poisoned".to_string(),
×
NEW
2170
                })?
×
2171
                .get(*idx)
44✔
2172
            {
2173
                let record = self.row_to_record(&row, *idx as u64)?;
4✔
2174
                fetched.insert(*idx, record);
4✔
2175
                continue;
4✔
2176
            }
40✔
2177

2178
            pending.push(*idx);
40✔
2179
        }
2180

2181
        if !pending.is_empty() {
17✔
2182
            let resolutions =
13✔
2183
                {
2184
                    let state = self
14✔
2185
                        .state
14✔
2186
                        .lock()
14✔
2187
                        .map_err(|_| SamplerError::SourceUnavailable {
14✔
NEW
2188
                            source_id: self.config.source_id.clone(),
×
NEW
2189
                            reason: "huggingface source state lock poisoned".to_string(),
×
NEW
2190
                        })?;
×
2191
                    let mut resolved = Vec::with_capacity(pending.len());
14✔
2192
                    for idx in &pending {
40✔
2193
                        let (shard, local_idx) = Self::locate_shard(&state.shards, *idx)
40✔
2194
                            .ok_or_else(|| SamplerError::SourceUnavailable {
40✔
2195
                                source_id: self.config.source_id.clone(),
1✔
2196
                                reason: format!("row index out of range: {idx}"),
1✔
2197
                            })?;
1✔
2198
                        resolved.push((*idx, shard.clone(), local_idx));
39✔
2199
                    }
2200
                    resolved
13✔
2201
                };
2202

2203
            let mut parquet_groups: HashMap<ParquetGroupKey, Vec<ParquetGroupRequest>> =
13✔
2204
                HashMap::new();
13✔
2205
            for (idx, shard, local_idx) in resolutions {
39✔
2206
                if shard.is_parquet {
39✔
2207
                    let (group_pos, local_in_group) =
3✔
2208
                        self.locate_parquet_group(&shard, local_idx)?;
3✔
2209
                    parquet_groups
3✔
2210
                        .entry((shard.path.clone(), group_pos))
3✔
2211
                        .or_default()
3✔
2212
                        .push((idx, local_in_group, shard));
3✔
2213
                    continue;
3✔
2214
                }
36✔
2215

2216
                let line = self.read_line_at(&shard, local_idx)?;
36✔
2217
                let row_value = serde_json::from_str::<Value>(line.trim()).map_err(|err| {
36✔
2218
                    SamplerError::SourceInconsistent {
1✔
2219
                        source_id: self.config.source_id.clone(),
1✔
2220
                        details: format!(
1✔
2221
                            "failed decoding JSON row from shard {} at local index {}: {err}",
1✔
2222
                            shard.path.display(),
1✔
2223
                            local_idx
1✔
2224
                        ),
1✔
2225
                    }
1✔
2226
                })?;
1✔
2227
                let row = self.parse_row(idx, &row_value)?;
35✔
2228
                let record = self.row_to_record(&row, idx as u64)?;
35✔
2229
                self.cache
35✔
2230
                    .lock()
35✔
2231
                    .map_err(|_| SamplerError::SourceUnavailable {
35✔
NEW
2232
                        source_id: self.config.source_id.clone(),
×
NEW
2233
                        reason: "huggingface row cache lock poisoned".to_string(),
×
NEW
2234
                    })?
×
2235
                    .insert(idx, row, self.config.cache_capacity);
35✔
2236
                fetched.insert(idx, record);
35✔
2237
            }
2238

2239
            for ((shard_path, group_pos), mut requested) in parquet_groups {
12✔
2240
                requested.sort_by_key(|(_, local_in_group, _)| *local_in_group);
2✔
2241
                let shard = requested
2✔
2242
                    .first()
2✔
2243
                    .map(|(_, _, shard)| shard.clone())
2✔
2244
                    .ok_or_else(|| SamplerError::SourceUnavailable {
2✔
NEW
2245
                        source_id: self.config.source_id.clone(),
×
NEW
2246
                        reason: format!(
×
2247
                            "missing parquet request metadata for shard {} row_group {}",
NEW
2248
                            shard_path.display(),
×
2249
                            group_pos
2250
                        ),
NEW
2251
                    })?;
×
2252

2253
                let mut targets: BTreeMap<usize, Vec<usize>> = BTreeMap::new();
2✔
2254
                for (idx, local_in_group, _) in requested {
3✔
2255
                    targets.entry(local_in_group).or_default().push(idx);
3✔
2256
                }
3✔
2257
                let max_target = targets.keys().next_back().copied().unwrap_or(0);
2✔
2258

2259
                let reader = self
2✔
2260
                    .parquet_cache
2✔
2261
                    .lock()
2✔
2262
                    .map_err(|_| SamplerError::SourceUnavailable {
2✔
NEW
2263
                        source_id: self.config.source_id.clone(),
×
NEW
2264
                        reason: "huggingface parquet cache lock poisoned".to_string(),
×
NEW
2265
                    })?
×
2266
                    .reader_for(&self.config.source_id, &shard.path)?;
2✔
2267

2268
                let row_group = reader.get_row_group(group_pos).map_err(|err| {
1✔
NEW
2269
                    SamplerError::SourceUnavailable {
×
NEW
2270
                        source_id: self.config.source_id.clone(),
×
NEW
2271
                        reason: format!(
×
NEW
2272
                            "failed opening parquet row group {} for {}: {err}",
×
NEW
2273
                            group_pos,
×
NEW
2274
                            shard.path.display()
×
NEW
2275
                        ),
×
NEW
2276
                    }
×
NEW
2277
                })?;
×
2278
                let iter = RowIter::from_row_group(None, row_group.as_ref()).map_err(|err| {
1✔
NEW
2279
                    SamplerError::SourceUnavailable {
×
NEW
2280
                        source_id: self.config.source_id.clone(),
×
NEW
2281
                        reason: format!(
×
NEW
2282
                            "failed iterating parquet row group {} for {}: {err}",
×
NEW
2283
                            group_pos,
×
NEW
2284
                            shard.path.display()
×
NEW
2285
                        ),
×
NEW
2286
                    }
×
NEW
2287
                })?;
×
2288

2289
                for (position, row_result) in iter.enumerate() {
2✔
2290
                    if position > max_target {
2✔
NEW
2291
                        break;
×
2292
                    }
2✔
2293
                    let Some(indices_for_position) = targets.remove(&position) else {
2✔
NEW
2294
                        continue;
×
2295
                    };
2296
                    let row_value = row_result.map_err(|err| SamplerError::SourceUnavailable {
2✔
NEW
2297
                        source_id: self.config.source_id.clone(),
×
NEW
2298
                        reason: format!(
×
2299
                            "failed reading parquet row {} in shard {} row_group {}: {err}",
2300
                            position,
NEW
2301
                            shard.path.display(),
×
2302
                            group_pos
2303
                        ),
NEW
2304
                    })?;
×
2305
                    let row_value = row_value.to_json_value();
2✔
2306

2307
                    for idx in indices_for_position {
2✔
2308
                        let row = self.parse_row(idx, &row_value)?;
2✔
2309
                        let record = self.row_to_record(&row, idx as u64)?;
2✔
2310
                        self.cache
2✔
2311
                            .lock()
2✔
2312
                            .map_err(|_| SamplerError::SourceUnavailable {
2✔
NEW
2313
                                source_id: self.config.source_id.clone(),
×
NEW
2314
                                reason: "huggingface row cache lock poisoned".to_string(),
×
NEW
2315
                            })?
×
2316
                            .insert(idx, row, self.config.cache_capacity);
2✔
2317
                        fetched.insert(idx, record);
2✔
2318
                    }
2319

2320
                    if targets.is_empty() {
2✔
2321
                        break;
1✔
2322
                    }
1✔
2323
                }
2324

2325
                if !targets.is_empty() {
1✔
NEW
2326
                    let missing = targets
×
NEW
2327
                        .into_keys()
×
NEW
2328
                        .map(|value| value.to_string())
×
NEW
2329
                        .collect::<Vec<_>>()
×
NEW
2330
                        .join(",");
×
NEW
2331
                    return Err(SamplerError::SourceUnavailable {
×
NEW
2332
                        source_id: self.config.source_id.clone(),
×
NEW
2333
                        reason: format!(
×
NEW
2334
                            "parquet rows missing in shard {} row_group {} at local offsets [{}]",
×
NEW
2335
                            shard.path.display(),
×
NEW
2336
                            group_pos,
×
NEW
2337
                            missing
×
NEW
2338
                        ),
×
NEW
2339
                    });
×
2340
                }
1✔
2341
            }
2342
        }
3✔
2343

2344
        for idx in indices {
43✔
2345
            if limit.is_some_and(|max| out.len() >= max) {
43✔
2346
                break;
1✔
2347
            }
42✔
2348
            if let Some(record) = fetched.remove(idx).flatten() {
42✔
2349
                out.push(record);
40✔
2350
            }
40✔
2351
        }
2352
        Ok(())
14✔
2353
    }
17✔
2354

2355
    /// Return the current index-domain upper bound for refresh paging.
2356
    fn len_hint(&self) -> Option<usize> {
20✔
2357
        let state = self.state.lock().ok()?;
20✔
2358
        let known = state.materialized_rows;
20✔
2359
        if known > 0 {
20✔
2360
            let mut upper = known;
15✔
2361
            if state
15✔
2362
                .total_rows
15✔
2363
                .is_some_and(|total_rows| total_rows > known)
15✔
2364
            {
2365
                let headroom = self.effective_expansion_headroom_rows();
4✔
2366
                upper = known.saturating_add(headroom);
4✔
2367
                if let Some(total_rows) = state.total_rows {
4✔
2368
                    upper = upper.min(total_rows);
4✔
2369
                }
4✔
2370
            }
11✔
2371
            if let Some(max_rows) = self.config.max_rows {
15✔
2372
                upper = upper.min(max_rows);
5✔
2373
            }
10✔
2374
            return Some(upper.max(known));
15✔
2375
        }
5✔
2376

2377
        if state.total_rows.is_some_and(|total_rows| total_rows == 0) {
5✔
2378
            return Some(0);
2✔
2379
        }
3✔
2380

2381
        if state
3✔
2382
            .remote_candidates
3✔
2383
            .as_ref()
3✔
2384
            .is_some_and(|candidates| candidates.is_empty())
3✔
2385
        {
NEW
2386
            return Some(0);
×
2387
        }
3✔
2388

2389
        if self.config.max_rows.is_some_and(|max_rows| max_rows == 0) {
3✔
2390
            return Some(0);
1✔
2391
        }
2✔
2392

2393
        Some(1)
2✔
2394
    }
20✔
2395
}
2396

2397
impl DataSource for HuggingFaceRowSource {
2398
    /// Return stable source id.
NEW
2399
    fn id(&self) -> &str {
×
NEW
2400
        &self.config.source_id
×
NEW
2401
    }
×
2402

2403
    /// Refresh source records for the requested cursor and row limit.
2404
    fn refresh(
11✔
2405
        &self,
11✔
2406
        config: &SamplerConfig,
11✔
2407
        cursor: Option<&SourceCursor>,
11✔
2408
        limit: Option<usize>,
11✔
2409
    ) -> Result<SourceSnapshot, SamplerError> {
11✔
2410
        self.set_active_sampler_config(config);
11✔
2411
        let total = self
11✔
2412
            .len_hint()
11✔
2413
            .ok_or_else(|| SamplerError::SourceInconsistent {
11✔
NEW
2414
                source_id: self.config.source_id.clone(),
×
NEW
2415
                details: "huggingface source did not provide len_hint".to_string(),
×
NEW
2416
            })?;
×
2417

2418
        if total == 0 {
11✔
2419
            return Ok(SourceSnapshot {
1✔
2420
                records: Vec::new(),
1✔
2421
                cursor: SourceCursor {
1✔
2422
                    last_seen: Utc::now(),
1✔
2423
                    revision: 0,
1✔
2424
                },
1✔
2425
            });
1✔
2426
        }
10✔
2427

2428
        let max = limit.unwrap_or(total);
10✔
2429
        let mut start = cursor.map(|state| state.revision as usize).unwrap_or(0);
10✔
2430
        if start >= total {
10✔
2431
            start = 0;
1✔
2432
        }
9✔
2433

2434
        let source_id = self.config.source_id.clone();
10✔
2435
        let seed = self.paging_seed(total)?;
10✔
2436
        let mut permutation = crate::source::IndexPermutation::new(total, seed, start as u64);
10✔

        let mut records = Vec::new();
        let read_batch_target = self.effective_refresh_batch_target(max);
        let mut pending_indices = Vec::with_capacity(read_batch_target);
        let should_report = total >= 10_000 || max >= 1_024;
        let report_every = Duration::from_millis(750);
        let refresh_start = Instant::now();
        let mut last_report = refresh_start;
        let mut attempts = 0usize;

        if should_report {
            info!(
                "[triplets:source] refresh start source='{}' total={} target={}",
                source_id, total, max
            );
        }

        while attempts < total && records.len() < max {
            pending_indices.clear();
            let remaining_attempts = total.saturating_sub(attempts);
            let to_collect = read_batch_target.min(remaining_attempts);
            for _ in 0..to_collect {
                if records.len() + pending_indices.len() >= max {
                    break;
                }
                pending_indices.push(permutation.next());
                attempts += 1;
            }

            if pending_indices.is_empty() {
                break;
            }

            if should_report {
                info!(
                    "[triplets:source] refresh batch source='{}' batch_size={} attempted={} fetched={} elapsed={:.1}s",
                    source_id,
                    pending_indices.len(),
                    attempts,
                    records.len(),
                    refresh_start.elapsed().as_secs_f64()
                );
            }

            self.read_row_batch(&pending_indices, &mut records, Some(max))?;

            if should_report && last_report.elapsed() >= report_every {
                info!(
                    "[triplets:source] refresh progress source='{}' attempted={}/{} fetched={}/{} elapsed={:.1}s",
                    source_id,
                    attempts,
                    total,
                    records.len(),
                    max,
                    refresh_start.elapsed().as_secs_f64()
                );
                last_report = Instant::now();
            }
        }

        if should_report {
            info!(
                "[triplets:source] refresh done source='{}' attempted={} fetched={} elapsed={:.2}s",
                source_id,
                attempts,
                records.len(),
                refresh_start.elapsed().as_secs_f64()
            );
        }

        let next_start = permutation.cursor();
        let last_seen = records
            .iter()
            .map(|record| record.updated_at)
            .max()
            .unwrap_or_else(Utc::now);

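        // `revision` carries the permutation cursor forward, so the next
        // refresh resumes the tour exactly where this pass stopped.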
        Ok(SourceSnapshot {
            records,
            cursor: SourceCursor {
                last_seen,
                revision: next_start as u64,
            },
        })
    }

    /// Return the exact record count reported by the current `len_hint`.
    fn reported_record_count(&self, config: &SamplerConfig) -> Result<u128, SamplerError> {
        self.set_active_sampler_config(config);
        self.len_hint()
            .map(|count| count as u128)
            .ok_or_else(|| SamplerError::SourceInconsistent {
                source_id: self.config.source_id.clone(),
                details: "huggingface source did not provide len_hint".to_string(),
            })
    }

    /// Return the default triplet recipe used by Hugging Face row sources.
    fn default_triplet_recipes(&self) -> Vec<TripletRecipe> {
        vec![TripletRecipe {
            name: "huggingface_anchor_context".into(),
            anchor: Selector::Role(SectionRole::Anchor),
            positive_selector: Selector::Role(SectionRole::Context),
            negative_selector: Selector::Role(SectionRole::Context),
            negative_strategy: NegativeStrategy::WrongArticle,
            weight: 1.0,
            instruction: None,
        }]
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use parquet::data_type::{ByteArray, ByteArrayType};
    use parquet::file::properties::WriterProperties;
    use parquet::file::writer::SerializedFileWriter;
    use parquet::schema::parser::parse_message_type;
    use serde_json::json;
    use std::io::{Read, Write};
    use std::net::TcpListener;
    use std::thread;
    use tempfile::tempdir;

    fn test_config(snapshot_dir: PathBuf) -> HuggingFaceRowsConfig {
        let mut config =
            HuggingFaceRowsConfig::new("hf_test", "org/dataset", "default", "train", snapshot_dir);
        config.cache_capacity = 10;
        config.remote_expansion_headroom_multiplier = 3;
        config
    }

    fn test_source(config: HuggingFaceRowsConfig) -> HuggingFaceRowSource {
        let source = HuggingFaceRowSource {
            config,
            sampler_config: Mutex::new(None),
            state: Mutex::new(SourceState {
                materialized_rows: 0,
                total_rows: None,
                shards: Vec::new(),
                remote_candidates: None,
                remote_candidate_sizes: HashMap::new(),
                next_remote_idx: 0,
            }),
            cache: Mutex::new(RowCache::default()),
            parquet_cache: Mutex::new(ParquetCache::default()),
        };
        source.set_active_sampler_config(&SamplerConfig {
            seed: 1,
            ingestion_max_records: source.config.cache_capacity,
            ..SamplerConfig::default()
        });
        source
    }
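
    // `test_source` builds the source struct literally, bypassing snapshot
    // discovery, so each test can pre-seed `SourceState` and rely on a fixed
    // sampler seed and ingestion budget for deterministic paging.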

    fn spawn_one_shot_http(payload: Vec<u8>) -> (String, thread::JoinHandle<()>) {
        let listener = TcpListener::bind("127.0.0.1:0").unwrap();
        let addr = listener.local_addr().unwrap();
        let handle = thread::spawn(move || {
            let (mut stream, _) = listener.accept().unwrap();
            let mut request_buf = [0u8; 1024];
            let _ = stream.read(&mut request_buf);
            let headers = format!(
                "HTTP/1.1 200 OK\r\nContent-Length: {}\r\nConnection: close\r\n\r\n",
                payload.len()
            );
            stream.write_all(headers.as_bytes()).unwrap();
            stream.write_all(&payload).unwrap();
            let _ = stream.flush();
        });
        (format!("http://{addr}"), handle)
    }
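
    // `spawn_one_shot_http` serves exactly one HTTP response and then exits,
    // so a test must trigger a single download and `join()` the handle
    // afterwards to surface any panic from the server thread.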

    fn write_parquet_fixture(path: &Path, rows: &[(&str, &str)]) {
        let schema = Arc::new(
            parse_message_type(
                "message test_schema {
                    REQUIRED BINARY id (UTF8);
                    REQUIRED BINARY text (UTF8);
                }",
            )
            .unwrap(),
        );
        let props = Arc::new(WriterProperties::builder().build());
        let file = File::create(path).unwrap();
        let mut writer = SerializedFileWriter::new(file, schema, props).unwrap();
        let mut row_group = writer.next_row_group().unwrap();

        if let Some(mut col_writer) = row_group.next_column().unwrap() {
            let values = rows
                .iter()
                .map(|(id, _)| ByteArray::from(*id))
                .collect::<Vec<_>>();
            col_writer
                .typed::<ByteArrayType>()
                .write_batch(&values, None, None)
                .unwrap();
            col_writer.close().unwrap();
        }

        if let Some(mut col_writer) = row_group.next_column().unwrap() {
            let values = rows
                .iter()
                .map(|(_, text)| ByteArray::from(*text))
                .collect::<Vec<_>>();
            col_writer
                .typed::<ByteArrayType>()
                .write_batch(&values, None, None)
                .unwrap();
            col_writer.close().unwrap();
        }

        assert!(row_group.next_column().unwrap().is_none());
        row_group.close().unwrap();
        writer.close().unwrap();
    }
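
    // The fixture writes both UTF8 columns into a single row group in schema
    // order (`id`, then `text`), which keeps the row-group maps in the parquet
    // tests below trivial to reason about.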

    #[test]
    fn row_cache_insert_and_evicts_oldest_entry() {
        let mut cache = RowCache::default();
        let row_a = RowView {
            row_id: Some("a".to_string()),
            timestamp: None,
            text_fields: vec![RowTextField {
                name: "text".to_string(),
                text: "alpha".to_string(),
            }],
        };
        let row_b = RowView {
            row_id: Some("b".to_string()),
            timestamp: None,
            text_fields: vec![RowTextField {
                name: "text".to_string(),
                text: "beta".to_string(),
            }],
        };

        cache.insert(0, row_a.clone(), 1);
        assert!(cache.get(0).is_some());

        cache.insert(1, row_b, 1);
        assert!(cache.get(0).is_none());
        assert_eq!(cache.get(1).unwrap().row_id.as_deref(), Some("b"));

        let mut zero_cache = RowCache::default();
        zero_cache.insert(7, row_a, 0);
        assert!(zero_cache.get(7).is_none());
    }

    #[test]
    fn parquet_cache_reader_for_reports_open_and_parse_errors() {
        let dir = tempdir().unwrap();
        let parquet_path = dir.path().join("missing.parquet");
        let mut cache = ParquetCache::default();
        let missing = cache.reader_for("hf_test", &parquet_path);
        assert!(missing.is_err());

        let invalid_parquet = dir.path().join("invalid.parquet");
        fs::write(&invalid_parquet, b"not parquet").unwrap();
        let invalid = cache.reader_for("hf_test", &invalid_parquet);
        assert!(invalid.is_err());
    }

    #[test]
    fn effective_targets_respect_minimum_multiplier_and_sampler_override() {
        let dir = tempdir().unwrap();
        let mut config = test_config(dir.path().to_path_buf());
        config.refresh_batch_multiplier = 0;
        config.remote_expansion_headroom_multiplier = 0;
        config.cache_capacity = 9;
        let source = test_source(config.clone());

        assert_eq!(source.effective_refresh_batch_target(5), 5);
        assert_eq!(source.effective_expansion_headroom_rows(), 9);

        let sampler = SamplerConfig {
            ingestion_max_records: 4,
            ..SamplerConfig::default()
        };
        *source.sampler_config.lock().unwrap() = Some(sampler);
        assert_eq!(source.effective_expansion_headroom_rows(), 4);
    }

    #[test]
    fn collect_candidates_from_siblings_filters_split_and_tracks_parquet() {
        let dir = tempdir().unwrap();
        let config = test_config(dir.path().to_path_buf());
        let accepted = HuggingFaceRowSource::normalized_shard_extensions(&config);
        let siblings = vec![
            "train/a.ndjson".to_string(),
            "dev/b.ndjson".to_string(),
            "train-c.parquet".to_string(),
            "train-z.txt".to_string(),
        ];

        let (candidates, saw_parquet) = HuggingFaceRowSource::collect_candidates_from_siblings(
            &config, &siblings, &accepted, true,
        );

        assert!(saw_parquet);
        assert_eq!(
            candidates,
            vec!["train/a.ndjson".to_string(), "train-c.parquet".to_string()]
        );
    }

    #[test]
    fn collect_candidates_from_siblings_skips_existing_targets() {
        let dir = tempdir().unwrap();
        let config = test_config(dir.path().to_path_buf());
        let accepted = HuggingFaceRowSource::normalized_shard_extensions(&config);
        let existing = "train/already.ndjson".to_string();
        let existing_target = HuggingFaceRowSource::candidate_target_path(&config, &existing);
        fs::create_dir_all(existing_target.parent().unwrap()).unwrap();
        fs::write(&existing_target, b"x\n").unwrap();

        let siblings = vec![existing, "train/new.ndjson".to_string()];
        let (candidates, _) = HuggingFaceRowSource::collect_candidates_from_siblings(
            &config, &siblings, &accepted, true,
        );
        assert_eq!(candidates, vec!["train/new.ndjson".to_string()]);
    }

    #[test]
    fn candidates_from_parquet_manifest_json_filters_and_records_sizes() {
        let dir = tempdir().unwrap();
        let config = test_config(dir.path().to_path_buf());
        let payload = json!({
            "parquet_files": [
                {"url": "https://host/x/train/000.parquet", "size": 11},
                {"url": "https://host/x/train/001.ndjson", "size": 13},
                {"url": "https://host/x/train/002.txt", "size": 5},
                {"foo": "missing-url"}
            ]
        });

        let (candidates, sizes) =
            HuggingFaceRowSource::candidates_from_parquet_manifest_json(&config, &payload).unwrap();
        assert_eq!(candidates.len(), 2);
        assert!(
            candidates
                .iter()
                .any(|c| c.ends_with("https://host/x/train/000.parquet"))
        );
        assert!(
            candidates
                .iter()
                .any(|c| c.ends_with("https://host/x/train/001.ndjson"))
        );
        assert_eq!(sizes.len(), 2);
    }
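
    // Manifest entries survive filtering in their `url::`-prefixed candidate
    // form, which is why the assertions above match with `ends_with` rather
    // than string equality.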

    #[test]
    fn candidates_from_parquet_manifest_skips_complete_cached_and_replaces_incomplete() {
        let dir = tempdir().unwrap();
        let config = test_config(dir.path().to_path_buf());

        let complete_url = "https://host/datasets/org/ds/resolve/main/train/000.parquet";
        let complete_candidate = format!("{REMOTE_URL_PREFIX}{complete_url}");
        let complete_target =
            HuggingFaceRowSource::candidate_target_path(&config, &complete_candidate);
        fs::create_dir_all(complete_target.parent().unwrap()).unwrap();
        fs::write(&complete_target, vec![1u8; 7]).unwrap();

        let stale_url = "https://host/datasets/org/ds/resolve/main/train/001.parquet";
        let stale_candidate = format!("{REMOTE_URL_PREFIX}{stale_url}");
        let stale_target = HuggingFaceRowSource::candidate_target_path(&config, &stale_candidate);
        fs::create_dir_all(stale_target.parent().unwrap()).unwrap();
        fs::write(&stale_target, vec![2u8; 3]).unwrap();

        let payload = json!({
            "parquet_files": [
                {"url": complete_url, "size": 7},
                {"url": stale_url, "size": 9}
            ]
        });

        let (candidates, sizes) =
            HuggingFaceRowSource::candidates_from_parquet_manifest_json(&config, &payload).unwrap();
        assert_eq!(candidates.len(), 1);
        assert!(candidates[0].ends_with(stale_url));
        assert!(!stale_target.exists());
        assert_eq!(sizes[&candidates[0]], 9);
        assert!(complete_target.exists());
    }

    #[test]
    fn candidates_from_parquet_manifest_errors_when_removing_incomplete_target_fails() {
        let dir = tempdir().unwrap();
        let config = test_config(dir.path().to_path_buf());
        let url = "https://host/datasets/org/ds/resolve/main/train/blocked.parquet";
        let candidate = format!("{REMOTE_URL_PREFIX}{url}");
        let target = HuggingFaceRowSource::candidate_target_path(&config, &candidate);
        fs::create_dir_all(&target).unwrap();

        let payload = json!({
            "parquet_files": [
                {"url": url, "size": 1}
            ]
        });

        let err = HuggingFaceRowSource::candidates_from_parquet_manifest_json(&config, &payload);
        assert!(err.is_err());
    }

    #[test]
    fn normalized_shard_extensions_trims_dots_and_lowercases() {
        let dir = tempdir().unwrap();
        let mut config = test_config(dir.path().to_path_buf());
        config.shard_extensions = vec![".PARQUET".into(), " ndjson ".into()];
        let normalized = HuggingFaceRowSource::normalized_shard_extensions(&config);
        assert_eq!(
            normalized,
            vec!["parquet".to_string(), "ndjson".to_string()]
        );
    }

    #[test]
    fn manifest_usage_bytes_locked_counts_only_manifest_shards() {
        let dir = tempdir().unwrap();
        let config = test_config(dir.path().to_path_buf());
        let source = test_source(config);
        let manifest_root = source.manifest_cache_root();
        fs::create_dir_all(&manifest_root).unwrap();

        let manifest_file = manifest_root.join("a.parquet");
        fs::write(&manifest_file, vec![1u8; 7]).unwrap();
        let local_file = source.config.snapshot_dir.join("local.ndjson");
        fs::write(&local_file, vec![2u8; 9]).unwrap();

        let state = SourceState {
            materialized_rows: 2,
            total_rows: None,
            shards: vec![
                ShardIndex {
                    path: manifest_file,
                    global_start: 0,
                    row_count: 1,
                    is_parquet: true,
                    parquet_row_groups: vec![(0, 1)],
                    checkpoints: Vec::new(),
                },
                ShardIndex {
                    path: local_file,
                    global_start: 1,
                    row_count: 1,
                    is_parquet: false,
                    parquet_row_groups: Vec::new(),
                    checkpoints: vec![0],
                },
            ],
            remote_candidates: None,
            remote_candidate_sizes: HashMap::new(),
            next_remote_idx: 0,
        };

        assert_eq!(source.manifest_usage_bytes_locked(&state), 7);
    }

    #[test]
    fn build_shard_index_errors_when_parquet_present_but_not_accepted() {
        let dir = tempdir().unwrap();
        fs::write(dir.path().join("rows.parquet"), b"fake").unwrap();
        let mut config = test_config(dir.path().to_path_buf());
        config.shard_extensions = vec!["ndjson".to_string()];

        let result = HuggingFaceRowSource::build_shard_index(&config);
        assert!(result.is_err());
    }

    #[test]
    fn locate_parquet_group_maps_offsets_and_reports_missing() {
        let dir = tempdir().unwrap();
        let config = test_config(dir.path().to_path_buf());
        let source = test_source(config);
        let shard = ShardIndex {
            path: dir.path().join("rows.parquet"),
            global_start: 0,
            row_count: 6,
            is_parquet: true,
            parquet_row_groups: vec![(0, 2), (2, 2), (4, 2)],
            checkpoints: Vec::new(),
        };

        let mapped = source.locate_parquet_group(&shard, 3).unwrap();
        assert_eq!(mapped, (1, 1));
        let missing = source.locate_parquet_group(&shard, 99);
        assert!(missing.is_err());
    }
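
    // With three row groups of two rows each, global row 3 lands in group 1
    // at offset 1, matching the `(1, 1)` asserted above.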

    #[test]
    fn parse_row_role_columns_mode_builds_expected_fields() {
        let dir = tempdir().unwrap();
        let mut config = test_config(dir.path().to_path_buf());
        config.anchor_column = Some("anchor".into());
        config.positive_column = Some("positive".into());
        config.context_columns = vec!["ctx1".into(), "ctx2".into()];
        let source = test_source(config);

        let row = source
            .parse_row(
                2,
                &json!({"id":"r","anchor":"a","positive":"p","ctx1":"c1","ctx2":2}),
            )
            .unwrap();
        assert_eq!(row.text_fields.len(), 4);
        assert_eq!(row.text_fields[0].name, "anchor");
        assert_eq!(row.text_fields[1].name, "positive");
    }

    #[test]
    fn parse_row_role_columns_mode_errors_on_missing_or_empty_values() {
        let dir = tempdir().unwrap();
        let mut config = test_config(dir.path().to_path_buf());
        config.anchor_column = Some("anchor".into());
        config.context_columns = vec!["ctx".into()];
        let source = test_source(config);

        let missing = source.parse_row(0, &json!({"anchor":"a"}));
        assert!(missing.is_err());

        let empty_anchor = source.parse_row(1, &json!({"anchor":"   ", "ctx":"ok"}));
        assert!(empty_anchor.is_err());
    }

    #[test]
    fn row_to_record_uses_anchor_for_positive_when_single_field() {
        let dir = tempdir().unwrap();
        let config = test_config(dir.path().to_path_buf());
        let source = test_source(config);
        let row = RowView {
            row_id: Some("r1".into()),
            timestamp: None,
            text_fields: vec![RowTextField {
                name: "text".into(),
                text: "alpha".into(),
            }],
        };

        let record = source.row_to_record(&row, 0).unwrap().unwrap();
        assert_eq!(record.sections.len(), 2);
        assert_eq!(record.sections[0].text, record.sections[1].text);
    }

    #[test]
    fn read_line_at_errors_on_unexpected_eof_while_scanning() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("rows.jsonl");
        fs::write(&path, b"{\"text\":\"a\"}\n").unwrap();
        let mut config = test_config(dir.path().to_path_buf());
        config.checkpoint_stride = 1;
        let source = test_source(config.clone());
        let mut shard = HuggingFaceRowSource::index_single_shard(&config, &path, 0)
            .unwrap()
            .unwrap();
        shard.checkpoints = vec![0];

        let err = source.read_line_at(&shard, 3);
        assert!(err.is_err());
    }

    #[test]
    fn target_matches_expected_size_is_false_for_missing_path() {
        let dir = tempdir().unwrap();
        let missing = dir.path().join("missing.bin");
        assert!(!HuggingFaceRowSource::target_matches_expected_size(
            &missing,
            Some(1)
        ));
    }

    #[test]
    fn candidate_target_path_uses_fallback_suffix_without_resolve_segment() {
        let dir = tempdir().unwrap();
        let config = test_config(dir.path().to_path_buf());
        let candidate = "url::https://example.com/raw/file.parquet";
        let target = HuggingFaceRowSource::candidate_target_path(&config, candidate);
        assert!(target.ends_with("_parquet_manifest/parquet/unknown.parquet"));
    }

    #[test]
    fn persist_shard_sequence_is_noop_without_remote_candidates() {
        let dir = tempdir().unwrap();
        let config = test_config(dir.path().to_path_buf());
        let source = test_source(config.clone());
        let state = SourceState {
            materialized_rows: 0,
            total_rows: None,
            shards: Vec::new(),
            remote_candidates: None,
            remote_candidate_sizes: HashMap::new(),
            next_remote_idx: 0,
        };

        source.persist_shard_sequence_locked(&state).unwrap();
        assert!(!HuggingFaceRowSource::shard_sequence_state_path(&config).exists());
    }

    #[test]
    fn load_persisted_shard_sequence_returns_none_for_identity_mismatch() {
        let dir = tempdir().unwrap();
        let config = test_config(dir.path().to_path_buf());
        let state_path = HuggingFaceRowSource::shard_sequence_state_path(&config);
        fs::create_dir_all(state_path.parent().unwrap()).unwrap();
        fs::write(
            &state_path,
            serde_json::to_vec_pretty(&json!({
                "version": 1,
                "source_id": "different",
                "dataset": config.dataset,
                "config": config.config,
                "split": config.split,
                "sampler_seed": 1,
                "candidates": ["train/0.ndjson"],
                "candidate_sizes": {},
                "next_remote_idx": 0
            }))
            .unwrap(),
        )
        .unwrap();

        let loaded = HuggingFaceRowSource::load_persisted_shard_sequence(&config, 1).unwrap();
        assert!(loaded.is_none());
    }

    #[test]
    fn parse_row_falls_back_to_synthetic_id_when_missing_id_column() {
        let dir = tempdir().unwrap();
        let mut config = test_config(dir.path().to_path_buf());
        config.id_column = Some("id".into());
        let source = test_source(config);

        let row = source.parse_row(42, &json!({"text": "hello"})).unwrap();
        assert_eq!(row.row_id, Some("org/dataset:train:42".to_string()));
    }

    #[test]
    fn row_to_record_falls_back_to_row_index_when_row_id_missing() {
        let dir = tempdir().unwrap();
        let config = test_config(dir.path().to_path_buf());
        let source = test_source(config);
        let row = RowView {
            row_id: None,
            timestamp: None,
            text_fields: vec![RowTextField {
                name: "text".into(),
                text: "body".into(),
            }],
        };

        let record = source.row_to_record(&row, 7).unwrap().unwrap();
        assert!(record.id.ends_with("::row_7"));
    }

    #[test]
    fn locate_shard_returns_none_for_out_of_range_index() {
        let shards = vec![ShardIndex {
            path: PathBuf::from("a.ndjson"),
            global_start: 0,
            row_count: 2,
            is_parquet: false,
            parquet_row_groups: Vec::new(),
            checkpoints: vec![0],
        }];

        assert!(HuggingFaceRowSource::locate_shard(&shards, 5).is_none());
    }

    #[test]
    fn read_row_batch_errors_when_row_not_mappable_to_shard() {
        let dir = tempdir().unwrap();
        let config = test_config(dir.path().to_path_buf());
        let source = test_source(config);
        {
            let mut state = source.state.lock().unwrap();
            state.materialized_rows = 1;
            state.total_rows = Some(1);
            state.shards.clear();
        }

        let mut out = Vec::new();
        let err = source.read_row_batch(&[0], &mut out, Some(1));
        assert!(err.is_err());
    }

    #[test]
    fn len_hint_applies_max_rows_cap() {
        let dir = tempdir().unwrap();
        let mut config = test_config(dir.path().to_path_buf());
        config.max_rows = Some(3);
        let source = test_source(config);
        {
            let mut state = source.state.lock().unwrap();
            state.materialized_rows = 2;
            state.total_rows = Some(100);
        }
        assert_eq!(source.len_hint(), Some(3));
    }

    #[test]
    fn enforce_disk_cap_returns_false_when_disabled_or_under_limit() {
        let dir = tempdir().unwrap();
        let mut config = test_config(dir.path().to_path_buf());
        config.local_disk_cap_bytes = None;
        let source = test_source(config);
        let mut state = SourceState {
            materialized_rows: 0,
            total_rows: None,
            shards: Vec::new(),
            remote_candidates: None,
            remote_candidate_sizes: HashMap::new(),
            next_remote_idx: 0,
        };
        let protected = dir.path().join("p");
        assert!(
            !source
                .enforce_disk_cap_locked(&mut state, &protected)
                .unwrap()
        );

        let mut config2 = test_config(dir.path().to_path_buf());
        config2.local_disk_cap_bytes = Some(10_000);
        let source2 = test_source(config2);
        let manifest_root = source2.manifest_cache_root();
        fs::create_dir_all(&manifest_root).unwrap();
        let shard_path = manifest_root.join("small.parquet");
        fs::write(&shard_path, vec![1u8; 32]).unwrap();
        let mut state2 = SourceState {
            materialized_rows: 1,
            total_rows: None,
            shards: vec![ShardIndex {
                path: shard_path,
                global_start: 0,
                row_count: 1,
                is_parquet: true,
                parquet_row_groups: vec![(0, 1)],
                checkpoints: Vec::new(),
            }],
            remote_candidates: None,
            remote_candidate_sizes: HashMap::new(),
            next_remote_idx: 0,
        };
        assert!(
            !source2
                .enforce_disk_cap_locked(&mut state2, &protected)
                .unwrap()
        );
    }

    #[test]
    fn default_triplet_recipes_returns_expected_shape() {
        let dir = tempdir().unwrap();
        let config = test_config(dir.path().to_path_buf());
        let source = test_source(config);
        let recipes = source.default_triplet_recipes();
        assert_eq!(recipes.len(), 1);
        assert_eq!(recipes[0].name, "huggingface_anchor_context");
    }

    #[test]
    fn download_and_materialize_shard_url_short_circuits_when_cached_complete() {
        let dir = tempdir().unwrap();
        let config = test_config(dir.path().to_path_buf());
        let candidate = "url::https://host/datasets/org/ds/resolve/main/train/ok.ndjson";
        let target = HuggingFaceRowSource::candidate_target_path(&config, candidate);
        fs::create_dir_all(target.parent().unwrap()).unwrap();
        fs::write(&target, b"ok").unwrap();

        let resolved =
            HuggingFaceRowSource::download_and_materialize_shard(&config, candidate, Some(2))
                .unwrap();
        assert_eq!(resolved, target);
    }

    #[test]
    fn download_and_materialize_shard_url_replaces_stale_part_file() {
        let dir = tempdir().unwrap();
        let config = test_config(dir.path().to_path_buf());
        let payload = b"{\"text\":\"a\"}\n".to_vec();
        let (base_url, server) = spawn_one_shot_http(payload.clone());
        let candidate = format!("url::{base_url}/datasets/org/ds/resolve/main/train/part-x.ndjson");
        let target = HuggingFaceRowSource::candidate_target_path(&config, &candidate);
        let temp_target = target.with_extension("part");
        fs::create_dir_all(temp_target.parent().unwrap()).unwrap();
        fs::write(&temp_target, b"stale").unwrap();

        let out = HuggingFaceRowSource::download_and_materialize_shard(&config, &candidate, None)
            .unwrap();
        server.join().unwrap();

        assert_eq!(out, target);
        assert_eq!(fs::read(&target).unwrap(), payload);
    }

    #[test]
    fn download_next_remote_shard_skips_when_max_rows_already_reached() {
        let dir = tempdir().unwrap();
        let mut config = test_config(dir.path().to_path_buf());
        config.max_rows = Some(0);
        let source = test_source(config);
        let payload = b"{\"text\":\"x\"}\n".to_vec();
        let (base_url, server) = spawn_one_shot_http(payload);
        let candidate =
            format!("url::{base_url}/datasets/org/ds/resolve/main/train/part-200.ndjson");

        {
            let mut state = source.state.lock().unwrap();
            state.remote_candidates = Some(vec![candidate]);
            state.next_remote_idx = 0;
            state.materialized_rows = 0;
        }

        assert!(source.download_next_remote_shard().unwrap());
        server.join().unwrap();
        let state = source.state.lock().unwrap();
        assert_eq!(state.materialized_rows, 0);
        assert!(state.shards.is_empty());
    }

    #[test]
    fn download_next_remote_shard_skips_zero_row_download() {
        let dir = tempdir().unwrap();
        let config = test_config(dir.path().to_path_buf());
        let source = test_source(config);
        let payload = Vec::<u8>::new();
        let (base_url, server) = spawn_one_shot_http(payload);
        let candidate =
            format!("url::{base_url}/datasets/org/ds/resolve/main/train/part-empty.ndjson");

        {
            let mut state = source.state.lock().unwrap();
            state.remote_candidates = Some(vec![candidate]);
            state.next_remote_idx = 0;
        }

        assert!(source.download_next_remote_shard().unwrap());
        server.join().unwrap();
        let state = source.state.lock().unwrap();
        assert_eq!(state.materialized_rows, 0);
        assert!(state.shards.is_empty());
    }

    #[test]
    fn read_row_batch_errors_when_parquet_reader_cannot_open_file() {
        let dir = tempdir().unwrap();
        let config = test_config(dir.path().to_path_buf());
        let source = test_source(config);
        {
            let mut state = source.state.lock().unwrap();
            state.materialized_rows = 1;
            state.total_rows = Some(1);
            state.shards = vec![ShardIndex {
                path: dir.path().join("missing.parquet"),
                global_start: 0,
                row_count: 1,
                is_parquet: true,
                parquet_row_groups: vec![(0, 1)],
                checkpoints: Vec::new(),
            }];
        }

        let mut out = Vec::new();
        let err = source.read_row_batch(&[0], &mut out, Some(1));
        assert!(err.is_err());
    }

    #[test]
    fn refresh_exercises_large_total_progress_branch() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("rows.jsonl");
        let line = b"{\"id\":\"r\",\"text\":\"v\"}\n";
        let mut bytes = Vec::with_capacity(line.len() * 10_000);
        for _ in 0..10_000 {
            bytes.extend_from_slice(line);
        }
        fs::write(&path, bytes).unwrap();

        let mut config = test_config(dir.path().to_path_buf());
        config.checkpoint_stride = 256;
        config.refresh_batch_multiplier = 1;
        let source = test_source(config.clone());
        let shard = HuggingFaceRowSource::index_single_shard(&config, &path, 0)
            .unwrap()
            .unwrap();

        {
            let mut state = source.state.lock().unwrap();
            state.materialized_rows = 10_000;
            state.total_rows = Some(10_000);
            state.shards = vec![shard];
        }

        let snapshot = source.refresh(None, Some(1)).unwrap();
        assert_eq!(snapshot.records.len(), 1);
    }

    #[test]
    fn shard_size_bytes_returns_zero_for_missing_path() {
        let dir = tempdir().unwrap();
        let missing = dir.path().join("missing.file");
        assert_eq!(HuggingFaceRowSource::shard_size_bytes(&missing), 0);
    }

    #[test]
    fn load_persisted_shard_sequence_errors_on_invalid_json() {
        let dir = tempdir().unwrap();
        let config = test_config(dir.path().to_path_buf());
        let path = HuggingFaceRowSource::shard_sequence_state_path(&config);
        fs::create_dir_all(path.parent().unwrap()).unwrap();
        fs::write(&path, b"{not-valid-json").unwrap();

        let loaded = HuggingFaceRowSource::load_persisted_shard_sequence(&config, 1);
        assert!(loaded.is_err());
    }

    #[test]
    fn rotate_candidates_deterministically_is_noop_for_singleton() {
        let dir = tempdir().unwrap();
        let config = test_config(dir.path().to_path_buf());
        let mut candidates = vec!["one".to_string()];
        HuggingFaceRowSource::rotate_candidates_deterministically(&config, &mut candidates);
        assert_eq!(candidates, vec!["one".to_string()]);
    }

    #[test]
    fn extract_split_row_count_returns_none_when_missing_entries() {
        let payload = json!({"size": {"configs": [{"config": "other", "splits": []}]}});
        let rows = HuggingFaceRowSource::extract_split_row_count_from_size_response(
            &payload, "default", "train",
        );
        assert!(rows.is_none());
    }

    #[test]
    fn candidates_from_parquet_manifest_json_returns_empty_without_entries() {
        let dir = tempdir().unwrap();
        let config = test_config(dir.path().to_path_buf());
        let payload = json!({"other": []});
        let (candidates, sizes) =
            HuggingFaceRowSource::candidates_from_parquet_manifest_json(&config, &payload).unwrap();
        assert!(candidates.is_empty());
        assert!(sizes.is_empty());
    }

    #[test]
    fn read_line_at_errors_on_unexpected_eof_while_reading_target_row() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("rows.jsonl");
        fs::write(&path, b"{\"text\":\"a\"}\n").unwrap();
        let mut config = test_config(dir.path().to_path_buf());
        config.checkpoint_stride = 1;
        let source = test_source(config.clone());
        let mut shard = HuggingFaceRowSource::index_single_shard(&config, &path, 0)
            .unwrap()
            .unwrap();
        let end = fs::metadata(&path).unwrap().len();
        shard.checkpoints = vec![0, end];

        let err = source.read_line_at(&shard, 1);
        assert!(err.is_err());
    }

    #[test]
    fn load_persisted_shard_sequence_returns_none_when_state_missing() {
        let dir = tempdir().unwrap();
        let config = test_config(dir.path().to_path_buf());
        let loaded = HuggingFaceRowSource::load_persisted_shard_sequence(&config, 1).unwrap();
        assert!(loaded.is_none());
    }

    #[test]
    fn persist_shard_sequence_clamps_next_index_on_write() {
        let dir = tempdir().unwrap();
        let config = test_config(dir.path().to_path_buf());
        let source = test_source(config.clone());
        let state = SourceState {
            materialized_rows: 0,
            total_rows: None,
            shards: Vec::new(),
            remote_candidates: Some(vec!["a".into(), "b".into()]),
            remote_candidate_sizes: HashMap::new(),
            next_remote_idx: 99,
        };

        source.persist_shard_sequence_locked(&state).unwrap();
        let raw =
            fs::read_to_string(HuggingFaceRowSource::shard_sequence_state_path(&config)).unwrap();
        let parsed: Value = serde_json::from_str(&raw).unwrap();
        assert_eq!(
            parsed.get("next_remote_idx").and_then(Value::as_u64),
            Some(2)
        );
    }

    #[test]
    fn materialize_local_file_replaces_target_when_size_differs() {
        let dir = tempdir().unwrap();
        let config = test_config(dir.path().to_path_buf());
        let src = dir.path().join("src.ndjson");
        let dst = dir.path().join("dst.ndjson");
        fs::write(&src, b"newer\n").unwrap();
        fs::write(&dst, b"old\n").unwrap();

        HuggingFaceRowSource::materialize_local_file(&config, &src, &dst).unwrap();
        assert_eq!(fs::read(&dst).unwrap(), b"newer\n");
    }

    #[test]
    fn row_to_record_preserves_explicit_timestamp() {
        let dir = tempdir().unwrap();
        let config = test_config(dir.path().to_path_buf());
        let source = test_source(config);
        let ts = Utc::now();
        let row = RowView {
            row_id: Some("r1".into()),
            timestamp: Some(ts),
            text_fields: vec![RowTextField {
                name: "text".into(),
                text: "alpha".into(),
            }],
        };

        let record = source.row_to_record(&row, 0).unwrap().unwrap();
        assert_eq!(record.created_at, ts);
        assert_eq!(record.updated_at, ts);
    }

    #[test]
    fn parse_row_text_columns_accept_numeric_values() {
        let dir = tempdir().unwrap();
        let mut config = test_config(dir.path().to_path_buf());
        config.text_columns = vec!["score".into()];
        let source = test_source(config);

        let row = source.parse_row(0, &json!({"score": 123})).unwrap();
        assert_eq!(row.text_fields.len(), 1);
        assert_eq!(row.text_fields[0].text, "123");
    }

    #[test]
    fn len_hint_returns_zero_when_max_rows_is_zero() {
        let dir = tempdir().unwrap();
        let mut config = test_config(dir.path().to_path_buf());
        config.max_rows = Some(0);
        let source = test_source(config);
        assert_eq!(source.len_hint(), Some(0));
    }

    #[test]
    fn refresh_limit_none_reads_up_to_total() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("rows.jsonl");
        fs::write(
            &path,
            b"{\"id\":\"r1\",\"text\":\"a\"}\n{\"id\":\"r2\",\"text\":\"b\"}\n",
        )
        .unwrap();
        let mut config = test_config(dir.path().to_path_buf());
        config.checkpoint_stride = 1;
        config.refresh_batch_multiplier = 1;
        let source = test_source(config.clone());
        let shard = HuggingFaceRowSource::index_single_shard(&config, &path, 0)
            .unwrap()
            .unwrap();
        {
            let mut state = source.state.lock().unwrap();
            state.materialized_rows = 2;
            state.total_rows = Some(2);
            state.shards = vec![shard];
        }

        let snapshot = source.refresh(None, None).unwrap();
        assert_eq!(snapshot.records.len(), 2);
    }

    #[test]
    fn read_row_batch_skips_unavailable_indices_without_error() {
        let dir = tempdir().unwrap();
        let config = test_config(dir.path().to_path_buf());
        let source = test_source(config);
        {
            let mut state = source.state.lock().unwrap();
            state.materialized_rows = 0;
            state.total_rows = Some(0);
            state.remote_candidates = Some(Vec::new());
        }

        let mut out = Vec::new();
        source.read_row_batch(&[0, 1], &mut out, Some(2)).unwrap();
        assert!(out.is_empty());
    }

    #[test]
    fn candidate_target_path_maps_remote_urls_under_manifest_root() {
        let dir = tempdir().unwrap();
        let config = test_config(dir.path().to_path_buf());
        let candidate =
            "url::https://huggingface.co/datasets/org/ds/resolve/main/train/part-000.parquet";
        let target = HuggingFaceRowSource::candidate_target_path(&config, candidate);
        assert!(target.ends_with("_parquet_manifest/main/train/part-000.parquet"));
    }

    #[test]
    fn candidate_target_path_keeps_local_candidates_relative() {
        let dir = tempdir().unwrap();
        let config = test_config(dir.path().to_path_buf());
        let candidate = "train/part-001.ndjson";
        let target = HuggingFaceRowSource::candidate_target_path(&config, candidate);
        assert_eq!(target, config.snapshot_dir.join(candidate));
    }

    #[test]
    fn target_matches_expected_size_validates_when_expected_is_provided() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("payload.bin");
        fs::write(&path, vec![0u8; 5]).unwrap();

        assert!(HuggingFaceRowSource::target_matches_expected_size(
            &path,
            Some(5)
        ));
        assert!(!HuggingFaceRowSource::target_matches_expected_size(
            &path,
            Some(4)
        ));
        assert!(HuggingFaceRowSource::target_matches_expected_size(
            &path, None
        ));
    }

    #[test]
    fn parquet_row_group_map_and_index_single_shard_cover_success_path() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("rows.parquet");
        write_parquet_fixture(&path, &[("r1", "alpha"), ("r2", "beta"), ("r3", "gamma")]);
        let config = test_config(dir.path().to_path_buf());

        let (total_rows, groups) =
            HuggingFaceRowSource::parquet_row_group_map(&config, &path).unwrap();
        assert_eq!(total_rows, 3);
        assert!(!groups.is_empty());

        let shard = HuggingFaceRowSource::index_single_shard(&config, &path, 0)
            .unwrap()
            .unwrap();
        assert!(shard.is_parquet);
        assert_eq!(shard.row_count, 3);
        assert!(shard.checkpoints.is_empty());
    }

    #[test]
    fn read_row_batch_reads_parquet_rows_and_uses_cache_on_repeat() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("rows.parquet");
        write_parquet_fixture(&path, &[("r10", "ten"), ("r11", "eleven")]);

        let config = test_config(dir.path().to_path_buf());
        let source = test_source(config.clone());
        let shard = HuggingFaceRowSource::index_single_shard(&config, &path, 0)
            .unwrap()
            .unwrap();
        {
            let mut state = source.state.lock().unwrap();
            state.materialized_rows = 2;
            state.total_rows = Some(2);
            state.shards = vec![shard];
        }

        let mut first = Vec::new();
        source.read_row_batch(&[0, 1], &mut first, None).unwrap();
        assert_eq!(first.len(), 2);
        assert!(first.iter().any(|record| record.id.ends_with("::r10")));

        let mut second = Vec::new();
        source.read_row_batch(&[0, 1], &mut second, None).unwrap();
        assert_eq!(second.len(), 2);
    }

    #[test]
    fn ensure_row_available_bootstraps_from_in_memory_candidates() {
        let dir = tempdir().unwrap();
        let config = test_config(dir.path().to_path_buf());
        let source = test_source(config.clone());

        let payload =
            b"{\"id\":\"r1\",\"text\":\"alpha\"}\n{\"id\":\"r2\",\"text\":\"beta\"}\n".to_vec();
        let (base_url, server) = spawn_one_shot_http(payload);
        let candidate =
            format!("url::{base_url}/datasets/org/ds/resolve/main/train/persisted.ndjson");

        {
            let mut state = source.state.lock().unwrap();
            state.remote_candidates = Some(vec![candidate]);
            state.next_remote_idx = 0;
        }

        assert!(source.ensure_row_available(0).unwrap());
        server.join().unwrap();

        let state = source.state.lock().unwrap();
        assert_eq!(state.materialized_rows, 2);
        assert_eq!(state.next_remote_idx, 1);
        assert_eq!(state.shards.len(), 1);
    }

    #[test]
    fn configure_sampler_updates_len_hint_headroom_via_trait_methods() {
        let dir = tempdir().unwrap();
        let mut config = test_config(dir.path().to_path_buf());
        config.cache_capacity = 10;
        config.remote_expansion_headroom_multiplier = 3;
        let source = test_source(config);
        {
            let mut state = source.state.lock().unwrap();
            state.materialized_rows = 5;
            state.total_rows = Some(100);
        }

        assert_eq!(source.reported_record_count().unwrap(), 35);

        let sampler = SamplerConfig {
            ingestion_max_records: 2,
            ..SamplerConfig::default()
        };
        source.configure_sampler(&sampler);

        assert_eq!(source.reported_record_count().unwrap(), 11);
    }
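
    // Headroom arithmetic: reported count = materialized rows + multiplier *
    // ingestion base. With 5 rows and multiplier 3: 5 + 3 * 10 = 35 initially,
    // then 5 + 3 * 2 = 11 after `configure_sampler` lowers the base to 2.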
3665

3666
    #[test]
3667
    fn refresh_ignores_persisted_remote_sequence_state() {
1✔
3668
        let dir = tempdir().unwrap();
1✔
3669
        let config = test_config(dir.path().to_path_buf());
1✔
3670
        let source = test_source(config.clone());
1✔
3671

3672
        let payload = b"{\"id\":\"rr\",\"text\":\"hello\"}\n".to_vec();
1✔
3673
        let (base_url, server) = spawn_one_shot_http(payload);
1✔
3674
        let candidate =
1✔
3675
            format!("url::{base_url}/datasets/org/ds/resolve/main/train/refresh.ndjson");
1✔
3676

3677
        let state_path = HuggingFaceRowSource::shard_sequence_state_path(&config);
1✔
3678
        fs::create_dir_all(state_path.parent().unwrap()).unwrap();
1✔
3679
        fs::write(
1✔
3680
            &state_path,
1✔
3681
            serde_json::to_vec_pretty(&json!({
1✔
3682
                "version": 1,
1✔
3683
                "source_id": config.source_id,
1✔
3684
                "dataset": config.dataset,
1✔
3685
                "config": config.config,
1✔
3686
                "split": config.split,
1✔
3687
                "sampler_seed": 1,
1✔
3688
                "candidates": [candidate],
1✔
3689
                "candidate_sizes": {},
1✔
3690
                "next_remote_idx": 1
1✔
3691
            }))
1✔
3692
            .unwrap(),
1✔
3693
        )
3694
        .unwrap();
1✔
3695

3696
        {
1✔
3697
            let mut state = source.state.lock().unwrap();
1✔
3698
            state.remote_candidates = Some(vec![format!(
1✔
3699
                "url::{base_url}/datasets/org/ds/resolve/main/train/refresh.ndjson"
1✔
3700
            )]);
1✔
3701
            state.next_remote_idx = 0;
1✔
3702
        }
1✔
3703

3704
        let snapshot = source.refresh(None, Some(1)).unwrap();
1✔
3705
        server.join().unwrap();
1✔
3706

3707
        assert_eq!(snapshot.records.len(), 1);
1✔
3708
        assert!(snapshot.records[0].id.contains("hf_test::rr"));
1✔
3709
    }
1✔
3710

3711
    #[test]
3712
    fn download_next_remote_shard_trims_rows_to_max_rows_limit() {
1✔
3713
        let dir = tempdir().unwrap();
1✔
3714
        let mut config = test_config(dir.path().to_path_buf());
1✔
3715
        config.max_rows = Some(1);
1✔
3716
        let source = test_source(config);
1✔
3717
        let payload = b"{\"text\":\"a\"}\n{\"text\":\"b\"}\n".to_vec();
1✔
3718
        let (base_url, server) = spawn_one_shot_http(payload);
1✔
3719
        let candidate = format!("url::{base_url}/datasets/org/ds/resolve/main/train/trim.ndjson");
1✔
3720

3721
        {
1✔
3722
            let mut state = source.state.lock().unwrap();
1✔
3723
            state.remote_candidates = Some(vec![candidate]);
1✔
3724
            state.next_remote_idx = 0;
1✔
3725
            state.materialized_rows = 0;
1✔
3726
        }
1✔
3727

3728
        assert!(source.download_next_remote_shard().unwrap());
1✔
3729
        server.join().unwrap();
1✔
3730

3731
        let state = source.state.lock().unwrap();
1✔
3732
        assert_eq!(state.materialized_rows, 1);
1✔
3733
        assert_eq!(state.shards.len(), 1);
1✔
3734
        assert_eq!(state.shards[0].row_count, 1);
1✔
3735
    }
1✔
3736

3737
    #[test]
3738
    fn build_shard_index_skips_empty_files_and_keeps_non_empty() {
1✔
3739
        let dir = tempdir().unwrap();
1✔
3740
        fs::write(dir.path().join("a.ndjson"), b"").unwrap();
1✔
3741
        fs::write(dir.path().join("b.ndjson"), b"{\"text\":\"x\"}\n").unwrap();
1✔
3742
        let config = test_config(dir.path().to_path_buf());
1✔
3743

3744
        let (shards, discovered) = HuggingFaceRowSource::build_shard_index(&config).unwrap();
1✔
3745
        assert_eq!(discovered, 1);
1✔
3746
        assert_eq!(shards.len(), 1);
1✔
3747
        assert_eq!(shards[0].row_count, 1);
1✔
3748
    }
1✔
3749

3750
    #[test]
    fn resolve_remote_candidates_from_siblings_falls_back_when_split_filter_misses() {
        let dir = tempdir().unwrap();
        let mut config = test_config(dir.path().to_path_buf());
        config.split = "train".to_string();
        let accepted = HuggingFaceRowSource::normalized_shard_extensions(&config);
        let siblings = vec![
            "validation/file-a.ndjson".to_string(),
            "test/file-b.ndjson".to_string(),
        ];

        let (candidates, sizes) = HuggingFaceRowSource::resolve_remote_candidates_from_siblings(
            &config, &siblings, &accepted,
        )
        .unwrap();

        assert!(sizes.is_empty());
        assert_eq!(candidates.len(), 2);
    }

    #[test]
    fn resolve_remote_candidates_from_siblings_errors_for_parquet_only_when_not_accepted() {
        let dir = tempdir().unwrap();
        let mut config = test_config(dir.path().to_path_buf());
        config.shard_extensions = vec!["ndjson".to_string()];
        let accepted = HuggingFaceRowSource::normalized_shard_extensions(&config);
        let siblings = vec!["train/only.parquet".to_string()];

        let result = HuggingFaceRowSource::resolve_remote_candidates_from_siblings(
            &config, &siblings, &accepted,
        );
        assert!(result.is_err());
    }

    #[test]
    fn resolve_remote_candidates_from_siblings_returns_empty_when_no_matches_and_no_parquet() {
        let dir = tempdir().unwrap();
        let config = test_config(dir.path().to_path_buf());
        let accepted = HuggingFaceRowSource::normalized_shard_extensions(&config);
        let siblings = vec!["train/notes.txt".to_string()];

        let (candidates, sizes) = HuggingFaceRowSource::resolve_remote_candidates_from_siblings(
            &config, &siblings, &accepted,
        )
        .unwrap();
        assert!(candidates.is_empty());
        assert!(sizes.is_empty());
    }

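    // The row-count parsers consume datasets-server style JSON (shape taken
    // from the fixtures below), e.g.
    // {"size": {"splits": [{"config": "default", "split": "train", "num_rows": 10}]}};
    // the parsed total is then clamped to `max_rows`, so 10 reported rows
    // become 3 here.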
    #[test]
    fn parse_global_row_count_response_applies_max_rows() {
        let dir = tempdir().unwrap();
        let mut config = test_config(dir.path().to_path_buf());
        config.max_rows = Some(3);
        let body = serde_json::to_string(&json!({
            "size": {
                "splits": [
                    {"config": "default", "split": "train", "num_rows": 10}
                ]
            }
        }))
        .unwrap();

        let rows = HuggingFaceRowSource::parse_global_row_count_response(&config, &body)
            .unwrap()
            .unwrap();
        assert_eq!(rows, 3);
    }

    #[test]
    fn parse_global_row_count_response_errors_on_invalid_json() {
        let dir = tempdir().unwrap();
        let config = test_config(dir.path().to_path_buf());
        let parsed = HuggingFaceRowSource::parse_global_row_count_response(&config, "{bad-json");
        assert!(parsed.is_err());
    }

    #[test]
    fn parse_parquet_manifest_response_errors_on_invalid_json() {
        let dir = tempdir().unwrap();
        let config = test_config(dir.path().to_path_buf());
        let parsed = HuggingFaceRowSource::parse_parquet_manifest_response(&config, "{bad-json");
        assert!(parsed.is_err());
    }

    #[test]
    fn parse_parquet_manifest_response_returns_candidates() {
        let dir = tempdir().unwrap();
        let config = test_config(dir.path().to_path_buf());
        let body = serde_json::to_string(&json!({
            "parquet_files": [
                {"url": "https://host/datasets/x/resolve/main/train/0.parquet", "size": 5}
            ]
        }))
        .unwrap();

        let (candidates, sizes) =
            HuggingFaceRowSource::parse_parquet_manifest_response(&config, &body).unwrap();
        assert_eq!(candidates.len(), 1);
        assert_eq!(sizes.len(), 1);
    }

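    // Candidates carrying the `url::` prefix (REMOTE_URL_PREFIX) are fetched
    // over plain HTTP into the local snapshot; a pre-existing target whose
    // size differs from the expected byte count is treated as incomplete and
    // re-downloaded, as the second test below asserts.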
    #[test]
    fn download_and_materialize_shard_downloads_url_candidate() {
        let dir = tempdir().unwrap();
        let config = test_config(dir.path().to_path_buf());
        let payload = b"{\"text\":\"a\"}\n{\"text\":\"b\"}\n".to_vec();
        let (base_url, server) = spawn_one_shot_http(payload.clone());
        let candidate =
            format!("url::{base_url}/datasets/org/ds/resolve/main/train/part-000.ndjson");

        let target =
            HuggingFaceRowSource::download_and_materialize_shard(&config, &candidate, None)
                .unwrap();

        server.join().unwrap();
        assert!(target.exists());
        assert_eq!(fs::read(&target).unwrap(), payload);
    }

    #[test]
    fn download_and_materialize_shard_replaces_incomplete_existing_target() {
        let dir = tempdir().unwrap();
        let config = test_config(dir.path().to_path_buf());
        let payload = b"{\"text\":\"a\"}\n".to_vec();
        let (base_url, server) = spawn_one_shot_http(payload.clone());
        let candidate =
            format!("url::{base_url}/datasets/org/ds/resolve/main/train/part-009.ndjson");

        let target = HuggingFaceRowSource::candidate_target_path(&config, &candidate);
        fs::create_dir_all(target.parent().unwrap()).unwrap();
        fs::write(&target, b"bad").unwrap();

        let refreshed = HuggingFaceRowSource::download_and_materialize_shard(
            &config,
            &candidate,
            Some(payload.len() as u64),
        )
        .unwrap();

        server.join().unwrap();
        assert_eq!(refreshed, target);
        assert_eq!(fs::read(&target).unwrap(), payload);
    }

    #[test]
    fn download_next_remote_shard_materializes_and_indexes_rows() {
        let dir = tempdir().unwrap();
        let config = test_config(dir.path().to_path_buf());
        let source = test_source(config);
        let payload = b"{\"text\":\"a\"}\n{\"text\":\"b\"}\n".to_vec();
        let (base_url, server) = spawn_one_shot_http(payload);
        let candidate =
            format!("url::{base_url}/datasets/org/ds/resolve/main/train/part-001.ndjson");

        {
            let mut state = source.state.lock().unwrap();
            state.remote_candidates = Some(vec![candidate.clone()]);
            state.remote_candidate_sizes.insert(candidate, 24);
            state.next_remote_idx = 0;
        }

        assert!(source.download_next_remote_shard().unwrap());
        server.join().unwrap();

        let state = source.state.lock().unwrap();
        assert_eq!(state.materialized_rows, 2);
        assert_eq!(state.shards.len(), 1);
        assert_eq!(state.next_remote_idx, 1);
    }

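    // Lazy expansion path: asking for row 0 with nothing materialized makes
    // the source download the next remote candidate on demand and advance
    // `next_remote_idx`, rather than reporting the row as unavailable.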
    #[test]
    fn ensure_row_available_triggers_lazy_download_for_remote_candidates() {
        let dir = tempdir().unwrap();
        let config = test_config(dir.path().to_path_buf());
        let source = test_source(config);
        let payload = b"{\"text\":\"x\"}\n{\"text\":\"y\"}\n".to_vec();
        let (base_url, server) = spawn_one_shot_http(payload);
        let candidate =
            format!("url::{base_url}/datasets/org/ds/resolve/main/train/part-002.ndjson");

        {
            let mut state = source.state.lock().unwrap();
            state.materialized_rows = 0;
            state.remote_candidates = Some(vec![candidate.clone()]);
            state.remote_candidate_sizes.insert(candidate, 24);
            state.next_remote_idx = 0;
        }

        assert!(source.ensure_row_available(0).unwrap());
        server.join().unwrap();

        let state = source.state.lock().unwrap();
        assert!(state.materialized_rows >= 1);
        assert_eq!(state.next_remote_idx, 1);
    }

    #[test]
    fn extract_split_row_count_reads_split_entries() {
        let payload = json!({
            "size": {
                "splits": [
                    {"config": "default", "split": "train", "num_rows": 123u64},
                    {"config": "default", "split": "validation", "num_rows": 45u64}
                ]
            }
        });

        let count = HuggingFaceRowSource::extract_split_row_count_from_size_response(
            &payload,
            "default",
            "validation",
        );
        assert_eq!(count, Some(45));
    }

    #[test]
    fn extract_split_row_count_reads_config_fallback_and_dataset_total() {
        let payload = json!({
            "size": {
                "configs": [
                    {
                        "config": "default",
                        "splits": [{"name": "test", "num_rows": 77u64}],
                        "num_rows": 200u64
                    }
                ],
                "dataset": {"num_rows": 999u64}
            }
        });

        let split_count = HuggingFaceRowSource::extract_split_row_count_from_size_response(
            &payload, "default", "test",
        );
        assert_eq!(split_count, Some(77));

        let empty_split_count = HuggingFaceRowSource::extract_split_row_count_from_size_response(
            &payload, "default", "",
        );
        assert_eq!(empty_split_count, Some(200));
    }

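    // The shard-ordering seed is a pure function of the config, the candidate
    // count, and the sampler seed: stable across calls, but distinct per
    // `source_id` and per sampler seed, so two sources do not share a shard
    // permutation by accident.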
    #[test]
    fn shard_candidate_seed_is_seeded_and_source_scoped() {
        let dir = tempdir().unwrap();
        let mut a = test_config(dir.path().join("a"));
        let mut b = test_config(dir.path().join("b"));
        a.source_id = "source_a".to_string();
        b.source_id = "source_b".to_string();

        let with_seed_a = HuggingFaceRowSource::shard_candidate_seed(&a, 100, 42);
        let with_seed_a_again = HuggingFaceRowSource::shard_candidate_seed(&a, 100, 42);
        assert_eq!(with_seed_a, with_seed_a_again);

        let with_seed_b = HuggingFaceRowSource::shard_candidate_seed(&b, 100, 42);
        assert_ne!(with_seed_a, with_seed_b);

        let different_seed_a = HuggingFaceRowSource::shard_candidate_seed(&a, 100, 7);
        assert_ne!(with_seed_a, different_seed_a);
    }

    #[test]
    fn remote_shard_permutation_is_deterministic_by_sampler_seed() {
        let dir = tempdir().unwrap();
        let config = test_config(dir.path().to_path_buf());
        let total = 8usize;

        let seed_a = HuggingFaceRowSource::shard_candidate_seed(&config, total, 7);
        let seed_b = HuggingFaceRowSource::shard_candidate_seed(&config, total, 7);
        let seed_c = HuggingFaceRowSource::shard_candidate_seed(&config, total, 123);

        let mut perm_a = crate::source::IndexPermutation::new(total, seed_a, 0);
        let mut perm_b = crate::source::IndexPermutation::new(total, seed_b, 0);
        let mut perm_c = crate::source::IndexPermutation::new(total, seed_c, 0);

        let take = 6usize;
        let order_a: Vec<usize> = (0..take).map(|_| perm_a.next()).collect();
        let order_b: Vec<usize> = (0..take).map(|_| perm_b.next()).collect();
        let order_c: Vec<usize> = (0..take).map(|_| perm_c.next()).collect();

        assert_eq!(order_a, order_b);
        assert_ne!(order_a, order_c);
    }

    #[test]
    fn build_shard_index_ignores_manifest_cache_artifacts() {
        let dir = tempdir().unwrap();
        let mut config = test_config(dir.path().to_path_buf());
        config.shard_extensions = vec!["ndjson".to_string()];

        let local = dir.path().join("local.ndjson");
        fs::write(&local, b"{\"id\":\"l1\",\"text\":\"x\"}\n").unwrap();

        let manifest_cached = dir
            .path()
            .join("_parquet_manifest")
            .join("main/train/cached.ndjson");
        fs::create_dir_all(manifest_cached.parent().unwrap()).unwrap();
        fs::write(&manifest_cached, b"{\"id\":\"r1\",\"text\":\"y\"}\n").unwrap();

        let (shards, discovered) = HuggingFaceRowSource::build_shard_index(&config).unwrap();
        assert_eq!(discovered, 1);
        assert_eq!(shards.len(), 1);
        assert_eq!(shards[0].path, local);
    }

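    // Headroom arithmetic pinned below: 30 rows under the default sampler
    // config, 21 once `ingestion_max_records` is 7. Both are three times the
    // ingestion base (21 / 7 = 3), which suggests the default base is 10;
    // the exact multiplier is an implementation detail asserted only
    // indirectly here.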
    #[test]
    fn expansion_headroom_uses_sampler_ingestion_max_records_when_configured() {
        let dir = tempdir().unwrap();
        let config = test_config(dir.path().to_path_buf());
        let source = test_source(config);

        assert_eq!(source.effective_expansion_headroom_rows(), 30);

        let sampler = SamplerConfig {
            ingestion_max_records: 7,
            ..SamplerConfig::default()
        };
        source.configure_sampler(&sampler);
        assert_eq!(source.effective_expansion_headroom_rows(), 21);
    }

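    // Persisted shard-sequence state is keyed to the sampler seed that wrote
    // it: reloading with seed 4242 restores both the candidates and the
    // cursor, while a mismatched seed (9999) yields None instead of a stale
    // ordering.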
    #[test]
    fn persisted_shard_sequence_roundtrip_respects_sampler_seed() {
        let dir = tempdir().unwrap();
        let config = test_config(dir.path().to_path_buf());
        let source = test_source(config.clone());

        {
            let sampler = SamplerConfig {
                seed: 4242,
                ..SamplerConfig::default()
            };
            source.configure_sampler(&sampler);
        }

        let mut state = SourceState {
            materialized_rows: 0,
            total_rows: None,
            shards: Vec::new(),
            remote_candidates: Some(vec![
                "url::https://x/resolve/main/train/000.parquet".to_string(),
                "url::https://x/resolve/main/train/001.parquet".to_string(),
            ]),
            remote_candidate_sizes: HashMap::new(),
            next_remote_idx: 1,
        };
        state.remote_candidate_sizes.insert(
            "url::https://x/resolve/main/train/000.parquet".to_string(),
            10,
        );

        source.persist_shard_sequence_locked(&state).unwrap();

        let restored = HuggingFaceRowSource::load_persisted_shard_sequence(&config, 4242).unwrap();
        assert!(restored.is_some());
        let restored = restored.unwrap();
        assert_eq!(restored.next_remote_idx, 1);
        assert_eq!(restored.candidates.len(), 2);

        let rejected = HuggingFaceRowSource::load_persisted_shard_sequence(&config, 9999).unwrap();
        assert!(rejected.is_none());
    }

    #[test]
    fn value_to_text_handles_scalar_and_structured_values() {
        assert_eq!(HuggingFaceRowSource::value_to_text(&json!(null)), None);
        assert_eq!(HuggingFaceRowSource::value_to_text(&json!("   ")), None);
        assert_eq!(
            HuggingFaceRowSource::value_to_text(&json!("hello")),
            Some("hello".into())
        );
        assert_eq!(
            HuggingFaceRowSource::value_to_text(&json!(true)),
            Some("true".into())
        );
        assert_eq!(
            HuggingFaceRowSource::value_to_text(&json!(3.5)),
            Some("3.5".into())
        );
        assert_eq!(
            HuggingFaceRowSource::value_to_text(&json!([1, 2])),
            Some("[1,2]".into())
        );
    }

    #[test]
    fn parse_row_auto_detects_text_fields_and_skips_id() {
        let dir = tempdir().unwrap();
        let mut config = test_config(dir.path().to_path_buf());
        config.id_column = Some("id".into());
        let source = test_source(config);

        let row = source
            .parse_row(
                5,
                &json!({
                    "id": "row-5",
                    "title": "Anchor text",
                    "body": "Context text",
                    "flag": true
                }),
            )
            .unwrap();

        assert_eq!(row.row_id.as_deref(), Some("row-5"));
        assert!(row.text_fields.iter().any(|f| f.name == "title"));
        assert!(row.text_fields.iter().any(|f| f.name == "body"));
        assert!(row.text_fields.iter().all(|f| f.name != "id"));
    }

    #[test]
    fn parse_row_with_required_columns_errors_when_missing() {
        let dir = tempdir().unwrap();
        let mut config = test_config(dir.path().to_path_buf());
        config.anchor_column = Some("anchor".into());
        config.positive_column = Some("positive".into());
        config.context_columns = vec!["context".into()];
        let source = test_source(config);

        let err = source.parse_row(0, &json!({"anchor": "x", "context": "z"}));
        assert!(err.is_err());
    }

    #[test]
    fn parse_row_errors_when_payload_is_not_object() {
        let dir = tempdir().unwrap();
        let config = test_config(dir.path().to_path_buf());
        let source = test_source(config);

        let err = source.parse_row(0, &json!("not-an-object"));
        assert!(err.is_err());
    }

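    // row_to_record maps the first text field to an Anchor section and the
    // following fields to Context sections (only the second is asserted
    // below); record ids follow the "<source_id>::<row_id>" pattern,
    // "hf_test::abc" in this fixture.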
    #[test]
    fn row_to_record_builds_expected_sections() {
        let dir = tempdir().unwrap();
        let config = test_config(dir.path().to_path_buf());
        let source = test_source(config);
        let row = RowView {
            row_id: Some("abc".into()),
            timestamp: Some(Utc::now()),
            text_fields: vec![
                RowTextField {
                    name: "title".into(),
                    text: "anchor".into(),
                },
                RowTextField {
                    name: "pos".into(),
                    text: "positive".into(),
                },
                RowTextField {
                    name: "ctx".into(),
                    text: "extra".into(),
                },
            ],
        };

        let record = source.row_to_record(&row, 1).unwrap().unwrap();
        assert_eq!(record.sections.len(), 3);
        assert_eq!(record.sections[0].role, SectionRole::Anchor);
        assert_eq!(record.sections[1].role, SectionRole::Context);
        assert_eq!(record.id, "hf_test::abc");
    }

    #[test]
    fn effective_refresh_batch_target_uses_multiplier_floor_of_one() {
        let dir = tempdir().unwrap();
        let mut config = test_config(dir.path().to_path_buf());
        config.refresh_batch_multiplier = 0;
        let source = test_source(config);
        assert_eq!(source.effective_refresh_batch_target(7), 7);
    }

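    // locate_shard resolves a global row index against each shard's
    // `global_start` (row 11 falls one row into the shard that starts at 10),
    // while recompute_shard_offsets re-packs the starts contiguously
    // (0 and 3 below) and refreshes the materialized-row total.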
    #[test]
    fn locate_shard_and_recompute_offsets_work() {
        let mut shards = vec![
            ShardIndex {
                path: PathBuf::from("a"),
                global_start: 10,
                row_count: 3,
                is_parquet: false,
                parquet_row_groups: Vec::new(),
                checkpoints: vec![0],
            },
            ShardIndex {
                path: PathBuf::from("b"),
                global_start: 20,
                row_count: 2,
                is_parquet: false,
                parquet_row_groups: Vec::new(),
                checkpoints: vec![0],
            },
        ];
        let hit = HuggingFaceRowSource::locate_shard(&shards, 11).unwrap();
        assert_eq!(hit.1, 1);

        let mut state = SourceState {
            materialized_rows: 0,
            total_rows: None,
            shards: std::mem::take(&mut shards),
            remote_candidates: None,
            remote_candidate_sizes: HashMap::new(),
            next_remote_idx: 0,
        };
        HuggingFaceRowSource::recompute_shard_offsets(&mut state);
        assert_eq!(state.shards[0].global_start, 0);
        assert_eq!(state.shards[1].global_start, 3);
        assert_eq!(state.materialized_rows, 5);
    }

    #[test]
    fn len_hint_covers_known_and_empty_paths() {
        let dir = tempdir().unwrap();
        let mut config = test_config(dir.path().to_path_buf());
        config.max_rows = Some(9);
        let source = test_source(config);

        {
            let mut state = source.state.lock().unwrap();
            state.materialized_rows = 5;
            state.total_rows = Some(100);
        }
        assert_eq!(source.len_hint(), Some(9));

        {
            let mut state = source.state.lock().unwrap();
            state.materialized_rows = 0;
            state.total_rows = Some(0);
        }
        assert_eq!(source.len_hint(), Some(0));
    }

    #[test]
    fn len_hint_defaults_to_one_when_unknown_and_not_exhausted() {
        let dir = tempdir().unwrap();
        let config = test_config(dir.path().to_path_buf());
        let source = test_source(config);
        assert_eq!(source.len_hint(), Some(1));
    }

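    // With `checkpoint_stride = 1` the indexer records a byte offset for
    // every row, so read_line_at can seek straight to row 2; the follow-up
    // test clears the checkpoint table to show the same lookup then returns
    // an error.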
    #[test]
    fn read_line_at_reads_expected_row_with_checkpoints() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("rows.jsonl");
        let mut file = File::create(&path).unwrap();
        file.write_all(b"{\"text\":\"a\"}\n").unwrap();
        file.write_all(b"{\"text\":\"b\"}\n").unwrap();
        file.write_all(b"{\"text\":\"c\"}\n").unwrap();

        let mut config = test_config(dir.path().to_path_buf());
        config.checkpoint_stride = 1;
        let source = test_source(config.clone());
        let shard = HuggingFaceRowSource::index_single_shard(&config, &path, 0)
            .unwrap()
            .unwrap();

        let line = source.read_line_at(&shard, 2).unwrap();
        assert!(line.contains("\"c\""));
    }

    #[test]
    fn read_line_at_errors_when_checkpoint_is_missing() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("rows.jsonl");
        fs::write(&path, b"{\"text\":\"a\"}\n").unwrap();

        let mut config = test_config(dir.path().to_path_buf());
        config.checkpoint_stride = 1;
        let source = test_source(config.clone());
        let mut shard = HuggingFaceRowSource::index_single_shard(&config, &path, 0)
            .unwrap()
            .unwrap();
        shard.checkpoints.clear();

        let err = source.read_line_at(&shard, 0);
        assert!(err.is_err());
    }

    #[test]
    fn load_persisted_shard_sequence_clamps_next_index_to_candidate_len() {
        let dir = tempdir().unwrap();
        let config = test_config(dir.path().to_path_buf());
        let state_path = HuggingFaceRowSource::shard_sequence_state_path(&config);
        fs::create_dir_all(state_path.parent().unwrap()).unwrap();
        fs::write(
            &state_path,
            serde_json::to_vec_pretty(&json!({
                "version": 1,
                "source_id": config.source_id,
                "dataset": config.dataset,
                "config": config.config,
                "split": config.split,
                "sampler_seed": 1,
                "candidates": ["url::http://x/resolve/main/train/000.ndjson"],
                "candidate_sizes": {},
                "next_remote_idx": 99
            }))
            .unwrap(),
        )
        .unwrap();

        let loaded = HuggingFaceRowSource::load_persisted_shard_sequence(&config, 1)
            .unwrap()
            .unwrap();
        assert_eq!(loaded.next_remote_idx, 1);
    }

    #[test]
    fn materialize_local_file_copies_and_is_idempotent_when_size_matches() {
        let dir = tempdir().unwrap();
        let config = test_config(dir.path().to_path_buf());
        let src = dir.path().join("src.ndjson");
        let dst = dir.path().join("nested/dst.ndjson");

        fs::write(&src, b"line\n").unwrap();
        HuggingFaceRowSource::materialize_local_file(&config, &src, &dst).unwrap();
        let first = fs::read(&dst).unwrap();
        HuggingFaceRowSource::materialize_local_file(&config, &src, &dst).unwrap();
        let second = fs::read(&dst).unwrap();
        assert_eq!(first, second);
    }

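    // Disk-cap behavior pinned by the next two tests: eviction removes
    // already-resident manifest-cache shards while the freshly written file
    // survives; when `min_resident_shards` leaves nothing evictable, the call
    // errors and, as asserted below, even the new download ends up removed
    // rather than letting the cache exceed its byte cap.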
    #[test]
    fn enforce_disk_cap_evicts_old_manifest_shards() {
        let dir = tempdir().unwrap();
        let mut config = test_config(dir.path().to_path_buf());
        config.local_disk_cap_bytes = Some(10);
        config.min_resident_shards = 0;
        let source = test_source(config);

        let manifest_root = source.manifest_cache_root();
        fs::create_dir_all(&manifest_root).unwrap();
        let evict_path = manifest_root.join("a.parquet");
        let keep_path = manifest_root.join("b.parquet");
        fs::write(&evict_path, vec![1u8; 8]).unwrap();
        fs::write(&keep_path, vec![2u8; 8]).unwrap();

        let mut state = SourceState {
            materialized_rows: 16,
            total_rows: None,
            shards: vec![
                ShardIndex {
                    path: evict_path.clone(),
                    global_start: 0,
                    row_count: 8,
                    is_parquet: true,
                    parquet_row_groups: vec![(0, 8)],
                    checkpoints: Vec::new(),
                },
                ShardIndex {
                    path: keep_path.clone(),
                    global_start: 8,
                    row_count: 8,
                    is_parquet: true,
                    parquet_row_groups: vec![(0, 8)],
                    checkpoints: Vec::new(),
                },
            ],
            remote_candidates: None,
            remote_candidate_sizes: HashMap::new(),
            next_remote_idx: 0,
        };

        let evicted = source
            .enforce_disk_cap_locked(&mut state, &keep_path)
            .unwrap();
        assert!(evicted);
        assert!(!evict_path.exists());
        assert!(keep_path.exists());
        assert_eq!(state.shards.len(), 1);
    }

    #[test]
    fn enforce_disk_cap_errors_when_min_resident_prevents_eviction() {
        let dir = tempdir().unwrap();
        let mut config = test_config(dir.path().to_path_buf());
        config.local_disk_cap_bytes = Some(4);
        config.min_resident_shards = 1;
        let source = test_source(config);

        let manifest_root = source.manifest_cache_root();
        fs::create_dir_all(&manifest_root).unwrap();
        let protected = manifest_root.join("only.parquet");
        fs::write(&protected, vec![1u8; 8]).unwrap();

        let mut state = SourceState {
            materialized_rows: 8,
            total_rows: None,
            shards: vec![ShardIndex {
                path: protected.clone(),
                global_start: 0,
                row_count: 8,
                is_parquet: true,
                parquet_row_groups: vec![(0, 8)],
                checkpoints: Vec::new(),
            }],
            remote_candidates: None,
            remote_candidate_sizes: HashMap::new(),
            next_remote_idx: 0,
        };

        let err = source.enforce_disk_cap_locked(&mut state, &protected);
        assert!(err.is_err());
        assert!(!protected.exists());
    }

    #[test]
    fn build_shard_index_discovers_local_jsonl_shards() {
        let dir = tempdir().unwrap();
        let root = dir.path().to_path_buf();
        fs::write(root.join("a.jsonl"), b"{\"text\":\"a\"}\n").unwrap();
        fs::write(root.join("b.ndjson"), b"{\"text\":\"b\"}\n").unwrap();

        let config = test_config(root.clone());
        let (shards, discovered) = HuggingFaceRowSource::build_shard_index(&config).unwrap();
        assert_eq!(discovered, 2);
        assert_eq!(shards.len(), 2);
    }

    #[test]
    fn index_single_shard_returns_none_for_empty_file() {
        let dir = tempdir().unwrap();
        let config = test_config(dir.path().to_path_buf());
        let path = dir.path().join("empty.jsonl");
        fs::write(&path, b"").unwrap();
        let shard = HuggingFaceRowSource::index_single_shard(&config, &path, 0).unwrap();
        assert!(shard.is_none());
    }

    #[test]
    fn refresh_reads_local_rows_and_advances_cursor() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("rows.jsonl");
        fs::write(
            &path,
            b"{\"id\":\"r1\",\"text\":\"alpha\"}\n{\"id\":\"r2\",\"text\":\"beta\"}\n{\"id\":\"r3\",\"text\":\"gamma\"}\n",
        )
        .unwrap();

        let mut config = test_config(dir.path().to_path_buf());
        config.checkpoint_stride = 1;
        config.refresh_batch_multiplier = 1;
        let source = test_source(config.clone());
        let shard = HuggingFaceRowSource::index_single_shard(&config, &path, 0)
            .unwrap()
            .unwrap();
        {
            let mut state = source.state.lock().unwrap();
            state.materialized_rows = shard.row_count;
            state.total_rows = Some(shard.row_count);
            state.shards = vec![shard];
        }

        let snapshot = source.refresh(None, Some(2)).unwrap();
        assert_eq!(snapshot.records.len(), 2);
        assert!(snapshot.cursor.revision > 0);
    }

    #[test]
    fn reported_record_count_uses_len_hint_for_local_state() {
        let dir = tempdir().unwrap();
        let config = test_config(dir.path().to_path_buf());
        let source = test_source(config);
        {
            let mut state = source.state.lock().unwrap();
            state.materialized_rows = 4;
            state.total_rows = Some(4);
        }
        assert_eq!(source.reported_record_count().unwrap(), 4);
    }

    #[test]
    fn rotate_candidates_deterministically_preserves_membership() {
        let dir = tempdir().unwrap();
        let config = test_config(dir.path().to_path_buf());
        let original = vec!["a".to_string(), "b".to_string(), "c".to_string()];
        let mut rotated = original.clone();
        HuggingFaceRowSource::rotate_candidates_deterministically(&config, &mut rotated);
        let mut sorted_original = original;
        let mut sorted_rotated = rotated;
        sorted_original.sort();
        sorted_rotated.sort();
        assert_eq!(sorted_rotated, sorted_original);
    }

    #[test]
    fn parse_row_supports_row_wrapped_payload_and_text_columns() {
        let dir = tempdir().unwrap();
        let mut config = test_config(dir.path().to_path_buf());
        config.text_columns = vec!["headline".into(), "body".into()];
        config.id_column = Some("rid".into());
        let source = test_source(config);

        let parsed = source
            .parse_row(
                0,
                &json!({"row": {"rid": "r-1", "headline": "h", "body": "b"}}),
            )
            .unwrap();

        assert_eq!(parsed.row_id.as_deref(), Some("r-1"));
        assert_eq!(parsed.text_fields.len(), 2);
        assert_eq!(parsed.text_fields[0].name, "headline");
    }

    #[test]
    fn row_to_record_returns_none_for_empty_fields() {
        let dir = tempdir().unwrap();
        let config = test_config(dir.path().to_path_buf());
        let source = test_source(config);
        let row = RowView {
            row_id: Some("x".into()),
            timestamp: None,
            text_fields: Vec::new(),
        };
        assert!(source.row_to_record(&row, 0).unwrap().is_none());
    }

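    // ensure_row_available has three exits, all pinned below: an already
    // materialized index succeeds (0), an index at or past `max_rows` is
    // refused (3 with max_rows = 2), and an unmaterialized index fails once
    // the remote candidate list is exhausted (1 with no candidates left).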
    #[test]
    fn ensure_row_available_handles_materialized_max_and_exhausted_candidates() {
        let dir = tempdir().unwrap();
        let mut config = test_config(dir.path().to_path_buf());
        config.max_rows = Some(2);
        let source = test_source(config);
        {
            let mut state = source.state.lock().unwrap();
            state.materialized_rows = 1;
            state.remote_candidates = Some(vec![]);
            state.next_remote_idx = 0;
        }

        assert!(source.ensure_row_available(0).unwrap());
        assert!(!source.ensure_row_available(3).unwrap());
        assert!(!source.ensure_row_available(1).unwrap());
    }

    #[test]
    fn read_row_batch_uses_cached_rows_and_respects_limit() {
        let dir = tempdir().unwrap();
        let config = test_config(dir.path().to_path_buf());
        let source = test_source(config.clone());

        {
            let mut state = source.state.lock().unwrap();
            state.materialized_rows = 2;
            state.total_rows = Some(2);
        }

        let row0 = RowView {
            row_id: Some("r0".into()),
            timestamp: Some(Utc::now()),
            text_fields: vec![RowTextField {
                name: "text".into(),
                text: "alpha".into(),
            }],
        };
        let row1 = RowView {
            row_id: Some("r1".into()),
            timestamp: Some(Utc::now()),
            text_fields: vec![RowTextField {
                name: "text".into(),
                text: "beta".into(),
            }],
        };
        {
            let mut cache = source.cache.lock().unwrap();
            cache.insert(0, row0, config.cache_capacity);
            cache.insert(1, row1, config.cache_capacity);
        }

        let mut out = Vec::new();
        source.read_row_batch(&[0, 1], &mut out, Some(1)).unwrap();
        assert_eq!(out.len(), 1);
    }

    #[test]
    fn read_row_batch_errors_on_invalid_json_line() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("broken.jsonl");
        fs::write(&path, b"not-json\n").unwrap();

        let mut config = test_config(dir.path().to_path_buf());
        config.checkpoint_stride = 1;
        let source = test_source(config.clone());
        let shard = HuggingFaceRowSource::index_single_shard(&config, &path, 0)
            .unwrap()
            .unwrap();
        {
            let mut state = source.state.lock().unwrap();
            state.materialized_rows = 1;
            state.total_rows = Some(1);
            state.shards = vec![shard];
        }

        let mut out = Vec::new();
        let result = source.read_row_batch(&[0], &mut out, Some(1));
        assert!(result.is_err());
    }

    #[test]
    fn build_shard_index_errors_when_no_matching_extensions() {
        let dir = tempdir().unwrap();
        fs::write(dir.path().join("data.txt"), b"x\n").unwrap();
        let config = test_config(dir.path().to_path_buf());
        let result = HuggingFaceRowSource::build_shard_index(&config);
        assert!(result.is_err());
    }

    #[test]
    fn build_shard_index_honors_max_rows() {
        let dir = tempdir().unwrap();
        fs::write(
            dir.path().join("rows.jsonl"),
            b"{\"text\":\"1\"}\n{\"text\":\"2\"}\n{\"text\":\"3\"}\n",
        )
        .unwrap();
        let mut config = test_config(dir.path().to_path_buf());
        config.max_rows = Some(2);

        let (_, discovered) = HuggingFaceRowSource::build_shard_index(&config).unwrap();
        assert_eq!(discovered, 2);
    }

    #[test]
    fn refresh_handles_empty_total_and_cursor_wrap() {
        let dir = tempdir().unwrap();
        let config = test_config(dir.path().to_path_buf());
        let source = test_source(config.clone());

        {
            let mut state = source.state.lock().unwrap();
            state.materialized_rows = 0;
            state.total_rows = Some(0);
        }
        let empty = source.refresh(None, Some(5)).unwrap();
        assert!(empty.records.is_empty());
        assert_eq!(empty.cursor.revision, 0);

        let path = dir.path().join("rows.jsonl");
        fs::write(
            &path,
            b"{\"id\":\"a\",\"text\":\"A\"}\n{\"id\":\"b\",\"text\":\"B\"}\n",
        )
        .unwrap();
        let mut cfg2 = config;
        cfg2.checkpoint_stride = 1;
        let source2 = test_source(cfg2.clone());
        let shard = HuggingFaceRowSource::index_single_shard(&cfg2, &path, 0)
            .unwrap()
            .unwrap();
        {
            let mut state = source2.state.lock().unwrap();
            state.materialized_rows = 2;
            state.total_rows = Some(2);
            state.shards = vec![shard];
        }
        let cursor = SourceCursor {
            last_seen: Utc::now(),
            revision: 99,
        };
        let snapshot = source2.refresh(Some(&cursor), Some(1)).unwrap();
        assert_eq!(snapshot.records.len(), 1);
    }

    #[test]
    fn new_rejects_zero_checkpoint_stride() {
        let dir = tempdir().unwrap();
        let mut config = test_config(dir.path().to_path_buf());
        config.checkpoint_stride = 0;
        let result = HuggingFaceRowSource::new(config);
        assert!(result.is_err());
    }

    #[test]
    fn parse_global_row_count_response_returns_none_when_split_missing() {
        let dir = tempdir().unwrap();
        let config = test_config(dir.path().to_path_buf());
        let body = r#"{
            "size": {
                "splits": [
                    {"config":"main","split":"test","num_rows":7}
                ]
            }
        }"#;

        let parsed = HuggingFaceRowSource::parse_global_row_count_response(&config, body).unwrap();
        assert_eq!(parsed, None);
    }

    #[test]
    fn extract_split_row_count_uses_config_num_rows_when_split_empty() {
        let payload = serde_json::json!({
            "size": {
                "configs": [
                    {
                        "config": "main",
                        "num_rows": 123,
                        "splits": [
                            {"split": "train", "num_rows": 999}
                        ]
                    }
                ]
            }
        });

        let rows =
            HuggingFaceRowSource::extract_split_row_count_from_size_response(&payload, "main", "");
        assert_eq!(rows, Some(123));
    }

    #[test]
    fn extract_split_row_count_uses_dataset_num_rows_when_split_empty() {
        let payload = serde_json::json!({
            "size": {
                "dataset": {
                    "num_rows": 77
                }
            }
        });

        let rows =
            HuggingFaceRowSource::extract_split_row_count_from_size_response(&payload, "main", "");
        assert_eq!(rows, Some(77));
    }

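    // End-to-end determinism check: two sources sharing sampler seed 7 must
    // emit the twelve local rows in an identical order, while seed 123
    // produces a different permutation over the same records.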
    #[test]
    fn refresh_order_uses_sampler_seed_for_local_rows() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("rows.jsonl");
        let mut payload = String::new();
        for idx in 0..12 {
            payload.push_str(&format!("{{\"id\":\"r{idx}\",\"text\":\"v{idx}\"}}\n"));
        }
        fs::write(&path, payload).unwrap();

        let mut config = test_config(dir.path().to_path_buf());
        config.checkpoint_stride = 1;
        config.refresh_batch_multiplier = 1;

        let source_a = test_source(config.clone());
        let source_b = test_source(config.clone());
        let source_c = test_source(config.clone());
        let shard = HuggingFaceRowSource::index_single_shard(&config, &path, 0)
            .unwrap()
            .unwrap();

        for source in [&source_a, &source_b, &source_c] {
            let mut state = source.state.lock().unwrap();
            state.materialized_rows = 12;
            state.total_rows = Some(12);
            state.shards = vec![shard.clone()];
        }

        let seed_1 = SamplerConfig {
            seed: 7,
            ..SamplerConfig::default()
        };
        let seed_2 = SamplerConfig {
            seed: 7,
            ..SamplerConfig::default()
        };
        let seed_3 = SamplerConfig {
            seed: 123,
            ..SamplerConfig::default()
        };

        source_a.configure_sampler(&seed_1);
        source_b.configure_sampler(&seed_2);
        source_c.configure_sampler(&seed_3);

        let ids_a: Vec<String> = source_a
            .refresh(None, Some(8))
            .unwrap()
            .records
            .into_iter()
            .map(|record| record.id)
            .collect();
        let ids_b: Vec<String> = source_b
            .refresh(None, Some(8))
            .unwrap()
            .records
            .into_iter()
            .map(|record| record.id)
            .collect();
        let ids_c: Vec<String> = source_c
            .refresh(None, Some(8))
            .unwrap()
            .records
            .into_iter()
            .map(|record| record.id)
            .collect();

        assert_eq!(ids_a, ids_b);
        assert_ne!(ids_a, ids_c);
    }
}