
jzombie / rust-triplets, build 22337785815

24 Feb 2026 05:20AM UTC coverage: 91.641% (-1.0%) from 92.675%

Pull Request #7: Add HF source (merge 3a3d7e9b7 into 980559192)

3602 of 4094 new or added lines in 6 files covered (87.98%).
99 existing lines in 3 files now uncovered.
13057 of 14248 relevant lines covered (91.64%).
2770.97 hits per line.

Source File: /src/source/backends/huggingface_source.rs (87.84% covered)

use hf_hub::Repo;
use hf_hub::RepoType;
use hf_hub::api::sync::ApiBuilder;
use parquet::file::reader::{FileReader, SerializedFileReader};
use parquet::record::reader::RowIter;
use rayon::prelude::*;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use std::cmp::Ordering;
use std::collections::hash_map::DefaultHasher;
use std::collections::{BTreeMap, HashMap, VecDeque};
use std::fs;
use std::fs::File;
use std::hash::{Hash, Hasher};
use std::io::{BufRead, BufReader, Read, Seek, SeekFrom, Write};
use std::path::Path;
use std::path::PathBuf;
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;
use std::time::Instant;
use tracing::{info, warn};
use walkdir::WalkDir;

use crate::SamplerError;
use crate::config::{NegativeStrategy, SamplerConfig, Selector, TripletRecipe};
use crate::data::{DataRecord, QualityScore, SectionRole};
use crate::utils::make_section;
use chrono::{DateTime, Utc};

use crate::source::{DataSource, SourceCursor, SourceSnapshot};

const REMOTE_URL_PREFIX: &str = "url::";
/// Extra row-index headroom above currently materialized rows exposed via `len_hint`.
///
/// This is not a file count. It lets sampling look slightly past the local row
/// frontier so lazy remote expansion can continue without jumping to the full
/// global row domain at once.
/// Multiplies the sampler ingestion base (`SamplerConfig.ingestion_max_records`)
/// to compute `len_hint` expansion headroom rows.
const REMOTE_EXPANSION_HEADROOM_MULTIPLIER: usize = 4;
/// Number of initial remote shards to materialize when bootstrapping an empty
/// local snapshot before regular lazy expansion.
const REMOTE_BOOTSTRAP_SHARDS: usize = 4;
/// Multiplies the source `refresh` limit passed by `IngestionManager`
/// (`step.unwrap_or(max_records)`) to set this source's internal row-read
/// batch target for each refresh pass.
const HUGGINGFACE_REFRESH_BATCH_MULTIPLIER: usize = 32;
const SHARD_SEQUENCE_STATE_VERSION: u32 = 1;
const SHARD_SEQUENCE_STATE_FILE: &str = "_sequence_state.json";
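
// Illustrative note (not part of the original file): with the defaults above,
// a refresh pass handed `limit = 512` computes an internal read target of
// 512 * 32 = 16_384 rows, and `len_hint` headroom resolves to
// `ingestion_max_records * 4` rows. `REMOTE_BOOTSTRAP_SHARDS` is independent
// of that row math; it only sets how many shards seed an empty snapshot.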

#[derive(Clone, Debug)]
struct RowTextField {
    name: String,
    text: String,
}

#[derive(Clone, Debug)]
struct RowView {
    row_id: Option<String>,
    timestamp: Option<DateTime<Utc>>,
    text_fields: Vec<RowTextField>,
}

/// Configuration for a bulk Hugging Face row source backed by local snapshot files.
#[derive(Clone, Debug)]
pub struct HuggingFaceRowsConfig {
    /// Stable sampler source id used in record ids and metrics.
    pub source_id: String,
    /// Hugging Face dataset id, e.g. `HuggingFaceFW/fineweb`.
    pub dataset: String,
    /// Dataset config name, e.g. `default`.
    pub config: String,
    /// Split name, e.g. `train`.
    pub split: String,
    /// Local path to a snapshot directory for this split.
    pub snapshot_dir: PathBuf,
    /// File extensions accepted as shard files.
    pub shard_extensions: Vec<String>,
    /// Number of rows between seek checkpoints while indexing a shard.
    pub checkpoint_stride: usize,
    /// Maximum number of rows cached in-memory.
    pub cache_capacity: usize,
    /// Maximum number of decoded parquet row groups cached in-memory.
    pub parquet_row_group_cache_capacity: usize,
    /// Multiplier applied to current refresh `limit` when building a read batch target.
    ///
    /// Effective target is `limit * refresh_batch_multiplier`.
    pub refresh_batch_multiplier: usize,
    /// Multiplier applied to ingestion-sized base records for `len_hint` headroom.
    ///
    /// Effective headroom is the sampler ingestion base (falling back to
    /// `cache_capacity`) times this multiplier.
    pub remote_expansion_headroom_multiplier: usize,
    /// Optional maximum row cap exposed by the source.
    pub max_rows: Option<usize>,
    /// Hard cap for local manifest-shard cache bytes.
    ///
    /// When exceeded, oldest cached manifest shards are evicted.
    pub local_disk_cap_bytes: Option<u64>,
    /// Minimum number of manifest shards to keep resident during eviction.
    pub min_resident_shards: usize,
    /// Optional row id column name. Falls back to synthetic id when missing.
    pub id_column: Option<String>,
    /// Text columns to extract. Empty means auto-detect textual scalar columns.
    pub text_columns: Vec<String>,
    /// Optional column used for anchor text.
    ///
    /// When set (or when `positive_column`/`context_columns` are set), role-based
    /// extraction is used instead of `text_columns`/auto-detect mode.
    pub anchor_column: Option<String>,
    /// Optional column used for positive text.
    ///
    /// Positive text is emitted as a `SectionRole::Context` section.
    pub positive_column: Option<String>,
    /// Optional ordered context columns.
    ///
    /// Used only in role-based extraction mode.
    pub context_columns: Vec<String>,
}

impl HuggingFaceRowsConfig {
    /// Create a config with required dataset identity values and local snapshot path.
    pub fn new(
        source_id: impl Into<String>,
        dataset: impl Into<String>,
        config: impl Into<String>,
        split: impl Into<String>,
        snapshot_dir: impl Into<PathBuf>,
    ) -> Self {
        Self {
            source_id: source_id.into(),
            dataset: dataset.into(),
            config: config.into(),
            split: split.into(),
            snapshot_dir: snapshot_dir.into(),
            shard_extensions: vec![
                "parquet".to_string(),
                "jsonl".to_string(),
                "ndjson".to_string(),
            ],
            checkpoint_stride: 4096,
            cache_capacity: SamplerConfig::default().ingestion_max_records,
            parquet_row_group_cache_capacity: 8,
            refresh_batch_multiplier: HUGGINGFACE_REFRESH_BATCH_MULTIPLIER,
            remote_expansion_headroom_multiplier: REMOTE_EXPANSION_HEADROOM_MULTIPLIER,
            max_rows: None,
            local_disk_cap_bytes: Some(32 * 1024 * 1024 * 1024),
            min_resident_shards: REMOTE_BOOTSTRAP_SHARDS,
            id_column: Some("id".to_string()),
            text_columns: Vec::new(),
            anchor_column: None,
            positive_column: None,
            context_columns: Vec::new(),
        }
    }
}
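
// Usage sketch (illustrative; the ids and column names below are hypothetical).
// The constructor fills defaults, and role-based extraction is opted into by
// assigning the role columns afterwards:
//
//     let mut config = HuggingFaceRowsConfig::new(
//         "hf_demo",
//         "HuggingFaceFW/fineweb",
//         "default",
//         "train",
//         "/tmp/hf_snapshot",
//     );
//     config.anchor_column = Some("query".to_string());
//     config.positive_column = Some("answer".to_string());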

#[derive(Default)]
struct ParquetCache {
    readers: HashMap<PathBuf, Arc<SerializedFileReader<File>>>,
}

impl ParquetCache {
    /// Return a cached parquet reader for `path`, opening and caching it when missing.
    fn reader_for(
        &mut self,
        source_id: &str,
        path: &Path,
    ) -> Result<Arc<SerializedFileReader<File>>, SamplerError> {
        if let Some(reader) = self.readers.get(path) {
            return Ok(reader.clone());
        }

        let file = File::open(path).map_err(|err| SamplerError::SourceUnavailable {
            source_id: source_id.to_string(),
            reason: format!("failed opening parquet shard {}: {err}", path.display()),
        })?;
        let reader =
            SerializedFileReader::new(file).map_err(|err| SamplerError::SourceUnavailable {
                source_id: source_id.to_string(),
                reason: format!("failed reading parquet shard {}: {err}", path.display()),
            })?;
        let reader = Arc::new(reader);
        self.readers.insert(path.to_path_buf(), reader.clone());
        Ok(reader)
    }
}
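
// Behavior sketch (assumed usage, not from the original file): repeated calls
// for the same path return clones of one `Arc`, so each shard file is opened
// and its footer parsed at most once per cache:
//
//     let mut cache = ParquetCache::default();
//     let a = cache.reader_for("src", Path::new("shard.parquet"))?;
//     let b = cache.reader_for("src", Path::new("shard.parquet"))?;
//     assert!(Arc::ptr_eq(&a, &b));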

#[derive(Clone, Debug)]
struct ShardIndex {
    path: PathBuf,
    global_start: usize,
    row_count: usize,
    is_parquet: bool,
    parquet_row_groups: Vec<(usize, usize)>,
    checkpoints: Vec<u64>,
}

#[derive(Default)]
struct RowCache {
    rows: HashMap<usize, RowView>,
    order: VecDeque<usize>,
}

impl RowCache {
    /// Return a cloned cached row by absolute index.
    fn get(&self, idx: usize) -> Option<RowView> {
        self.rows.get(&idx).cloned()
    }

    /// Insert or refresh a cached row and evict oldest entries over `capacity`.
    fn insert(&mut self, idx: usize, row: RowView, capacity: usize) {
        if capacity == 0 {
            return;
        }
        if !self.rows.contains_key(&idx) {
            self.order.push_back(idx);
        }
        self.rows.insert(idx, row);
        while self.rows.len() > capacity {
            if let Some(old) = self.order.pop_front() {
                self.rows.remove(&old);
            } else {
                break;
            }
        }
    }
}
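
// Eviction sketch (illustrative): the queue makes eviction FIFO by first
// insertion, not LRU, and re-inserting an existing index does not move it.
// With `capacity = 2` and hypothetical rows `row_a`..`row_c`:
//
//     let mut cache = RowCache::default();
//     cache.insert(0, row_a, 2);
//     cache.insert(1, row_b, 2);
//     cache.insert(2, row_c, 2); // index 0 is evicted as the oldest entry
//     assert!(cache.get(0).is_none());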

/// Bulk-oriented Hugging Face source backed by local shard files.
pub struct HuggingFaceRowSource {
    config: HuggingFaceRowsConfig,
    sampler_config: Mutex<Option<SamplerConfig>>,
    state: Mutex<SourceState>,
    cache: Mutex<RowCache>,
    parquet_cache: Mutex<ParquetCache>,
}

#[derive(Debug)]
struct SourceState {
    materialized_rows: usize,
    total_rows: Option<usize>,
    shards: Vec<ShardIndex>,
    remote_candidates: Option<Vec<String>>,
    remote_candidate_sizes: HashMap<String, u64>,
    next_remote_idx: usize,
}

type ParquetGroupKey = (PathBuf, usize);
type ParquetGroupRequest = (usize, usize, ShardIndex);

#[derive(Clone, Debug, Serialize, Deserialize)]
struct PersistedShardSequence {
    version: u32,
    source_id: String,
    dataset: String,
    config: String,
    split: String,
    #[serde(default)]
    sampler_seed: Option<u64>,
    candidates: Vec<String>,
    candidate_sizes: HashMap<String, u64>,
    next_remote_idx: usize,
}
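
// Shape sketch of the persisted state file (field names follow the serde
// derive above; the values here are hypothetical):
//
//     {
//       "version": 1,
//       "source_id": "hf_demo",
//       "dataset": "acme/demo",
//       "config": "default",
//       "split": "train",
//       "sampler_seed": 42,
//       "candidates": ["url::https://huggingface.co/datasets/acme/demo/resolve/main/train-00000.parquet"],
//       "candidate_sizes": { "url::https://huggingface.co/datasets/acme/demo/resolve/main/train-00000.parquet": 1048576 },
//       "next_remote_idx": 1
//     }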

impl HuggingFaceRowSource {
    /// Build a new source by indexing local shard files.
    pub fn new(config: HuggingFaceRowsConfig) -> Result<Self, SamplerError> {
        let start_new = Instant::now();
        if config.checkpoint_stride == 0 {
            return Err(SamplerError::Configuration(
                "huggingface source checkpoint_stride must be > 0".to_string(),
            ));
        }

        fs::create_dir_all(&config.snapshot_dir).map_err(|err| {
            SamplerError::SourceUnavailable {
                source_id: config.source_id.clone(),
                reason: format!(
                    "failed creating snapshot_dir {}: {err}",
                    config.snapshot_dir.display()
                ),
            }
        })?;

        info!(
            "[triplets:hf] indexing local shards in {}",
            config.snapshot_dir.display()
        );
        let (shards, discovered) = Self::build_shard_index(&config).unwrap_or_default();
        if discovered == 0 {
            info!(
                "[triplets:hf] no local shards found in {} — lazy remote download enabled",
                config.snapshot_dir.display()
            );
        }

        let materialized_rows = config
            .max_rows
            .map(|cap| cap.min(discovered))
            .unwrap_or(discovered);
        let total_rows = match Self::fetch_global_row_count(&config) {
            Ok(value) => value,
            Err(err) => {
                warn!(
                    "[triplets:hf] global row count request failed; continuing with discovered rows only: {}",
                    err
                );
                None
            }
        };

        if let Some(global_total) = total_rows {
            info!(
                "[triplets:hf] global split row count reported: {} (known_local_rows={})",
                global_total, materialized_rows
            );
        }

        info!(
            "[triplets:hf] source ready in {:.2}s (rows={}, shards={})",
            start_new.elapsed().as_secs_f64(),
            materialized_rows,
            shards.len()
        );

        Ok(Self {
            config,
            sampler_config: Mutex::new(None),
            state: Mutex::new(SourceState {
                materialized_rows,
                total_rows,
                shards,
                remote_candidates: None,
                remote_candidate_sizes: HashMap::new(),
                next_remote_idx: 0,
            }),
            cache: Mutex::new(RowCache::default()),
            parquet_cache: Mutex::new(ParquetCache::default()),
        })
    }

    /// Compute the effective internal row read target from refresh `limit`.
    fn effective_refresh_batch_target(&self, limit: usize) -> usize {
        let multiplier = self.config.refresh_batch_multiplier.max(1);
        limit.saturating_mul(multiplier)
    }

    /// Compute dynamic `len_hint` headroom rows based on sampler and source config.
    fn effective_expansion_headroom_rows(&self) -> usize {
        let multiplier = self.config.remote_expansion_headroom_multiplier.max(1);
        let base = self
            .sampler_config
            .lock()
            .ok()
            .and_then(|config| config.as_ref().map(|value| value.ingestion_max_records))
            .unwrap_or(self.config.cache_capacity)
            .max(1);
        base.saturating_mul(multiplier)
    }

    fn normalized_shard_extensions(config: &HuggingFaceRowsConfig) -> Vec<String> {
        config
            .shard_extensions
            .iter()
            .map(|value| value.trim().trim_start_matches('.').to_ascii_lowercase())
            .collect::<Vec<_>>()
    }

    fn collect_candidates_from_siblings(
        config: &HuggingFaceRowsConfig,
        siblings: &[String],
        accepted: &[String],
        respect_split: bool,
    ) -> (Vec<String>, bool) {
        let mut saw_parquet = false;
        let mut candidates = Vec::new();
        for remote_path in siblings {
            if respect_split && !config.split.is_empty() {
                let split_tag = format!("{}/", config.split);
                let split_token = format!("-{}-", config.split);
                let split_prefix = format!("{}-", config.split);
                if !remote_path.contains(&split_tag)
                    && !remote_path.contains(&split_token)
                    && !Path::new(remote_path)
                        .file_name()
                        .and_then(|name| name.to_str())
                        .is_some_and(|name| name.starts_with(&split_prefix))
                {
                    continue;
                }
            }

            let ext = Path::new(remote_path)
                .extension()
                .and_then(|v| v.to_str())
                .map(|v| v.to_ascii_lowercase());
            if ext.as_deref() == Some("parquet") {
                saw_parquet = true;
            }
            if ext
                .as_deref()
                .is_some_and(|ext| accepted.iter().any(|allowed| allowed == ext))
            {
                let target = Self::candidate_target_path(config, remote_path);
                if target.exists() {
                    continue;
                }
                candidates.push(remote_path.clone());
            }
        }
        (candidates, saw_parquet)
    }
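
    // Matching sketch (hypothetical filenames): with `split = "train"`, the
    // filter above accepts any of these three shapes and skips everything else:
    //
    //     data/train/00000.parquet      (contains "train/")
    //     data-train-00000.parquet      (contains "-train-")
    //     train-00000-of-00010.parquet  (file name starts with "train-")
    //     validation-00000.parquet      (rejected)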

    fn resolve_remote_candidates_from_siblings(
        config: &HuggingFaceRowsConfig,
        siblings: &[String],
        accepted: &[String],
    ) -> Result<(Vec<String>, HashMap<String, u64>), SamplerError> {
        let (mut candidates, mut saw_parquet) =
            Self::collect_candidates_from_siblings(config, siblings, accepted, true);
        if candidates.is_empty() && !config.split.is_empty() {
            let (fallback_candidates, fallback_saw_parquet) =
                Self::collect_candidates_from_siblings(config, siblings, accepted, false);
            if !fallback_candidates.is_empty() {
                warn!(
                    "[triplets:hf] split filter '{}' matched no remote files; falling back to extension-only remote candidate scan",
                    config.split
                );
                candidates = fallback_candidates;
                saw_parquet = fallback_saw_parquet;
            }
        }

        candidates.sort();
        info!(
            "[triplets:hf] remote candidates matching {:?}: {}",
            config.shard_extensions,
            candidates.len()
        );
        if candidates.is_empty() {
            if saw_parquet {
                return Err(SamplerError::SourceUnavailable {
                    source_id: config.source_id.clone(),
                    reason: format!(
                        "dataset '{}' appears to be parquet-only, but shard_extensions does not include parquet ({:?}).",
                        config.dataset, config.shard_extensions
                    ),
                });
            }
            warn!(
                "[triplets:hf] no remote candidates found for dataset='{}' split='{}' extensions={:?}; source will be treated as exhausted",
                config.dataset, config.split, config.shard_extensions
            );
            return Ok((Vec::new(), HashMap::new()));
        }

        Ok((candidates, HashMap::new()))
    }

    fn candidates_from_parquet_manifest_json(
        config: &HuggingFaceRowsConfig,
        json: &Value,
    ) -> Result<(Vec<String>, HashMap<String, u64>), SamplerError> {
        let accepted = Self::normalized_shard_extensions(config);

        let mut candidates = Vec::new();
        let mut candidate_sizes = HashMap::new();
        if let Some(entries) = json.get("parquet_files").and_then(Value::as_array) {
            for entry in entries {
                let Some(url) = entry.get("url").and_then(Value::as_str) else {
                    continue;
                };

                let ext = Path::new(url)
                    .extension()
                    .and_then(|value| value.to_str())
                    .map(|value| value.to_ascii_lowercase());
                if !ext
                    .as_deref()
                    .is_some_and(|value| accepted.iter().any(|allowed| allowed == value))
                {
                    continue;
                }

                let candidate = format!("{REMOTE_URL_PREFIX}{url}");
                let expected_size = entry.get("size").and_then(Value::as_u64);
                let target = Self::candidate_target_path(config, &candidate);
                if target.exists() {
                    if Self::target_matches_expected_size(&target, expected_size) {
                        continue;
                    }
                    warn!(
                        "[triplets:hf] incomplete cached shard detected (will redownload): {}",
                        target.display()
                    );
                    if let Err(err) = fs::remove_file(&target)
                        && err.kind() != std::io::ErrorKind::NotFound
                    {
                        return Err(SamplerError::SourceUnavailable {
                            source_id: config.source_id.clone(),
                            reason: format!(
                                "failed removing incomplete shard {}: {err}",
                                target.display()
                            ),
                        });
                    }
                }
                if let Some(size) = expected_size {
                    candidate_sizes.insert(candidate.clone(), size);
                }
                candidates.push(candidate);
            }
        }

        candidates.sort();
        Ok((candidates, candidate_sizes))
    }
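
    // Payload sketch for the datasets-server `/parquet` manifest consumed
    // above; only the `url` and `size` fields are read (values hypothetical):
    //
    //     { "parquet_files": [
    //         { "url": "https://huggingface.co/datasets/acme/demo/resolve/refs%2Fconvert%2Fparquet/default/train/0000.parquet",
    //           "size": 1073741824 } ] }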

    /// Resolve and filter remote shard candidates from manifest or repository listing.
    fn list_remote_candidates(
        config: &HuggingFaceRowsConfig,
    ) -> Result<(Vec<String>, HashMap<String, u64>), SamplerError> {
        if let Ok((candidates, candidate_sizes)) =
            Self::list_remote_candidates_from_parquet_manifest(config)
            && !candidates.is_empty()
        {
            info!(
                "[triplets:hf] remote parquet manifest candidates matching {:?}: {}",
                config.shard_extensions,
                candidates.len()
            );
            return Ok((candidates, candidate_sizes));
        }

        let api = ApiBuilder::new()
            .with_progress(true)
            .with_retries(5)
            .with_token(None)
            .build()
            .map_err(|err| SamplerError::SourceUnavailable {
                source_id: config.source_id.clone(),
                reason: format!("failed building hf-hub client: {err}"),
            })?;

        let repo = Repo::new(config.dataset.clone(), RepoType::Dataset);
        let repo_api = api.repo(repo);
        info!(
            "[triplets:hf] reading remote file list for dataset {}",
            config.dataset
        );
        let info = repo_api
            .info()
            .map_err(|err| SamplerError::SourceUnavailable {
                source_id: config.source_id.clone(),
                reason: format!("failed reading hf-hub repository info: {err}"),
            })?;

        let accepted = Self::normalized_shard_extensions(config);

        let siblings = info
            .siblings
            .into_iter()
            .map(|entry| entry.rfilename)
            .collect::<Vec<_>>();

        Self::resolve_remote_candidates_from_siblings(config, &siblings, &accepted)
    }

    /// Return the persistence file path for shard sequence state.
    fn shard_sequence_state_path(config: &HuggingFaceRowsConfig) -> PathBuf {
        config
            .snapshot_dir
            .join("_parquet_manifest")
            .join(SHARD_SEQUENCE_STATE_FILE)
    }

    /// Load persisted shard candidate sequence when metadata and sampler seed match.
    fn load_persisted_shard_sequence(
        config: &HuggingFaceRowsConfig,
        current_sampler_seed: Option<u64>,
    ) -> Result<Option<PersistedShardSequence>, SamplerError> {
        let path = Self::shard_sequence_state_path(config);
        if !path.exists() {
            return Ok(None);
        }

        let raw = fs::read_to_string(&path).map_err(|err| SamplerError::SourceUnavailable {
            source_id: config.source_id.clone(),
            reason: format!(
                "failed reading shard-sequence state {}: {err}",
                path.display()
            ),
        })?;

        let mut persisted: PersistedShardSequence =
            serde_json::from_str(&raw).map_err(|err| SamplerError::SourceUnavailable {
                source_id: config.source_id.clone(),
                reason: format!(
                    "failed parsing shard-sequence state {}: {err}",
                    path.display()
                ),
            })?;

        if persisted.version != SHARD_SEQUENCE_STATE_VERSION
            || persisted.source_id != config.source_id
            || persisted.dataset != config.dataset
            || persisted.config != config.config
            || persisted.split != config.split
            || persisted.sampler_seed != current_sampler_seed
        {
            warn!(
                "[triplets:hf] shard-sequence state mismatch for {}; rebuilding candidate order",
                path.display()
            );
            return Ok(None);
        }

        if persisted.next_remote_idx > persisted.candidates.len() {
            persisted.next_remote_idx = persisted.candidates.len();
        }

        Ok(Some(persisted))
    }

    /// Persist current shard candidate sequence and position atomically.
    fn persist_shard_sequence_locked(&self, state: &SourceState) -> Result<(), SamplerError> {
        let Some(candidates) = state.remote_candidates.as_ref() else {
            return Ok(());
        };

        let path = Self::shard_sequence_state_path(&self.config);
        if let Some(parent) = path.parent() {
            fs::create_dir_all(parent).map_err(|err| SamplerError::SourceUnavailable {
                source_id: self.config.source_id.clone(),
                reason: format!(
                    "failed creating shard-sequence state dir {}: {err}",
                    parent.display()
                ),
            })?;
        }

        let persisted = PersistedShardSequence {
            version: SHARD_SEQUENCE_STATE_VERSION,
            source_id: self.config.source_id.clone(),
            dataset: self.config.dataset.clone(),
            config: self.config.config.clone(),
            split: self.config.split.clone(),
            sampler_seed: self
                .sampler_config
                .lock()
                .ok()
                .and_then(|config| config.as_ref().map(|value| value.seed)),
            candidates: candidates.clone(),
            candidate_sizes: state.remote_candidate_sizes.clone(),
            next_remote_idx: state.next_remote_idx.min(candidates.len()),
        };

        let raw = serde_json::to_vec_pretty(&persisted).map_err(|err| {
            SamplerError::SourceUnavailable {
                source_id: self.config.source_id.clone(),
                reason: format!(
                    "failed encoding shard-sequence state {}: {err}",
                    path.display()
                ),
            }
        })?;

        let tmp_path = path.with_extension("tmp");
        fs::write(&tmp_path, raw).map_err(|err| SamplerError::SourceUnavailable {
            source_id: self.config.source_id.clone(),
            reason: format!(
                "failed writing shard-sequence state temp {}: {err}",
                tmp_path.display()
            ),
        })?;
        fs::rename(&tmp_path, &path).map_err(|err| SamplerError::SourceUnavailable {
            source_id: self.config.source_id.clone(),
            reason: format!(
                "failed replacing shard-sequence state {}: {err}",
                path.display()
            ),
        })?;

        Ok(())
    }

    /// Rotate candidate ordering deterministically using source identity.
    fn rotate_candidates_deterministically(
        config: &HuggingFaceRowsConfig,
        candidates: &mut [String],
    ) {
        if candidates.len() <= 1 {
            return;
        }
        let mut hasher = DefaultHasher::new();
        config.source_id.hash(&mut hasher);
        config.dataset.hash(&mut hasher);
        config.config.hash(&mut hasher);
        config.split.hash(&mut hasher);
        let offset = (hasher.finish() as usize) % candidates.len();
        candidates.rotate_left(offset);
    }
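
    // Determinism sketch (illustrative): identical identity always yields the
    // same offset, so restarts walk shards in the same order:
    //
    //     let mut a: Vec<String> = vec!["s0".into(), "s1".into(), "s2".into()];
    //     let mut b = a.clone();
    //     Self::rotate_candidates_deterministically(&config, &mut a);
    //     Self::rotate_candidates_deterministically(&config, &mut b);
    //     assert_eq!(a, b);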

    /// Build deterministic seed used to permute remote shard candidate order.
    fn shard_candidate_seed(
        config: &HuggingFaceRowsConfig,
        total_candidates: usize,
        sampler_seed: Option<u64>,
    ) -> u64 {
        let mut hasher = DefaultHasher::new();
        "hf_shard_candidate_sequence_v1".hash(&mut hasher);
        if let Some(seed) = sampler_seed {
            seed.hash(&mut hasher);
        } else {
            config.source_id.hash(&mut hasher);
            config.dataset.hash(&mut hasher);
            config.config.hash(&mut hasher);
            config.split.hash(&mut hasher);
        }
        total_candidates.hash(&mut hasher);
        hasher.finish()
    }

    /// Query datasets-server parquet manifest and derive shard candidates.
    fn list_remote_candidates_from_parquet_manifest(
        config: &HuggingFaceRowsConfig,
    ) -> Result<(Vec<String>, HashMap<String, u64>), SamplerError> {
        let endpoint = "https://datasets-server.huggingface.co/parquet";
        info!(
            "[triplets:hf] reading datasets-server parquet manifest for dataset {}",
            config.dataset
        );
        let response = ureq::get(endpoint)
            .query("dataset", &config.dataset)
            .query("config", &config.config)
            .query("split", &config.split)
            .call()
            .map_err(|err| SamplerError::SourceUnavailable {
                source_id: config.source_id.clone(),
                reason: format!("failed querying datasets-server parquet endpoint: {err}"),
            })?;

        let body = response.into_body().read_to_string().map_err(|err| {
            SamplerError::SourceUnavailable {
                source_id: config.source_id.clone(),
                reason: format!("failed reading datasets-server parquet response body: {err}"),
            }
        })?;

        Self::parse_parquet_manifest_response(config, &body)
    }

    fn parse_parquet_manifest_response(
        config: &HuggingFaceRowsConfig,
        body: &str,
    ) -> Result<(Vec<String>, HashMap<String, u64>), SamplerError> {
        let json: Value =
            serde_json::from_str(body).map_err(|err| SamplerError::SourceUnavailable {
                source_id: config.source_id.clone(),
                reason: format!("failed parsing datasets-server parquet response: {err}"),
            })?;

        Self::candidates_from_parquet_manifest_json(config, &json)
    }

    /// Map a candidate identifier to the local snapshot target path.
    fn candidate_target_path(config: &HuggingFaceRowsConfig, candidate: &str) -> PathBuf {
        if let Some(url) = candidate.strip_prefix(REMOTE_URL_PREFIX) {
            let suffix = url
                .split("/resolve/")
                .nth(1)
                .map(|value| value.trim_start_matches('/'))
                .filter(|value| !value.is_empty())
                .unwrap_or("parquet/unknown.parquet");
            return config.snapshot_dir.join("_parquet_manifest").join(suffix);
        }
        config.snapshot_dir.join(candidate)
    }
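
    // Mapping sketch (hypothetical URL): `url::` candidates land under the
    // `_parquet_manifest` cache, keyed by the path after `/resolve/`:
    //
    //     url::https://huggingface.co/datasets/acme/demo/resolve/main/data/0000.parquet
    //         -> <snapshot_dir>/_parquet_manifest/main/data/0000.parquet
    //
    // Plain hf-hub paths are joined directly under `<snapshot_dir>`.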

    /// Validate target file size against expected bytes when available.
    fn target_matches_expected_size(path: &Path, expected_bytes: Option<u64>) -> bool {
        if !path.exists() {
            return false;
        }
        if let Some(expected) = expected_bytes
            && expected > 0
        {
            return fs::metadata(path)
                .map(|meta| meta.len() == expected)
                .unwrap_or(false);
        }
        true
    }

    /// Return root directory used for manifest-cached remote shards.
    fn manifest_cache_root(&self) -> PathBuf {
        self.config.snapshot_dir.join("_parquet_manifest")
    }

    /// Return on-disk size for a shard path, or 0 if metadata lookup fails.
    fn shard_size_bytes(path: &Path) -> u64 {
        fs::metadata(path).map(|meta| meta.len()).unwrap_or(0)
    }

    /// Recompute shard `global_start` offsets and total materialized row count.
    fn recompute_shard_offsets(state: &mut SourceState) {
        let mut running = 0usize;
        for shard in &mut state.shards {
            shard.global_start = running;
            running = running.saturating_add(shard.row_count);
        }
        state.materialized_rows = running;
    }

    /// Enforce local disk cap by evicting oldest manifest shards when possible.
    fn enforce_disk_cap_locked(
        &self,
        state: &mut SourceState,
        protected_path: &Path,
    ) -> Result<bool, SamplerError> {
        let Some(cap_bytes) = self.config.local_disk_cap_bytes else {
            return Ok(false);
        };

        let manifest_root = self.manifest_cache_root();
        let mut usage_bytes = state
            .shards
            .iter()
            .filter(|shard| shard.path.starts_with(&manifest_root))
            .map(|shard| Self::shard_size_bytes(&shard.path))
            .sum::<u64>();

        if usage_bytes <= cap_bytes {
            return Ok(false);
        }

        let mut evicted_any = false;
        loop {
            if usage_bytes <= cap_bytes {
                break;
            }

            let resident_manifest_count = state
                .shards
                .iter()
                .filter(|shard| shard.path.starts_with(&manifest_root))
                .count();
            if resident_manifest_count <= self.config.min_resident_shards {
                break;
            }

            let evict_pos = state.shards.iter().position(|shard| {
                shard.path.starts_with(&manifest_root) && shard.path != protected_path
            });
            let Some(pos) = evict_pos else {
                break;
            };

            let shard = state.shards.remove(pos);
            let shard_size = Self::shard_size_bytes(&shard.path);
            if let Err(err) = fs::remove_file(&shard.path)
                && err.kind() != std::io::ErrorKind::NotFound
            {
                return Err(SamplerError::SourceUnavailable {
                    source_id: self.config.source_id.clone(),
                    reason: format!(
                        "failed evicting shard {} under disk cap: {err}",
                        shard.path.display()
                    ),
                });
            }

            usage_bytes = usage_bytes.saturating_sub(shard_size);
            evicted_any = true;
            warn!(
                "[triplets:hf] evicted shard for disk cap: {} (usage={:.2} GiB cap={:.2} GiB)",
                shard.path.display(),
                usage_bytes as f64 / (1024.0 * 1024.0 * 1024.0),
                cap_bytes as f64 / (1024.0 * 1024.0 * 1024.0)
            );
        }

        if usage_bytes > cap_bytes {
            if protected_path.exists() {
                let _ = fs::remove_file(protected_path);
            }
            return Err(SamplerError::SourceUnavailable {
                source_id: self.config.source_id.clone(),
                reason: format!(
                    "local disk cap exceeded and cannot evict further (usage={} bytes cap={} bytes)",
                    usage_bytes, cap_bytes
                ),
            });
        }

        if evicted_any {
            Self::recompute_shard_offsets(state);
        }
        Ok(evicted_any)
    }

    /// Return total on-disk bytes used by manifest-backed shards.
    fn manifest_usage_bytes_locked(&self, state: &SourceState) -> u64 {
        let manifest_root = self.manifest_cache_root();
        state
            .shards
            .iter()
            .filter(|shard| shard.path.starts_with(&manifest_root))
            .map(|shard| Self::shard_size_bytes(&shard.path))
            .sum::<u64>()
    }

    /// Fetch exact split row count metadata from datasets-server size endpoint.
    fn fetch_global_row_count(
        config: &HuggingFaceRowsConfig,
    ) -> Result<Option<usize>, SamplerError> {
        let endpoint = "https://datasets-server.huggingface.co/size";
        info!(
            "[triplets:hf] requesting global row count dataset='{}' config='{}' split='{}'",
            config.dataset, config.config, config.split
        );

        let response = ureq::get(endpoint)
            .query("dataset", &config.dataset)
            .query("config", &config.config)
            .query("split", &config.split)
            .call()
            .map_err(|err| SamplerError::SourceUnavailable {
                source_id: config.source_id.clone(),
                reason: format!("failed querying datasets-server size endpoint: {err}"),
            })?;

        let body = response.into_body().read_to_string().map_err(|err| {
            SamplerError::SourceUnavailable {
                source_id: config.source_id.clone(),
                reason: format!("failed reading datasets-server size response body: {err}"),
            }
        })?;

        Self::parse_global_row_count_response(config, &body)
    }

    fn parse_global_row_count_response(
        config: &HuggingFaceRowsConfig,
        body: &str,
    ) -> Result<Option<usize>, SamplerError> {
        let json: Value =
            serde_json::from_str(body).map_err(|err| SamplerError::SourceUnavailable {
                source_id: config.source_id.clone(),
                reason: format!("failed parsing datasets-server size response: {err}"),
            })?;

        let mut count =
            Self::extract_split_row_count_from_size_response(&json, &config.config, &config.split);
        if let (Some(max_rows), Some(rows)) = (config.max_rows, count) {
            count = Some(rows.min(max_rows));
        }
        Ok(count)
    }

    /// Extract split row count from datasets-server size payload variants.
    fn extract_split_row_count_from_size_response(
        json: &Value,
        config_name: &str,
        split_name: &str,
    ) -> Option<usize> {
        let to_usize = |value: &Value| value.as_u64().and_then(|raw| usize::try_from(raw).ok());

        let size = json.get("size")?;

        if let Some(splits) = size.get("splits").and_then(Value::as_array) {
            for entry in splits {
                let entry_config = entry
                    .get("config")
                    .or_else(|| entry.get("config_name"))
                    .and_then(Value::as_str)
                    .unwrap_or_default();
                let entry_split = entry
                    .get("split")
                    .or_else(|| entry.get("name"))
                    .and_then(Value::as_str)
                    .unwrap_or_default();
                if entry_config == config_name
                    && entry_split == split_name
                    && let Some(rows) = entry.get("num_rows").and_then(to_usize)
                {
                    return Some(rows);
                }
            }
        }

        if let Some(configs) = size.get("configs").and_then(Value::as_array) {
            for config_entry in configs {
                let entry_config = config_entry
                    .get("config")
                    .or_else(|| config_entry.get("config_name"))
                    .and_then(Value::as_str)
                    .unwrap_or_default();
                if entry_config != config_name {
                    continue;
                }

                if let Some(splits) = config_entry.get("splits").and_then(Value::as_array) {
                    for split_entry in splits {
                        let entry_split = split_entry
                            .get("split")
                            .or_else(|| split_entry.get("name"))
                            .and_then(Value::as_str)
                            .unwrap_or_default();
                        if entry_split == split_name
                            && let Some(rows) = split_entry.get("num_rows").and_then(to_usize)
                        {
                            return Some(rows);
                        }
                    }
                }

                if split_name.is_empty()
                    && let Some(rows) = config_entry.get("num_rows").and_then(to_usize)
                {
                    return Some(rows);
                }
            }
        }

        if split_name.is_empty() {
            return size
                .get("dataset")
                .and_then(|dataset| dataset.get("num_rows"))
                .and_then(to_usize);
        }

        None
    }
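
    // Payload sketch for the datasets-server `/size` endpoint; the parser
    // above accepts both the flat `splits` array and the nested `configs`
    // variant (values hypothetical):
    //
    //     { "size": { "splits": [
    //         { "config": "default", "split": "train", "num_rows": 1000000 } ] } }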

    /// Download a shard (URL or hf-hub path) and materialize it under snapshot dir.
    fn download_and_materialize_shard(
        config: &HuggingFaceRowsConfig,
        remote_path: &str,
        expected_bytes: Option<u64>,
    ) -> Result<PathBuf, SamplerError> {
        if let Some(remote_url) = remote_path.strip_prefix(REMOTE_URL_PREFIX) {
            let target = Self::candidate_target_path(config, remote_path);
            if target.exists() {
                if Self::target_matches_expected_size(&target, expected_bytes) {
                    return Ok(target);
                }
                warn!(
                    "[triplets:hf] replacing incomplete shard before retry: {}",
                    target.display()
                );
                fs::remove_file(&target).map_err(|err| SamplerError::SourceUnavailable {
                    source_id: config.source_id.clone(),
                    reason: format!(
                        "failed removing incomplete shard {}: {err}",
                        target.display()
                    ),
                })?;
            }

            if let Some(parent) = target.parent() {
                fs::create_dir_all(parent).map_err(|err| SamplerError::SourceUnavailable {
                    source_id: config.source_id.clone(),
                    reason: format!(
                        "failed creating snapshot subdir {}: {err}",
                        parent.display()
                    ),
                })?;
            }

            let temp_target = target.with_extension("part");
            if temp_target.exists() {
                let _ = fs::remove_file(&temp_target);
            }

            let response =
                ureq::get(remote_url)
                    .call()
                    .map_err(|err| SamplerError::SourceUnavailable {
                        source_id: config.source_id.clone(),
                        reason: format!("failed downloading shard URL '{}': {err}", remote_url),
                    })?;
            let mut reader = response.into_body().into_reader();
            let mut file =
                File::create(&temp_target).map_err(|err| SamplerError::SourceUnavailable {
                    source_id: config.source_id.clone(),
                    reason: format!(
                        "failed creating target shard {}: {err}",
                        temp_target.display()
                    ),
                })?;
            info!(
                "[triplets:hf] downloading shard payload -> {}",
                target.display()
            );
            let started = Instant::now();
            let mut total_bytes = 0u64;
            let mut buffer = vec![0u8; 8 * 1024 * 1024];
            let mut last_report = Instant::now();
            loop {
                let read =
                    reader
                        .read(&mut buffer)
                        .map_err(|err| SamplerError::SourceUnavailable {
                            source_id: config.source_id.clone(),
                            reason: format!("failed reading shard stream '{}': {err}", remote_url),
                        })?;
                if read == 0 {
                    break;
                }
                file.write_all(&buffer[..read])
                    .map_err(|err| SamplerError::SourceUnavailable {
                        source_id: config.source_id.clone(),
                        reason: format!(
                            "failed writing target shard {}: {err}",
                            temp_target.display()
                        ),
                    })?;
                total_bytes = total_bytes.saturating_add(read as u64);
                if last_report.elapsed() >= Duration::from_secs(2) {
                    let elapsed = started.elapsed().as_secs_f64();
                    if let Some(expected) = expected_bytes
                        && expected > 0
                    {
                        let pct =
                            ((total_bytes as f64 / expected as f64) * 100.0).clamp(0.0, 100.0);
                        let rate = if elapsed > 0.0 {
                            total_bytes as f64 / elapsed
                        } else {
                            0.0
                        };
                        let eta_secs = if rate > 0.0 && total_bytes < expected {
                            (expected.saturating_sub(total_bytes) as f64) / rate
                        } else {
                            0.0
                        };
                        info!(
                            "[triplets:hf] download progress {}: {:.1}/{:.1} MiB ({:.1}%, {:.1}s elapsed, ETA {:.1}s)",
                            target.display(),
                            total_bytes as f64 / (1024.0 * 1024.0),
                            expected as f64 / (1024.0 * 1024.0),
                            pct,
                            elapsed,
                            eta_secs.max(0.0)
                        );
                    } else {
                        info!(
                            "[triplets:hf] download progress {}: {:.1} MiB ({:.1}s)",
                            target.display(),
                            total_bytes as f64 / (1024.0 * 1024.0),
                            elapsed
                        );
                    }
                    last_report = Instant::now();
                }
            }
            let elapsed = started.elapsed().as_secs_f64();
            if let Some(expected) = expected_bytes
                && expected > 0
            {
                let pct = ((total_bytes as f64 / expected as f64) * 100.0).clamp(0.0, 100.0);
                info!(
                    "[triplets:hf] download complete {}: {:.1}/{:.1} MiB ({:.1}%) in {:.1}s",
                    target.display(),
                    total_bytes as f64 / (1024.0 * 1024.0),
                    expected as f64 / (1024.0 * 1024.0),
                    pct,
                    elapsed
                );
            } else {
                info!(
                    "[triplets:hf] download complete {}: {:.1} MiB in {:.1}s",
                    target.display(),
                    total_bytes as f64 / (1024.0 * 1024.0),
                    elapsed
                );
            }

            fs::rename(&temp_target, &target).map_err(|err| SamplerError::SourceUnavailable {
                source_id: config.source_id.clone(),
                reason: format!(
                    "failed moving downloaded shard {} -> {}: {err}",
                    temp_target.display(),
                    target.display()
                ),
            })?;
            return Ok(target);
        }

        let api = ApiBuilder::new()
            .with_progress(true)
            .with_retries(5)
            .with_token(None)
            .build()
            .map_err(|err| SamplerError::SourceUnavailable {
                source_id: config.source_id.clone(),
                reason: format!("failed building hf-hub client: {err}"),
            })?;

        let repo = Repo::new(config.dataset.clone(), RepoType::Dataset);
        let repo_api = api.repo(repo);

        let mut local_cached =
            repo_api
                .get(remote_path)
                .map_err(|err| SamplerError::SourceUnavailable {
                    source_id: config.source_id.clone(),
                    reason: format!("failed downloading '{}' from hf-hub: {err}", remote_path),
                })?;
        if !local_cached.exists() {
            for _ in 0..5 {
                local_cached = repo_api.download(remote_path).map_err(|err| {
                    SamplerError::SourceUnavailable {
                        source_id: config.source_id.clone(),
                        reason: format!(
                            "hf-hub returned missing cache path for '{}', and forced download failed: {err}",
                            remote_path
                        ),
                    }
                })?;
                if local_cached.exists() {
                    break;
                }
                thread::sleep(Duration::from_millis(400));
            }
        }
        if !local_cached.exists() {
            return Err(SamplerError::SourceUnavailable {
                source_id: config.source_id.clone(),
                reason: format!(
                    "hf-hub returned non-existent cache file for '{}' at {}",
                    remote_path,
                    local_cached.display()
                ),
            });
        }

        let target = Self::candidate_target_path(config, remote_path);
        Self::materialize_local_file(config, &local_cached, &target)?;
        Ok(target)
    }

    /// Build shard metadata for a single local file.
    fn index_single_shard(
        config: &HuggingFaceRowsConfig,
        path: &Path,
        global_start: usize,
    ) -> Result<Option<ShardIndex>, SamplerError> {
        let is_parquet = path
            .extension()
            .and_then(|v| v.to_str())
            .is_some_and(|ext| ext.eq_ignore_ascii_case("parquet"));

        let (rows, parquet_row_groups, checkpoints) = if is_parquet {
            let (rows, parquet_row_groups) = Self::parquet_row_group_map(config, path)?;
            (rows, parquet_row_groups, Vec::new())
        } else {
            let file = File::open(path).map_err(|err| SamplerError::SourceUnavailable {
                source_id: config.source_id.clone(),
                reason: format!("failed opening shard {}: {err}", path.display()),
            })?;
            let mut reader = BufReader::new(file);
            let mut checkpoints = Vec::new();
            let mut line = String::new();
            let mut offset = 0u64;
            let mut rows = 0usize;

            loop {
                if rows.is_multiple_of(config.checkpoint_stride) {
                    checkpoints.push(offset);
                }
                line.clear();
                let bytes =
                    reader
                        .read_line(&mut line)
                        .map_err(|err| SamplerError::SourceUnavailable {
                            source_id: config.source_id.clone(),
                            reason: format!("failed reading shard {}: {err}", path.display()),
                        })?;
                if bytes == 0 {
                    break;
                }
                rows += 1;
                offset = offset.saturating_add(bytes as u64);
            }

            (rows, Vec::new(), checkpoints)
        };

        if rows == 0 {
            return Ok(None);
        }

        Ok(Some(ShardIndex {
            path: path.to_path_buf(),
            global_start,
            row_count: rows,
            is_parquet,
            parquet_row_groups,
            checkpoints,
        }))
    }
26✔
1304

1305
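    // Checkpoint bookkeeping, worked through: with checkpoint_stride = 1000,
    // the JSONL branch above records the byte offset *before* reading rows
    // 0, 1000, 2000, ..., so checkpoints[k] is the start of line k * 1000 and
    // checkpoints[0] is always 0. `read_line_at` later seeks to the nearest
    // checkpoint at or below a requested row and scans forward from there.
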
    /// Build parquet row-group map for random-access row reads.
    fn parquet_row_group_map(
        config: &HuggingFaceRowsConfig,
        path: &Path,
    ) -> Result<(usize, Vec<(usize, usize)>), SamplerError> {
        let file = File::open(path).map_err(|err| SamplerError::SourceUnavailable {
            source_id: config.source_id.clone(),
            reason: format!("failed opening parquet shard {}: {err}", path.display()),
        })?;
        let reader =
            SerializedFileReader::new(file).map_err(|err| SamplerError::SourceUnavailable {
                source_id: config.source_id.clone(),
                reason: format!("failed reading parquet metadata {}: {err}", path.display()),
            })?;

        let mut row_groups = Vec::new();
        let mut running = 0usize;
        for meta in reader.metadata().row_groups() {
            let group_rows =
                usize::try_from(meta.num_rows()).map_err(|_| SamplerError::SourceUnavailable {
                    source_id: config.source_id.clone(),
                    reason: format!("parquet row group size overflow in {}", path.display()),
                })?;
            if group_rows == 0 {
                continue;
            }
            row_groups.push((running, group_rows));
            running = running.saturating_add(group_rows);
        }
        if running > 0 {
            return Ok((running, row_groups));
        }

        let total_rows =
            usize::try_from(reader.metadata().file_metadata().num_rows()).map_err(|_| {
                SamplerError::SourceUnavailable {
                    source_id: config.source_id.clone(),
                    reason: format!("parquet row count overflow in {}", path.display()),
                }
            })?;
        if total_rows == 0 {
            return Ok((0, Vec::new()));
        }
        Ok((total_rows, vec![(0, total_rows)]))
    }

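    // Shape of the returned map, illustrated: a file whose metadata lists row
    // groups of 1000, 1000, and 500 rows yields
    // (2500, vec![(0, 1000), (1000, 1000), (2000, 500)]). The fallback at the
    // end of the function only runs when every listed group reports zero
    // rows; it then trusts the file-level row count and models the file as
    // one synthetic group spanning all rows.
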
    /// Ensure row index is available, expanding remote shard set lazily if needed.
    fn ensure_row_available(&self, idx: usize) -> Result<bool, SamplerError> {
        loop {
            {
                let state = self
                    .state
                    .lock()
                    .map_err(|_| SamplerError::SourceUnavailable {
                        source_id: self.config.source_id.clone(),
                        reason: "huggingface source state lock poisoned".to_string(),
                    })?;

                if idx < state.materialized_rows {
                    return Ok(true);
                }

                if self.config.max_rows.is_some_and(|max_rows| idx >= max_rows) {
                    return Ok(false);
                }

                if let Some(candidates) = &state.remote_candidates
                    && state.next_remote_idx >= candidates.len()
                {
                    return Ok(false);
                }
            }

            let need_candidates = {
                let state = self
                    .state
                    .lock()
                    .map_err(|_| SamplerError::SourceUnavailable {
                        source_id: self.config.source_id.clone(),
                        reason: "huggingface source state lock poisoned".to_string(),
                    })?;
                state.remote_candidates.is_none()
            };

            if need_candidates {
                let mut state = self
                    .state
                    .lock()
                    .map_err(|_| SamplerError::SourceUnavailable {
                        source_id: self.config.source_id.clone(),
                        reason: "huggingface source state lock poisoned".to_string(),
                    })?;
                if state.remote_candidates.is_none() {
                    let sampler_seed = self
                        .sampler_config
                        .lock()
                        .ok()
                        .and_then(|config| config.as_ref().map(|value| value.seed));
                    if let Some(persisted) =
                        Self::load_persisted_shard_sequence(&self.config, sampler_seed)?
                    {
                        let candidate_count = persisted.candidates.len();
                        state.remote_candidates = Some(persisted.candidates);
                        state.remote_candidate_sizes = persisted.candidate_sizes;
                        state.next_remote_idx = persisted.next_remote_idx.min(candidate_count);
                        info!(
                            "[triplets:hf] resumed shard sequence state: next={}/{}",
                            state.next_remote_idx, candidate_count
                        );
                    } else {
                        let (mut candidates, candidate_sizes) =
                            Self::list_remote_candidates(&self.config)?;
                        Self::rotate_candidates_deterministically(&self.config, &mut candidates);
                        state.remote_candidates = Some(candidates);
                        state.remote_candidate_sizes = candidate_sizes;
                        state.next_remote_idx = 0;
                    }

                    self.persist_shard_sequence_locked(&state)?;

                    let candidate_count = state
                        .remote_candidates
                        .as_ref()
                        .map(|values| values.len())
                        .unwrap_or(0);
                    let bootstrap_needed = state.materialized_rows == 0
                        && candidate_count > 0
                        && state.next_remote_idx == 0;
                    let known_rows = state.materialized_rows;
                    let shard_count = state.shards.len();
                    info!(
                        "[triplets:hf] state: candidates={} known_rows={} active_shards={} disk_cap={} min_resident_shards={}",
                        candidate_count,
                        known_rows,
                        shard_count,
                        self.config
                            .local_disk_cap_bytes
                            .map(|bytes| format!(
                                "{:.2} GiB",
                                bytes as f64 / (1024.0 * 1024.0 * 1024.0)
                            ))
                            .unwrap_or_else(|| "disabled".to_string()),
                        self.config.min_resident_shards,
                    );
                    drop(state);

                    if bootstrap_needed {
                        let bootstrap_target = REMOTE_BOOTSTRAP_SHARDS.min(candidate_count);
                        info!(
                            "[triplets:hf] bootstrapping remote shard diversity: target={} shard(s)",
                            bootstrap_target
                        );
                        for step in 0..bootstrap_target {
                            info!(
                                "[triplets:hf] bootstrap progress: {}/{}",
                                step + 1,
                                bootstrap_target
                            );
                            if !self.download_next_remote_shard()? {
                                break;
                            }
                        }
                        info!("[triplets:hf] bootstrap complete");
                    }
                } else {
                    drop(state);
                }
                continue;
            }
            if !self.download_next_remote_shard()? {
                return Ok(false);
            }
        }
    }

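    // Locking discipline in `ensure_row_available`: the state mutex is taken
    // in short scopes and released (or explicitly dropped) before any shard
    // download, so concurrent readers are never blocked behind a network
    // fetch. The outer `loop` then re-checks `materialized_rows` rather than
    // assuming the newly downloaded shard covered the requested index.
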
    /// Download and register the next remote shard candidate.
    fn download_next_remote_shard(&self) -> Result<bool, SamplerError> {
        let (remote_ordinal, remote_total, remote_path, expected_bytes) = {
            let mut state = self
                .state
                .lock()
                .map_err(|_| SamplerError::SourceUnavailable {
                    source_id: self.config.source_id.clone(),
                    reason: "huggingface source state lock poisoned".to_string(),
                })?;
            let Some(candidates) = &state.remote_candidates else {
                return Ok(false);
            };
            if state.next_remote_idx >= candidates.len() {
                return Ok(false);
            }
            let sequence_pos = state.next_remote_idx;
            let remote_ordinal = sequence_pos + 1;
            let remote_total = candidates.len();
            let sampler_seed = self
                .sampler_config
                .lock()
                .ok()
                .and_then(|config| config.as_ref().map(|value| value.seed));
            let seed = Self::shard_candidate_seed(&self.config, remote_total, sampler_seed);
            let mut permutation =
                crate::source::IndexPermutation::new(remote_total, seed, sequence_pos as u64);
            let candidate_idx = permutation.next();
            let remote_path = candidates[candidate_idx].clone();
            let expected_bytes = state.remote_candidate_sizes.get(&remote_path).copied();
            state.next_remote_idx += 1;
            (remote_ordinal, remote_total, remote_path, expected_bytes)
        };

        info!(
            "[triplets:hf] lazy downloading shard {}/{}: {}",
            remote_ordinal,
            remote_total,
            remote_path.as_str()
        );
        let local_path =
            Self::download_and_materialize_shard(&self.config, &remote_path, expected_bytes)?;

        let global_start = {
            let state = self
                .state
                .lock()
                .map_err(|_| SamplerError::SourceUnavailable {
                    source_id: self.config.source_id.clone(),
                    reason: "huggingface source state lock poisoned".to_string(),
                })?;
            state.materialized_rows
        };

        let Some(shard) = Self::index_single_shard(&self.config, &local_path, global_start)? else {
            warn!(
                "[triplets:hf] downloaded shard had zero rows and was skipped: {}",
                local_path.display()
            );
            return Ok(true);
        };

        let mut state = self
            .state
            .lock()
            .map_err(|_| SamplerError::SourceUnavailable {
                source_id: self.config.source_id.clone(),
                reason: "huggingface source state lock poisoned".to_string(),
            })?;

        if self
            .config
            .max_rows
            .is_some_and(|max_rows| state.materialized_rows >= max_rows)
        {
            return Ok(true);
        }

        let mut rows_to_add = shard.row_count;
        if let Some(max_rows) = self.config.max_rows {
            rows_to_add = rows_to_add.min(max_rows.saturating_sub(state.materialized_rows));
        }
        if rows_to_add == 0 {
            return Ok(true);
        }

        let mut shard = shard;
        shard.global_start = state.materialized_rows;
        shard.row_count = rows_to_add;
        if shard.is_parquet {
            shard
                .parquet_row_groups
                .retain(|(start, _)| *start < rows_to_add);
            if let Some((start, count)) = shard.parquet_row_groups.last_mut() {
                let allowed = rows_to_add.saturating_sub(*start);
                *count = (*count).min(allowed);
            }
        }
        state.materialized_rows += rows_to_add;
        state.shards.push(shard);

        let evicted_any = self.enforce_disk_cap_locked(&mut state, &local_path)?;
        self.persist_shard_sequence_locked(&state)?;
        let materialized_rows = state.materialized_rows;
        let shard_count = state.shards.len();
        let remaining_candidates = state
            .remote_candidates
            .as_ref()
            .map(|candidates| candidates.len().saturating_sub(state.next_remote_idx))
            .unwrap_or(0);
        let usage_bytes = self.manifest_usage_bytes_locked(&state);
        let usage_gib = usage_bytes as f64 / (1024.0 * 1024.0 * 1024.0);
        let cap_str = self
            .config
            .local_disk_cap_bytes
            .map(|bytes| format!("{:.2} GiB", bytes as f64 / (1024.0 * 1024.0 * 1024.0)))
            .unwrap_or_else(|| "disabled".to_string());
        drop(state);

        if evicted_any {
            if let Ok(mut cache) = self.cache.lock() {
                cache.rows.clear();
                cache.order.clear();
            }
            if let Ok(mut parquet_cache) = self.parquet_cache.lock() {
                parquet_cache.readers.clear();
            }
        }

        info!(
            "[triplets:hf] state: rows={} shards={} remaining_candidates={} disk_usage={:.2} GiB cap={}",
            materialized_rows, shard_count, remaining_candidates, usage_gib, cap_str,
        );

        Ok(true)
    }

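    // Candidate selection above is deterministic: the download order is an
    // IndexPermutation walk seeded from the config (plus the sampler seed
    // when one is set), and `next_remote_idx` is the persisted position in
    // that walk. A restarted process with the same seed therefore resumes
    // the same shard sequence where it left off.
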
    /// Copy cached/downloaded source file into snapshot tree.
    fn materialize_local_file(
        config: &HuggingFaceRowsConfig,
        source_path: &Path,
        target_path: &Path,
    ) -> Result<(), SamplerError> {
        let resolved_source =
            fs::canonicalize(source_path).unwrap_or_else(|_| source_path.to_path_buf());

        if let Some(parent) = target_path.parent() {
            fs::create_dir_all(parent).map_err(|err| SamplerError::SourceUnavailable {
                source_id: config.source_id.clone(),
                reason: format!(
                    "failed creating snapshot subdir {}: {err}",
                    parent.display()
                ),
            })?;
        }

        if target_path.exists() {
            let src_meta =
                fs::metadata(&resolved_source).map_err(|err| SamplerError::SourceUnavailable {
                    source_id: config.source_id.clone(),
                    reason: format!(
                        "failed reading source metadata {}: {err}",
                        resolved_source.display()
                    ),
                })?;
            let dst_meta =
                fs::metadata(target_path).map_err(|err| SamplerError::SourceUnavailable {
                    source_id: config.source_id.clone(),
                    reason: format!(
                        "failed reading target metadata {}: {err}",
                        target_path.display()
                    ),
                })?;
            if src_meta.len() == dst_meta.len() {
                return Ok(());
            }
            fs::remove_file(target_path).map_err(|err| SamplerError::SourceUnavailable {
                source_id: config.source_id.clone(),
                reason: format!(
                    "failed replacing target file {}: {err}",
                    target_path.display()
                ),
            })?;
        }

        fs::copy(&resolved_source, target_path).map_err(|err| SamplerError::SourceUnavailable {
            source_id: config.source_id.clone(),
            reason: format!(
                "failed copying synced file {} -> {}: {err}",
                resolved_source.display(),
                target_path.display()
            ),
        })?;
        Ok(())
    }

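    // Design note: an existing target is reused when source and target byte
    // lengths match, so equal length is treated as equal content here. That
    // is a deliberately cheap check; a checksum comparison would be stricter
    // but would mean re-reading entire shard files on every reuse.
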
    /// Build deterministic local shard index for accepted extensions.
    fn build_shard_index(
        config: &HuggingFaceRowsConfig,
    ) -> Result<(Vec<ShardIndex>, usize), SamplerError> {
        let start_index = Instant::now();
        let mut shard_paths = Vec::new();
        let accepted = config
            .shard_extensions
            .iter()
            .map(|ext| ext.trim().trim_start_matches('.').to_ascii_lowercase())
            .collect::<Vec<_>>();

        let mut saw_parquet = false;
        for entry in WalkDir::new(&config.snapshot_dir)
            .follow_links(true)
            .into_iter()
            .filter_map(Result::ok)
        {
            if !entry.file_type().is_file() {
                continue;
            }
            let Some(ext) = entry.path().extension().and_then(|v| v.to_str()) else {
                continue;
            };
            if ext.eq_ignore_ascii_case("parquet") {
                saw_parquet = true;
            }
            if accepted
                .iter()
                .any(|allowed| allowed == &ext.to_ascii_lowercase())
            {
                shard_paths.push(entry.path().to_path_buf());
            }
        }

        shard_paths.sort();
        if shard_paths.is_empty() {
            if saw_parquet && !accepted.iter().any(|value| value == "parquet") {
                return Err(SamplerError::SourceUnavailable {
                    source_id: config.source_id.clone(),
                    reason: format!(
                        "found parquet files under {}, but shard_extensions does not include parquet.",
                        config.snapshot_dir.display()
                    ),
                });
            }
            return Err(SamplerError::SourceUnavailable {
                source_id: config.source_id.clone(),
                reason: format!(
                    "no shard files found under {} with extensions {:?}",
                    config.snapshot_dir.display(),
                    config.shard_extensions
                ),
            });
        }

        let mut indexed_shards = shard_paths
            .into_par_iter()
            .enumerate()
            .map(|(ordinal, path)| {
                info!(
                    "[triplets:hf] indexing shard {}: {}",
                    ordinal + 1,
                    path.display()
                );
                let shard = Self::index_single_shard(config, &path, 0)?;
                Ok::<_, SamplerError>((ordinal, shard))
            })
            .collect::<Result<Vec<_>, _>>()?;

        indexed_shards.sort_by_key(|(ordinal, _)| *ordinal);

        let mut shards = Vec::new();
        let mut running_total = 0usize;
        for (_, maybe_shard) in indexed_shards {
            let Some(mut shard) = maybe_shard else {
                continue;
            };

            if let Some(max_rows) = config.max_rows {
                if running_total >= max_rows {
                    break;
                }
                let allowed = max_rows.saturating_sub(running_total);
                if shard.row_count > allowed {
                    shard.row_count = allowed;
                    if shard.is_parquet {
                        shard
                            .parquet_row_groups
                            .retain(|(start, _)| *start < shard.row_count);
                        if let Some((start, count)) = shard.parquet_row_groups.last_mut() {
                            let group_allowed = shard.row_count.saturating_sub(*start);
                            *count = (*count).min(group_allowed);
                        }
                    }
                }
            }

            if shard.row_count == 0 {
                continue;
            }

            shard.global_start = running_total;
            running_total = running_total.saturating_add(shard.row_count);
            shards.push(shard);
        }

        info!(
            "[triplets:hf] indexing complete in {:.2}s (rows={}, shards={})",
            start_index.elapsed().as_secs_f64(),
            running_total,
            shards.len()
        );

        Ok((shards, running_total))
    }

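    // Indexing example: three shards of 100, 250, and 50 rows receive
    // global_start values 0, 100, and 350 when no cap is set. With
    // config.max_rows = 300 the second shard is clipped to 200 rows (trailing
    // parquet row groups are trimmed to match) and the third shard is never
    // admitted.
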
    /// Locate containing shard and local offset for a global row index.
    fn locate_shard(shards: &[ShardIndex], idx: usize) -> Option<(&ShardIndex, usize)> {
        let pos = shards
            .binary_search_by(|shard| {
                if idx < shard.global_start {
                    Ordering::Greater
                } else if idx >= shard.global_start + shard.row_count {
                    Ordering::Less
                } else {
                    Ordering::Equal
                }
            })
            .ok()?;
        let shard = shards.get(pos)?;
        Some((shard, idx - shard.global_start))
    }

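    // Lookup example: for shards spanning global rows [0, 100) and [100, 350),
    // locate_shard(shards, 150) binary-searches to the second shard and
    // returns (shard, 50); any index of 350 or above falls outside every
    // half-open range and yields None.
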
    /// Read one JSONL/NDJSON line at a local row offset using checkpoints.
    fn read_line_at(&self, shard: &ShardIndex, local_idx: usize) -> Result<String, SamplerError> {
        let checkpoint_idx = local_idx / self.config.checkpoint_stride;
        let checkpoint_line = checkpoint_idx * self.config.checkpoint_stride;
        let seek_offset = *shard.checkpoints.get(checkpoint_idx).ok_or_else(|| {
            SamplerError::SourceUnavailable {
                source_id: self.config.source_id.clone(),
                reason: format!(
                    "missing checkpoint for shard {} line {}",
                    shard.path.display(),
                    local_idx
                ),
            }
        })?;

        let mut file = File::open(&shard.path).map_err(|err| SamplerError::SourceUnavailable {
            source_id: self.config.source_id.clone(),
            reason: format!("failed opening shard {}: {err}", shard.path.display()),
        })?;
        file.seek(SeekFrom::Start(seek_offset))
            .map_err(|err| SamplerError::SourceUnavailable {
                source_id: self.config.source_id.clone(),
                reason: format!("failed seeking shard {}: {err}", shard.path.display()),
            })?;

        let mut reader = BufReader::new(file);
        let mut line = String::new();
        for _ in checkpoint_line..local_idx {
            line.clear();
            let bytes =
                reader
                    .read_line(&mut line)
                    .map_err(|err| SamplerError::SourceUnavailable {
                        source_id: self.config.source_id.clone(),
                        reason: format!("failed scanning shard {}: {err}", shard.path.display()),
                    })?;
            if bytes == 0 {
                return Err(SamplerError::SourceUnavailable {
                    source_id: self.config.source_id.clone(),
                    reason: format!(
                        "unexpected EOF while scanning shard {} at row {}",
                        shard.path.display(),
                        local_idx
                    ),
                });
            }
        }

        line.clear();
        let bytes = reader
            .read_line(&mut line)
            .map_err(|err| SamplerError::SourceUnavailable {
                source_id: self.config.source_id.clone(),
                reason: format!("failed reading shard {}: {err}", shard.path.display()),
            })?;
        if bytes == 0 {
            return Err(SamplerError::SourceUnavailable {
                source_id: self.config.source_id.clone(),
                reason: format!(
                    "unexpected EOF while reading shard {} row {}",
                    shard.path.display(),
                    local_idx
                ),
            });
        }
        Ok(line)
    }

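    // Seek arithmetic, worked through with checkpoint_stride = 1000 and
    // local_idx = 2345: checkpoint_idx = 2 and checkpoint_line = 2000, so the
    // reader seeks to checkpoints[2], scans 345 lines forward, and the final
    // read_line returns row 2345. The forward scan is always bounded by one
    // stride's worth of lines.
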
    /// Locate parquet row-group and in-group row offset for a local row index.
    fn locate_parquet_group(
        &self,
        shard: &ShardIndex,
        local_idx: usize,
    ) -> Result<(usize, usize), SamplerError> {
        let group_pos = shard
            .parquet_row_groups
            .binary_search_by(|(start, count)| {
                if local_idx < *start {
                    Ordering::Greater
                } else if local_idx >= start.saturating_add(*count) {
                    Ordering::Less
                } else {
                    Ordering::Equal
                }
            })
            .map_err(|_| SamplerError::SourceUnavailable {
                source_id: self.config.source_id.clone(),
                reason: format!(
                    "parquet row {} could not be mapped to a row group in {}",
                    local_idx,
                    shard.path.display()
                ),
            })?;
        let (group_start, _) = shard.parquet_row_groups[group_pos];
        Ok((group_pos, local_idx.saturating_sub(group_start)))
    }

    /// Convert a serde JSON value into non-empty text when possible.
    fn value_to_text(value: &Value) -> Option<String> {
        match value {
            Value::Null => None,
            Value::String(s) => {
                if s.trim().is_empty() {
                    None
                } else {
                    Some(s.clone())
                }
            }
            Value::Bool(b) => Some(b.to_string()),
            Value::Number(n) => Some(n.to_string()),
            Value::Array(_) | Value::Object(_) => Some(value.to_string()),
        }
    }

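    // Worked examples (illustrative, using serde_json::json!):
    //   value_to_text(&json!(null))   -> None
    //   value_to_text(&json!("  "))   -> None (whitespace-only string)
    //   value_to_text(&json!("hi"))   -> Some("hi")
    //   value_to_text(&json!(3.5))    -> Some("3.5")
    //   value_to_text(&json!(true))   -> Some("true")
    //   value_to_text(&json!([1, 2])) -> Some("[1,2]") (compact JSON text)
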
    /// Parse a raw row payload into normalized `RowView` fields.
    fn parse_row(&self, absolute_idx: usize, row_value: &Value) -> Result<RowView, SamplerError> {
        let row_payload = row_value.get("row").unwrap_or(row_value);
        let row_obj = row_payload
            .as_object()
            .ok_or_else(|| SamplerError::SourceUnavailable {
                source_id: self.config.source_id.clone(),
                reason: "snapshot row entry missing JSON object payload".to_string(),
            })?;

        let row_id = self
            .config
            .id_column
            .as_ref()
            .and_then(|col| row_obj.get(col))
            .and_then(Self::value_to_text)
            .unwrap_or_else(|| {
                format!(
                    "{}:{}:{}",
                    self.config.dataset, self.config.split, absolute_idx
                )
            });

        let mut text_fields = Vec::new();
        let use_role_columns = self.config.anchor_column.is_some()
            || self.config.positive_column.is_some()
            || !self.config.context_columns.is_empty();

        if use_role_columns {
            if let Some(name) = &self.config.anchor_column {
                let value = row_obj
                    .get(name)
                    .ok_or_else(|| SamplerError::SourceInconsistent {
                        source_id: self.config.source_id.clone(),
                        details: format!("missing configured anchor column '{name}'"),
                    })?;
                let text =
                    Self::value_to_text(value).ok_or_else(|| SamplerError::SourceInconsistent {
                        source_id: self.config.source_id.clone(),
                        details: format!("configured anchor column '{name}' has null/empty value"),
                    })?;
                text_fields.push(RowTextField {
                    name: name.clone(),
                    text,
                });
            }

            if let Some(name) = &self.config.positive_column {
                let value = row_obj
                    .get(name)
                    .ok_or_else(|| SamplerError::SourceInconsistent {
                        source_id: self.config.source_id.clone(),
                        details: format!("missing configured positive column '{name}'"),
                    })?;
                let text =
                    Self::value_to_text(value).ok_or_else(|| SamplerError::SourceInconsistent {
                        source_id: self.config.source_id.clone(),
                        details: format!(
                            "configured positive column '{name}' has null/empty value"
                        ),
                    })?;
                text_fields.push(RowTextField {
                    name: name.clone(),
                    text,
                });
            }

            for name in &self.config.context_columns {
                let value = row_obj
                    .get(name)
                    .ok_or_else(|| SamplerError::SourceInconsistent {
                        source_id: self.config.source_id.clone(),
                        details: format!("missing configured context column '{name}'"),
                    })?;
                let text =
                    Self::value_to_text(value).ok_or_else(|| SamplerError::SourceInconsistent {
                        source_id: self.config.source_id.clone(),
                        details: format!("configured context column '{name}' has null/empty value"),
                    })?;
                text_fields.push(RowTextField {
                    name: name.clone(),
                    text,
                });
            }
        } else if self.config.text_columns.is_empty() {
            for (name, value) in row_obj {
                if self.config.id_column.as_ref().is_some_and(|id| id == name) {
                    continue;
                }
                if let Some(text) = Self::value_to_text(value) {
                    text_fields.push(RowTextField {
                        name: name.clone(),
                        text,
                    });
                }
            }
        } else {
            for name in &self.config.text_columns {
                let value = row_obj
                    .get(name)
                    .ok_or_else(|| SamplerError::SourceInconsistent {
                        source_id: self.config.source_id.clone(),
                        details: format!("missing configured text column '{name}'"),
                    })?;
                let text =
                    Self::value_to_text(value).ok_or_else(|| SamplerError::SourceInconsistent {
                        source_id: self.config.source_id.clone(),
                        details: format!("configured text column '{name}' has null/empty value"),
                    })?;
                text_fields.push(RowTextField {
                    name: name.clone(),
                    text,
                });
            }
        }

        if text_fields.is_empty() {
            return Err(SamplerError::SourceInconsistent {
                source_id: self.config.source_id.clone(),
                details: "row resolved to zero text fields".to_string(),
            });
        }

        Ok(RowView {
            row_id: Some(row_id),
            timestamp: None,
            text_fields,
        })
    }

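    // Column-resolution precedence in `parse_row`: explicitly configured role
    // columns (anchor/positive/context) win when any is set; otherwise an
    // explicit `text_columns` list is honored; otherwise every non-id column
    // that converts to text is taken in the object's iteration order. Role
    // and listed columns are strict (missing or null/empty values error out),
    // while the fallback path silently skips values that yield no text.
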
    /// Convert a `RowView` into a sampler `DataRecord`.
    fn row_to_record(
        &self,
        row: &RowView,
        row_index: u64,
    ) -> Result<Option<DataRecord>, SamplerError> {
        if row.text_fields.is_empty() {
            return Ok(None);
        }

        let record_id = row
            .row_id
            .as_ref()
            .cloned()
            .unwrap_or_else(|| format!("row_{row_index}"));
        let id = format!("{}::{}", self.config.source_id, record_id);

        let mut sections = Vec::new();
        let anchor = &row.text_fields[0];
        sections.push(make_section(
            SectionRole::Anchor,
            Some(anchor.name.as_str()),
            anchor.text.as_str(),
        ));

        let positive = row.text_fields.get(1).unwrap_or(anchor);
        sections.push(make_section(
            SectionRole::Context,
            Some(positive.name.as_str()),
            positive.text.as_str(),
        ));

        for field in row.text_fields.iter().skip(2) {
            sections.push(make_section(
                SectionRole::Context,
                Some(field.name.as_str()),
                field.text.as_str(),
            ));
        }

        let timestamp = row.timestamp.unwrap_or(DateTime::<Utc>::UNIX_EPOCH);
        Ok(Some(DataRecord {
            id,
            source: self.config.source_id.clone(),
            created_at: timestamp,
            updated_at: timestamp,
            quality: QualityScore::default(),
            taxonomy: vec![
                format!("dataset={}", self.config.dataset),
                format!("config={}", self.config.config),
                format!("split={}", self.config.split),
            ],
            sections,
            meta_prefix: None,
        }))
    }

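    // Section layout produced above: text field 0 becomes the Anchor section,
    // field 1 becomes the first Context section (falling back to the anchor
    // text when only one field exists), and fields 2+ become additional
    // Context sections. Rows without a native timestamp are pinned to the
    // Unix epoch so created_at/updated_at stay stable across refreshes.
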
    /// Materialize records for requested indices into output buffer.
    fn read_row_batch(
        &self,
        indices: &[usize],
        out: &mut Vec<DataRecord>,
        limit: Option<usize>,
    ) -> Result<(), SamplerError> {
        let mut sorted = indices.to_vec();
        sorted.sort_unstable();

        let mut fetched = HashMap::with_capacity(sorted.len());
        let mut pending = Vec::new();
        for idx in &sorted {
            if !self.ensure_row_available(*idx)? {
                fetched.insert(*idx, None);
                continue;
            }

            if let Some(row) = self
                .cache
                .lock()
                .map_err(|_| SamplerError::SourceUnavailable {
                    source_id: self.config.source_id.clone(),
                    reason: "huggingface row cache lock poisoned".to_string(),
                })?
                .get(*idx)
            {
                let record = self.row_to_record(&row, *idx as u64)?;
                fetched.insert(*idx, record);
                continue;
            }

            pending.push(*idx);
        }

        if !pending.is_empty() {
            let resolutions = {
                let state = self
                    .state
                    .lock()
                    .map_err(|_| SamplerError::SourceUnavailable {
                        source_id: self.config.source_id.clone(),
                        reason: "huggingface source state lock poisoned".to_string(),
                    })?;
                let mut resolved = Vec::with_capacity(pending.len());
                for idx in &pending {
                    let (shard, local_idx) = Self::locate_shard(&state.shards, *idx)
                        .ok_or_else(|| SamplerError::SourceUnavailable {
                            source_id: self.config.source_id.clone(),
                            reason: format!("row index out of range: {idx}"),
                        })?;
                    resolved.push((*idx, shard.clone(), local_idx));
                }
                resolved
            };

            let mut parquet_groups: HashMap<ParquetGroupKey, Vec<ParquetGroupRequest>> =
                HashMap::new();
            for (idx, shard, local_idx) in resolutions {
                if shard.is_parquet {
                    let (group_pos, local_in_group) =
                        self.locate_parquet_group(&shard, local_idx)?;
                    parquet_groups
                        .entry((shard.path.clone(), group_pos))
                        .or_default()
                        .push((idx, local_in_group, shard));
                    continue;
                }

                let line = self.read_line_at(&shard, local_idx)?;
                let row_value = serde_json::from_str::<Value>(line.trim()).map_err(|err| {
                    SamplerError::SourceInconsistent {
                        source_id: self.config.source_id.clone(),
                        details: format!(
                            "failed decoding JSON row from shard {} at local index {}: {err}",
                            shard.path.display(),
                            local_idx
                        ),
                    }
                })?;
                let row = self.parse_row(idx, &row_value)?;
                let record = self.row_to_record(&row, idx as u64)?;
                self.cache
                    .lock()
                    .map_err(|_| SamplerError::SourceUnavailable {
                        source_id: self.config.source_id.clone(),
                        reason: "huggingface row cache lock poisoned".to_string(),
                    })?
                    .insert(idx, row, self.config.cache_capacity);
                fetched.insert(idx, record);
            }

            for ((shard_path, group_pos), mut requested) in parquet_groups {
                requested.sort_by_key(|(_, local_in_group, _)| *local_in_group);
                let shard = requested
                    .first()
                    .map(|(_, _, shard)| shard.clone())
                    .ok_or_else(|| SamplerError::SourceUnavailable {
                        source_id: self.config.source_id.clone(),
                        reason: format!(
                            "missing parquet request metadata for shard {} row_group {}",
                            shard_path.display(),
                            group_pos
                        ),
                    })?;

                let mut targets: BTreeMap<usize, Vec<usize>> = BTreeMap::new();
                for (idx, local_in_group, _) in requested {
                    targets.entry(local_in_group).or_default().push(idx);
                }
                let max_target = targets.keys().next_back().copied().unwrap_or(0);

                let reader = self
                    .parquet_cache
                    .lock()
                    .map_err(|_| SamplerError::SourceUnavailable {
                        source_id: self.config.source_id.clone(),
                        reason: "huggingface parquet cache lock poisoned".to_string(),
                    })?
                    .reader_for(&self.config.source_id, &shard.path)?;

                let row_group = reader.get_row_group(group_pos).map_err(|err| {
                    SamplerError::SourceUnavailable {
                        source_id: self.config.source_id.clone(),
                        reason: format!(
                            "failed opening parquet row group {} for {}: {err}",
                            group_pos,
                            shard.path.display()
                        ),
                    }
                })?;
                let iter = RowIter::from_row_group(None, row_group.as_ref()).map_err(|err| {
                    SamplerError::SourceUnavailable {
                        source_id: self.config.source_id.clone(),
                        reason: format!(
                            "failed iterating parquet row group {} for {}: {err}",
                            group_pos,
                            shard.path.display()
                        ),
                    }
                })?;

                for (position, row_result) in iter.enumerate() {
                    if position > max_target {
                        break;
                    }
                    let Some(indices_for_position) = targets.remove(&position) else {
                        continue;
                    };
                    let row_value = row_result.map_err(|err| SamplerError::SourceUnavailable {
                        source_id: self.config.source_id.clone(),
                        reason: format!(
                            "failed reading parquet row {} in shard {} row_group {}: {err}",
                            position,
                            shard.path.display(),
                            group_pos
                        ),
                    })?;
                    let row_value = row_value.to_json_value();

                    for idx in indices_for_position {
                        let row = self.parse_row(idx, &row_value)?;
                        let record = self.row_to_record(&row, idx as u64)?;
                        self.cache
                            .lock()
                            .map_err(|_| SamplerError::SourceUnavailable {
                                source_id: self.config.source_id.clone(),
                                reason: "huggingface row cache lock poisoned".to_string(),
                            })?
                            .insert(idx, row, self.config.cache_capacity);
                        fetched.insert(idx, record);
                    }

                    if targets.is_empty() {
                        break;
                    }
                }

                if !targets.is_empty() {
                    let missing = targets
                        .into_keys()
                        .map(|value| value.to_string())
                        .collect::<Vec<_>>()
                        .join(",");
                    return Err(SamplerError::SourceUnavailable {
                        source_id: self.config.source_id.clone(),
                        reason: format!(
                            "parquet rows missing in shard {} row_group {} at local offsets [{}]",
                            shard.path.display(),
                            group_pos,
                            missing
                        ),
                    });
                }
            }
        }

        for idx in indices {
            if limit.is_some_and(|max| out.len() >= max) {
                break;
            }
            if let Some(record) = fetched.remove(idx).flatten() {
                out.push(record);
            }
        }
        Ok(())
    }

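    // Batch strategy recap: the requested indices are copied into a sorted
    // pass that answers from the row cache first, then resolves the misses to
    // (shard, local offset) pairs under one short state lock. JSONL misses
    // are read line-by-line via checkpoints, while parquet misses are grouped
    // by (file, row_group) so each row group is decoded at most once per
    // batch.
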
    /// Return the current index-domain upper bound for refresh paging.
    fn len_hint(&self) -> Option<usize> {
        let state = self.state.lock().ok()?;
        let known = state.materialized_rows;
        if known > 0 {
            let mut upper = known;
            if state
                .total_rows
                .is_some_and(|total_rows| total_rows > known)
            {
                let headroom = self.effective_expansion_headroom_rows();
                upper = known.saturating_add(headroom);
                if let Some(total_rows) = state.total_rows {
                    upper = upper.min(total_rows);
                }
            }
            if let Some(max_rows) = self.config.max_rows {
                upper = upper.min(max_rows);
            }
            return Some(upper.max(known));
        }

        if state.total_rows.is_some_and(|total_rows| total_rows == 0) {
            return Some(0);
        }

        if state
            .remote_candidates
            .as_ref()
            .is_some_and(|candidates| candidates.is_empty())
        {
            return Some(0);
        }

        if self.config.max_rows.is_some_and(|max_rows| max_rows == 0) {
            return Some(0);
        }

        Some(1)
    }
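
    // len_hint, worked through: with materialized_rows = 10_000, a known
    // total well above that, and headroom derived from the sampler ingestion
    // base times the expansion multiplier, the hint is
    // min(known + headroom, total_rows, max_rows). The trailing Some(1) keeps
    // an un-bootstrapped source discoverable: a nonzero hint lets the first
    // refresh trigger candidate listing and lazy shard expansion.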
}

impl DataSource for HuggingFaceRowSource {
2363
    /// Return stable source id.
NEW
2364
    fn id(&self) -> &str {
×
NEW
2365
        &self.config.source_id
×
NEW
2366
    }
×
2367

2368
    /// Store active sampler configuration for runtime behavior alignment.
2369
    fn configure_sampler(&self, config: &SamplerConfig) {
3✔
2370
        if let Ok(mut slot) = self.sampler_config.lock() {
3✔
2371
            *slot = Some(config.clone());
3✔
2372
        }
3✔
2373
    }
3✔
2374

2375
    /// Refresh source records for the requested cursor and row limit.
2376
    fn refresh(
8✔
2377
        &self,
8✔
2378
        cursor: Option<&SourceCursor>,
8✔
2379
        limit: Option<usize>,
8✔
2380
    ) -> Result<SourceSnapshot, SamplerError> {
8✔
2381
        let total = self
8✔
2382
            .len_hint()
8✔
2383
            .ok_or_else(|| SamplerError::SourceInconsistent {
8✔
NEW
2384
                source_id: self.config.source_id.clone(),
×
                details: "huggingface source did not provide len_hint".to_string(),
            })?;

        if total == 0 {
            return Ok(SourceSnapshot {
                records: Vec::new(),
                cursor: SourceCursor {
                    last_seen: Utc::now(),
                    revision: 0,
                },
            });
        }

        let max = limit.unwrap_or(total);
        let mut start = cursor.map(|state| state.revision as usize).unwrap_or(0);
        if start >= total {
            start = 0;
        }

        let source_id = self.config.source_id.clone();
        let seed = crate::source::IndexablePager::seed_for(&source_id, total);
        let mut permutation = crate::source::IndexPermutation::new(total, seed, start as u64);

        let mut records = Vec::new();
        let read_batch_target = self.effective_refresh_batch_target(max);
        let mut pending_indices = Vec::with_capacity(read_batch_target);
        let should_report = total >= 10_000 || max >= 1_024;
        let report_every = Duration::from_millis(750);
        let refresh_start = Instant::now();
        let mut last_report = refresh_start;
        let mut attempts = 0usize;

        if should_report {
            info!(
                "[triplets:source] refresh start source='{}' total={} target={}",
                source_id, total, max
            );
        }

        while attempts < total && records.len() < max {
            pending_indices.clear();
            let remaining_attempts = total.saturating_sub(attempts);
            let to_collect = read_batch_target.min(remaining_attempts);
            for _ in 0..to_collect {
                if records.len() + pending_indices.len() >= max {
                    break;
                }
                pending_indices.push(permutation.next());
                attempts += 1;
            }

            if pending_indices.is_empty() {
                break;
            }

            if should_report {
                info!(
                    "[triplets:source] refresh batch source='{}' batch_size={} attempted={} fetched={} elapsed={:.1}s",
                    source_id,
                    pending_indices.len(),
                    attempts,
                    records.len(),
                    refresh_start.elapsed().as_secs_f64()
                );
            }

            self.read_row_batch(&pending_indices, &mut records, Some(max))?;

            if should_report && last_report.elapsed() >= report_every {
                info!(
                    "[triplets:source] refresh progress source='{}' attempted={}/{} fetched={}/{} elapsed={:.1}s",
                    source_id,
                    attempts,
                    total,
                    records.len(),
                    max,
                    refresh_start.elapsed().as_secs_f64()
                );
                last_report = Instant::now();
            }
        }

        if should_report {
            info!(
                "[triplets:source] refresh done source='{}' attempted={} fetched={} elapsed={:.2}s",
                source_id,
                attempts,
                records.len(),
                refresh_start.elapsed().as_secs_f64()
            );
        }

        let next_start = permutation.cursor();
        let last_seen = records
            .iter()
            .map(|record| record.updated_at)
            .max()
            .unwrap_or_else(Utc::now);

        Ok(SourceSnapshot {
            records,
            cursor: SourceCursor {
                last_seen,
                revision: next_start as u64,
            },
        })
    }
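
    // Editor's note on the flow above: `refresh` draws row indices from a
    // seeded permutation of the row domain, reads them in batches of
    // `effective_refresh_batch_target`, and returns the permutation cursor as
    // `SourceCursor.revision`. A caller that hands that cursor back resumes
    // the walk instead of re-reading the same prefix. Minimal usage sketch
    // (the borrowed-cursor form of the call is an assumption about the trait
    // signature, not taken from this file):
    //
    //     let first = source.refresh(None, Some(batch))?;
    //     let second = source.refresh(Some(&first.cursor), Some(batch))?;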

    /// Return the exact record count implied by the current `len_hint`,
    /// erroring when the source cannot provide one.
    fn reported_record_count(&self) -> Result<u128, SamplerError> {
        self.len_hint()
            .map(|count| count as u128)
            .ok_or_else(|| SamplerError::SourceInconsistent {
                source_id: self.config.source_id.clone(),
                details: "huggingface source did not provide len_hint".to_string(),
            })
    }

    /// Return the default triplet recipes used by Hugging Face row sources.
    fn default_triplet_recipes(&self) -> Vec<TripletRecipe> {
        vec![TripletRecipe {
            name: "huggingface_anchor_context".into(),
            anchor: Selector::Role(SectionRole::Anchor),
            positive_selector: Selector::Role(SectionRole::Context),
            negative_selector: Selector::Role(SectionRole::Context),
            negative_strategy: NegativeStrategy::WrongArticle,
            weight: 1.0,
            instruction: None,
        }]
    }
}

#[cfg(test)]
2518
mod tests {
2519
    use super::*;
2520
    use parquet::data_type::{ByteArray, ByteArrayType};
2521
    use parquet::file::properties::WriterProperties;
2522
    use parquet::file::writer::SerializedFileWriter;
2523
    use parquet::schema::parser::parse_message_type;
2524
    use serde_json::json;
2525
    use std::io::{Read, Write};
2526
    use std::net::TcpListener;
2527
    use std::thread;
2528
    use tempfile::tempdir;
2529

2530
    fn test_config(snapshot_dir: PathBuf) -> HuggingFaceRowsConfig {
94✔
2531
        let mut config =
94✔
2532
            HuggingFaceRowsConfig::new("hf_test", "org/dataset", "default", "train", snapshot_dir);
94✔
2533
        config.cache_capacity = 10;
94✔
2534
        config.remote_expansion_headroom_multiplier = 3;
94✔
2535
        config
94✔
2536
    }
94✔
2537

2538
    fn test_source(config: HuggingFaceRowsConfig) -> HuggingFaceRowSource {
55✔
2539
        HuggingFaceRowSource {
55✔
2540
            config,
55✔
2541
            sampler_config: Mutex::new(None),
55✔
2542
            state: Mutex::new(SourceState {
55✔
2543
                materialized_rows: 0,
55✔
2544
                total_rows: None,
55✔
2545
                shards: Vec::new(),
55✔
2546
                remote_candidates: None,
55✔
2547
                remote_candidate_sizes: HashMap::new(),
55✔
2548
                next_remote_idx: 0,
55✔
2549
            }),
55✔
2550
            cache: Mutex::new(RowCache::default()),
55✔
2551
            parquet_cache: Mutex::new(ParquetCache::default()),
55✔
2552
        }
55✔
2553
    }
55✔
2554

2555
    fn spawn_one_shot_http(payload: Vec<u8>) -> (String, thread::JoinHandle<()>) {
10✔
2556
        let listener = TcpListener::bind("127.0.0.1:0").unwrap();
10✔
2557
        let addr = listener.local_addr().unwrap();
10✔
2558
        let handle = thread::spawn(move || {
10✔
2559
            let (mut stream, _) = listener.accept().unwrap();
10✔
2560
            let mut request_buf = [0u8; 1024];
10✔
2561
            let _ = stream.read(&mut request_buf);
10✔
2562
            let headers = format!(
10✔
2563
                "HTTP/1.1 200 OK\r\nContent-Length: {}\r\nConnection: close\r\n\r\n",
2564
                payload.len()
10✔
2565
            );
2566
            stream.write_all(headers.as_bytes()).unwrap();
10✔
2567
            stream.write_all(&payload).unwrap();
10✔
2568
            let _ = stream.flush();
10✔
2569
        });
10✔
2570
        (format!("http://{addr}"), handle)
10✔
2571
    }
10✔
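
    // Editor's note: this helper binds to port 0 so the OS assigns a free
    // ephemeral port, serves exactly one request with a canned 200 response,
    // then returns; tests `join()` the handle afterwards, so every simulated
    // download is deterministic and leaves no thread running.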

    fn write_parquet_fixture(path: &Path, rows: &[(&str, &str)]) {
        let schema = Arc::new(
            parse_message_type(
                "message test_schema {
                    REQUIRED BINARY id (UTF8);
                    REQUIRED BINARY text (UTF8);
                }",
            )
            .unwrap(),
        );
        let props = Arc::new(WriterProperties::builder().build());
        let file = File::create(path).unwrap();
        let mut writer = SerializedFileWriter::new(file, schema, props).unwrap();
        let mut row_group = writer.next_row_group().unwrap();

        if let Some(mut col_writer) = row_group.next_column().unwrap() {
            let values = rows
                .iter()
                .map(|(id, _)| ByteArray::from(*id))
                .collect::<Vec<_>>();
            col_writer
                .typed::<ByteArrayType>()
                .write_batch(&values, None, None)
                .unwrap();
            col_writer.close().unwrap();
        }

        if let Some(mut col_writer) = row_group.next_column().unwrap() {
            let values = rows
                .iter()
                .map(|(_, text)| ByteArray::from(*text))
                .collect::<Vec<_>>();
            col_writer
                .typed::<ByteArrayType>()
                .write_batch(&values, None, None)
                .unwrap();
            col_writer.close().unwrap();
        }

        assert!(row_group.next_column().unwrap().is_none());
        row_group.close().unwrap();
        writer.close().unwrap();
    }

2617
    #[test]
2618
    fn row_cache_insert_and_evicts_oldest_entry() {
1✔
2619
        let mut cache = RowCache::default();
1✔
2620
        let row_a = RowView {
1✔
2621
            row_id: Some("a".to_string()),
1✔
2622
            timestamp: None,
1✔
2623
            text_fields: vec![RowTextField {
1✔
2624
                name: "text".to_string(),
1✔
2625
                text: "alpha".to_string(),
1✔
2626
            }],
1✔
2627
        };
1✔
2628
        let row_b = RowView {
1✔
2629
            row_id: Some("b".to_string()),
1✔
2630
            timestamp: None,
1✔
2631
            text_fields: vec![RowTextField {
1✔
2632
                name: "text".to_string(),
1✔
2633
                text: "beta".to_string(),
1✔
2634
            }],
1✔
2635
        };
1✔
2636

2637
        cache.insert(0, row_a.clone(), 1);
1✔
2638
        assert!(cache.get(0).is_some());
1✔
2639

2640
        cache.insert(1, row_b, 1);
1✔
2641
        assert!(cache.get(0).is_none());
1✔
2642
        assert_eq!(cache.get(1).unwrap().row_id.as_deref(), Some("b"));
1✔
2643

2644
        let mut zero_cache = RowCache::default();
1✔
2645
        zero_cache.insert(7, row_a, 0);
1✔
2646
        assert!(zero_cache.get(7).is_none());
1✔
2647
    }
1✔
2648

2649
    #[test]
2650
    fn parquet_cache_reader_for_reports_open_and_parse_errors() {
1✔
2651
        let dir = tempdir().unwrap();
1✔
2652
        let parquet_path = dir.path().join("missing.parquet");
1✔
2653
        let mut cache = ParquetCache::default();
1✔
2654
        let missing = cache.reader_for("hf_test", &parquet_path);
1✔
2655
        assert!(missing.is_err());
1✔
2656

2657
        let invalid_parquet = dir.path().join("invalid.parquet");
1✔
2658
        fs::write(&invalid_parquet, b"not parquet").unwrap();
1✔
2659
        let invalid = cache.reader_for("hf_test", &invalid_parquet);
1✔
2660
        assert!(invalid.is_err());
1✔
2661
    }
1✔
2662

2663
    #[test]
2664
    fn effective_targets_respect_minimum_multiplier_and_sampler_override() {
1✔
2665
        let dir = tempdir().unwrap();
1✔
2666
        let mut config = test_config(dir.path().to_path_buf());
1✔
2667
        config.refresh_batch_multiplier = 0;
1✔
2668
        config.remote_expansion_headroom_multiplier = 0;
1✔
2669
        config.cache_capacity = 9;
1✔
2670
        let source = test_source(config.clone());
1✔
2671

2672
        assert_eq!(source.effective_refresh_batch_target(5), 5);
1✔
2673
        assert_eq!(source.effective_expansion_headroom_rows(), 9);
1✔
2674

2675
        let sampler = SamplerConfig {
1✔
2676
            ingestion_max_records: 4,
1✔
2677
            ..SamplerConfig::default()
1✔
2678
        };
1✔
2679
        *source.sampler_config.lock().unwrap() = Some(sampler);
1✔
2680
        assert_eq!(source.effective_expansion_headroom_rows(), 4);
1✔
2681
    }
1✔
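
    // Editor's reading of the assertions above: a zero multiplier behaves as
    // 1, so the refresh batch target falls back to the requested max (5) and
    // the expansion headroom falls back to 1 * cache_capacity = 9; once a
    // sampler config is attached, ingestion_max_records (4) replaces
    // cache_capacity as the base, giving 1 * 4 = 4.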

    #[test]
    fn collect_candidates_from_siblings_filters_split_and_tracks_parquet() {
        let dir = tempdir().unwrap();
        let config = test_config(dir.path().to_path_buf());
        let accepted = HuggingFaceRowSource::normalized_shard_extensions(&config);
        let siblings = vec![
            "train/a.ndjson".to_string(),
            "dev/b.ndjson".to_string(),
            "train-c.parquet".to_string(),
            "train-z.txt".to_string(),
        ];

        let (candidates, saw_parquet) = HuggingFaceRowSource::collect_candidates_from_siblings(
            &config, &siblings, &accepted, true,
        );

        assert!(saw_parquet);
        assert_eq!(
            candidates,
            vec!["train/a.ndjson".to_string(), "train-c.parquet".to_string()]
        );
    }

    #[test]
    fn collect_candidates_from_siblings_skips_existing_targets() {
        let dir = tempdir().unwrap();
        let config = test_config(dir.path().to_path_buf());
        let accepted = HuggingFaceRowSource::normalized_shard_extensions(&config);
        let existing = "train/already.ndjson".to_string();
        let existing_target = HuggingFaceRowSource::candidate_target_path(&config, &existing);
        fs::create_dir_all(existing_target.parent().unwrap()).unwrap();
        fs::write(&existing_target, b"x\n").unwrap();

        let siblings = vec![existing, "train/new.ndjson".to_string()];
        let (candidates, _) = HuggingFaceRowSource::collect_candidates_from_siblings(
            &config, &siblings, &accepted, true,
        );
        assert_eq!(candidates, vec!["train/new.ndjson".to_string()]);
    }

    #[test]
    fn candidates_from_parquet_manifest_json_filters_and_records_sizes() {
        let dir = tempdir().unwrap();
        let config = test_config(dir.path().to_path_buf());
        let payload = json!({
            "parquet_files": [
                {"url": "https://host/x/train/000.parquet", "size": 11},
                {"url": "https://host/x/train/001.ndjson", "size": 13},
                {"url": "https://host/x/train/002.txt", "size": 5},
                {"foo": "missing-url"}
            ]
        });

        let (candidates, sizes) =
            HuggingFaceRowSource::candidates_from_parquet_manifest_json(&config, &payload).unwrap();
        assert_eq!(candidates.len(), 2);
        assert!(
            candidates
                .iter()
                .any(|c| c.ends_with("https://host/x/train/000.parquet"))
        );
        assert!(
            candidates
                .iter()
                .any(|c| c.ends_with("https://host/x/train/001.ndjson"))
        );
        assert_eq!(sizes.len(), 2);
    }

    #[test]
    fn candidates_from_parquet_manifest_skips_complete_cached_and_replaces_incomplete() {
        let dir = tempdir().unwrap();
        let config = test_config(dir.path().to_path_buf());

        let complete_url = "https://host/datasets/org/ds/resolve/main/train/000.parquet";
        let complete_candidate = format!("{REMOTE_URL_PREFIX}{complete_url}");
        let complete_target =
            HuggingFaceRowSource::candidate_target_path(&config, &complete_candidate);
        fs::create_dir_all(complete_target.parent().unwrap()).unwrap();
        fs::write(&complete_target, vec![1u8; 7]).unwrap();

        let stale_url = "https://host/datasets/org/ds/resolve/main/train/001.parquet";
        let stale_candidate = format!("{REMOTE_URL_PREFIX}{stale_url}");
        let stale_target = HuggingFaceRowSource::candidate_target_path(&config, &stale_candidate);
        fs::create_dir_all(stale_target.parent().unwrap()).unwrap();
        fs::write(&stale_target, vec![2u8; 3]).unwrap();

        let payload = json!({
            "parquet_files": [
                {"url": complete_url, "size": 7},
                {"url": stale_url, "size": 9}
            ]
        });

        let (candidates, sizes) =
            HuggingFaceRowSource::candidates_from_parquet_manifest_json(&config, &payload).unwrap();
        assert_eq!(candidates.len(), 1);
        assert!(candidates[0].ends_with(stale_url));
        assert!(!stale_target.exists());
        assert_eq!(sizes[&candidates[0]], 9);
        assert!(complete_target.exists());
    }

    #[test]
    fn candidates_from_parquet_manifest_errors_when_removing_incomplete_target_fails() {
        let dir = tempdir().unwrap();
        let config = test_config(dir.path().to_path_buf());
        let url = "https://host/datasets/org/ds/resolve/main/train/blocked.parquet";
        let candidate = format!("{REMOTE_URL_PREFIX}{url}");
        let target = HuggingFaceRowSource::candidate_target_path(&config, &candidate);
        fs::create_dir_all(&target).unwrap();

        let payload = json!({
            "parquet_files": [
                {"url": url, "size": 1}
            ]
        });

        let err = HuggingFaceRowSource::candidates_from_parquet_manifest_json(&config, &payload);
        assert!(err.is_err());
    }

    #[test]
    fn normalized_shard_extensions_trims_dots_and_lowercases() {
        let dir = tempdir().unwrap();
        let mut config = test_config(dir.path().to_path_buf());
        config.shard_extensions = vec![".PARQUET".into(), " ndjson ".into()];
        let normalized = HuggingFaceRowSource::normalized_shard_extensions(&config);
        assert_eq!(
            normalized,
            vec!["parquet".to_string(), "ndjson".to_string()]
        );
    }

    #[test]
    fn manifest_usage_bytes_locked_counts_only_manifest_shards() {
        let dir = tempdir().unwrap();
        let config = test_config(dir.path().to_path_buf());
        let source = test_source(config);
        let manifest_root = source.manifest_cache_root();
        fs::create_dir_all(&manifest_root).unwrap();

        let manifest_file = manifest_root.join("a.parquet");
        fs::write(&manifest_file, vec![1u8; 7]).unwrap();
        let local_file = source.config.snapshot_dir.join("local.ndjson");
        fs::write(&local_file, vec![2u8; 9]).unwrap();

        let state = SourceState {
            materialized_rows: 2,
            total_rows: None,
            shards: vec![
                ShardIndex {
                    path: manifest_file,
                    global_start: 0,
                    row_count: 1,
                    is_parquet: true,
                    parquet_row_groups: vec![(0, 1)],
                    checkpoints: Vec::new(),
                },
                ShardIndex {
                    path: local_file,
                    global_start: 1,
                    row_count: 1,
                    is_parquet: false,
                    parquet_row_groups: Vec::new(),
                    checkpoints: vec![0],
                },
            ],
            remote_candidates: None,
            remote_candidate_sizes: HashMap::new(),
            next_remote_idx: 0,
        };

        assert_eq!(source.manifest_usage_bytes_locked(&state), 7);
    }

    #[test]
    fn build_shard_index_errors_when_parquet_present_but_not_accepted() {
        let dir = tempdir().unwrap();
        fs::write(dir.path().join("rows.parquet"), b"fake").unwrap();
        let mut config = test_config(dir.path().to_path_buf());
        config.shard_extensions = vec!["ndjson".to_string()];

        let result = HuggingFaceRowSource::build_shard_index(&config);
        assert!(result.is_err());
    }

    #[test]
    fn locate_parquet_group_maps_offsets_and_reports_missing() {
        let dir = tempdir().unwrap();
        let config = test_config(dir.path().to_path_buf());
        let source = test_source(config);
        let shard = ShardIndex {
            path: dir.path().join("rows.parquet"),
            global_start: 0,
            row_count: 6,
            is_parquet: true,
            parquet_row_groups: vec![(0, 2), (2, 2), (4, 2)],
            checkpoints: Vec::new(),
        };

        let mapped = source.locate_parquet_group(&shard, 3).unwrap();
        assert_eq!(mapped, (1, 1));
        let missing = source.locate_parquet_group(&shard, 99);
        assert!(missing.is_err());
    }
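
    // Worked example for the mapping above (editor's note): groups of
    // (start, len) = (0, 2), (2, 2), (4, 2) cover rows 0..6, so shard-local
    // row 3 falls in group index 1 at offset 3 - 2 = 1, i.e. (1, 1).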

    #[test]
    fn parse_row_role_columns_mode_builds_expected_fields() {
        let dir = tempdir().unwrap();
        let mut config = test_config(dir.path().to_path_buf());
        config.anchor_column = Some("anchor".into());
        config.positive_column = Some("positive".into());
        config.context_columns = vec!["ctx1".into(), "ctx2".into()];
        let source = test_source(config);

        let row = source
            .parse_row(
                2,
                &json!({"id":"r","anchor":"a","positive":"p","ctx1":"c1","ctx2":2}),
            )
            .unwrap();
        assert_eq!(row.text_fields.len(), 4);
        assert_eq!(row.text_fields[0].name, "anchor");
        assert_eq!(row.text_fields[1].name, "positive");
    }

    #[test]
    fn parse_row_role_columns_mode_errors_on_missing_or_empty_values() {
        let dir = tempdir().unwrap();
        let mut config = test_config(dir.path().to_path_buf());
        config.anchor_column = Some("anchor".into());
        config.context_columns = vec!["ctx".into()];
        let source = test_source(config);

        let missing = source.parse_row(0, &json!({"anchor":"a"}));
        assert!(missing.is_err());

        let empty_anchor = source.parse_row(1, &json!({"anchor":"   ", "ctx":"ok"}));
        assert!(empty_anchor.is_err());
    }

    #[test]
    fn row_to_record_uses_anchor_for_positive_when_single_field() {
        let dir = tempdir().unwrap();
        let config = test_config(dir.path().to_path_buf());
        let source = test_source(config);
        let row = RowView {
            row_id: Some("r1".into()),
            timestamp: None,
            text_fields: vec![RowTextField {
                name: "text".into(),
                text: "alpha".into(),
            }],
        };

        let record = source.row_to_record(&row, 0).unwrap().unwrap();
        assert_eq!(record.sections.len(), 2);
        assert_eq!(record.sections[0].text, record.sections[1].text);
    }

    #[test]
    fn read_line_at_errors_on_unexpected_eof_while_scanning() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("rows.jsonl");
        fs::write(&path, b"{\"text\":\"a\"}\n").unwrap();
        let mut config = test_config(dir.path().to_path_buf());
        config.checkpoint_stride = 1;
        let source = test_source(config.clone());
        let mut shard = HuggingFaceRowSource::index_single_shard(&config, &path, 0)
            .unwrap()
            .unwrap();
        shard.checkpoints = vec![0];

        let err = source.read_line_at(&shard, 3);
        assert!(err.is_err());
    }

    #[test]
    fn target_matches_expected_size_is_false_for_missing_path() {
        let dir = tempdir().unwrap();
        let missing = dir.path().join("missing.bin");
        assert!(!HuggingFaceRowSource::target_matches_expected_size(
            &missing,
            Some(1)
        ));
    }

    #[test]
    fn candidate_target_path_uses_fallback_suffix_without_resolve_segment() {
        let dir = tempdir().unwrap();
        let config = test_config(dir.path().to_path_buf());
        let candidate = "url::https://example.com/raw/file.parquet";
        let target = HuggingFaceRowSource::candidate_target_path(&config, candidate);
        assert!(target.ends_with("_parquet_manifest/parquet/unknown.parquet"));
    }

    #[test]
    fn persist_shard_sequence_is_noop_without_remote_candidates() {
        let dir = tempdir().unwrap();
        let config = test_config(dir.path().to_path_buf());
        let source = test_source(config.clone());
        let state = SourceState {
            materialized_rows: 0,
            total_rows: None,
            shards: Vec::new(),
            remote_candidates: None,
            remote_candidate_sizes: HashMap::new(),
            next_remote_idx: 0,
        };

        source.persist_shard_sequence_locked(&state).unwrap();
        assert!(!HuggingFaceRowSource::shard_sequence_state_path(&config).exists());
    }

    #[test]
    fn load_persisted_shard_sequence_returns_none_for_identity_mismatch() {
        let dir = tempdir().unwrap();
        let config = test_config(dir.path().to_path_buf());
        let state_path = HuggingFaceRowSource::shard_sequence_state_path(&config);
        fs::create_dir_all(state_path.parent().unwrap()).unwrap();
        fs::write(
            &state_path,
            serde_json::to_vec_pretty(&json!({
                "version": 1,
                "source_id": "different",
                "dataset": config.dataset,
                "config": config.config,
                "split": config.split,
                "sampler_seed": null,
                "candidates": ["train/0.ndjson"],
                "candidate_sizes": {},
                "next_remote_idx": 0
            }))
            .unwrap(),
        )
        .unwrap();

        let loaded = HuggingFaceRowSource::load_persisted_shard_sequence(&config, None).unwrap();
        assert!(loaded.is_none());
    }

    #[test]
    fn parse_row_falls_back_to_synthetic_id_when_missing_id_column() {
        let dir = tempdir().unwrap();
        let mut config = test_config(dir.path().to_path_buf());
        config.id_column = Some("id".into());
        let source = test_source(config);

        let row = source.parse_row(42, &json!({"text": "hello"})).unwrap();
        assert_eq!(row.row_id, Some("org/dataset:train:42".to_string()));
    }

    #[test]
    fn row_to_record_falls_back_to_row_index_when_row_id_missing() {
        let dir = tempdir().unwrap();
        let config = test_config(dir.path().to_path_buf());
        let source = test_source(config);
        let row = RowView {
            row_id: None,
            timestamp: None,
            text_fields: vec![RowTextField {
                name: "text".into(),
                text: "body".into(),
            }],
        };

        let record = source.row_to_record(&row, 7).unwrap().unwrap();
        assert!(record.id.ends_with("::row_7"));
    }

    #[test]
    fn locate_shard_returns_none_for_out_of_range_index() {
        let shards = vec![ShardIndex {
            path: PathBuf::from("a.ndjson"),
            global_start: 0,
            row_count: 2,
            is_parquet: false,
            parquet_row_groups: Vec::new(),
            checkpoints: vec![0],
        }];

        assert!(HuggingFaceRowSource::locate_shard(&shards, 5).is_none());
    }

    #[test]
    fn read_row_batch_errors_when_row_not_mappable_to_shard() {
        let dir = tempdir().unwrap();
        let config = test_config(dir.path().to_path_buf());
        let source = test_source(config);
        {
            let mut state = source.state.lock().unwrap();
            state.materialized_rows = 1;
            state.total_rows = Some(1);
            state.shards.clear();
        }

        let mut out = Vec::new();
        let err = source.read_row_batch(&[0], &mut out, Some(1));
        assert!(err.is_err());
    }

    #[test]
    fn len_hint_applies_max_rows_cap() {
        let dir = tempdir().unwrap();
        let mut config = test_config(dir.path().to_path_buf());
        config.max_rows = Some(3);
        let source = test_source(config);
        {
            let mut state = source.state.lock().unwrap();
            state.materialized_rows = 2;
            state.total_rows = Some(100);
        }
        assert_eq!(source.len_hint(), Some(3));
    }

    #[test]
    fn enforce_disk_cap_returns_false_when_disabled_or_under_limit() {
        let dir = tempdir().unwrap();
        let mut config = test_config(dir.path().to_path_buf());
        config.local_disk_cap_bytes = None;
        let source = test_source(config);
        let mut state = SourceState {
            materialized_rows: 0,
            total_rows: None,
            shards: Vec::new(),
            remote_candidates: None,
            remote_candidate_sizes: HashMap::new(),
            next_remote_idx: 0,
        };
        let protected = dir.path().join("p");
        assert!(
            !source
                .enforce_disk_cap_locked(&mut state, &protected)
                .unwrap()
        );

        let mut config2 = test_config(dir.path().to_path_buf());
        config2.local_disk_cap_bytes = Some(10_000);
        let source2 = test_source(config2);
        let manifest_root = source2.manifest_cache_root();
        fs::create_dir_all(&manifest_root).unwrap();
        let shard_path = manifest_root.join("small.parquet");
        fs::write(&shard_path, vec![1u8; 32]).unwrap();
        let mut state2 = SourceState {
            materialized_rows: 1,
            total_rows: None,
            shards: vec![ShardIndex {
                path: shard_path,
                global_start: 0,
                row_count: 1,
                is_parquet: true,
                parquet_row_groups: vec![(0, 1)],
                checkpoints: Vec::new(),
            }],
            remote_candidates: None,
            remote_candidate_sizes: HashMap::new(),
            next_remote_idx: 0,
        };
        assert!(
            !source2
                .enforce_disk_cap_locked(&mut state2, &protected)
                .unwrap()
        );
    }

    #[test]
    fn default_triplet_recipes_returns_expected_shape() {
        let dir = tempdir().unwrap();
        let config = test_config(dir.path().to_path_buf());
        let source = test_source(config);
        let recipes = source.default_triplet_recipes();
        assert_eq!(recipes.len(), 1);
        assert_eq!(recipes[0].name, "huggingface_anchor_context");
    }

    #[test]
    fn download_and_materialize_shard_url_short_circuits_when_cached_complete() {
        let dir = tempdir().unwrap();
        let config = test_config(dir.path().to_path_buf());
        let candidate = "url::https://host/datasets/org/ds/resolve/main/train/ok.ndjson";
        let target = HuggingFaceRowSource::candidate_target_path(&config, candidate);
        fs::create_dir_all(target.parent().unwrap()).unwrap();
        fs::write(&target, b"ok").unwrap();

        let resolved =
            HuggingFaceRowSource::download_and_materialize_shard(&config, candidate, Some(2))
                .unwrap();
        assert_eq!(resolved, target);
    }

    #[test]
    fn download_and_materialize_shard_url_replaces_stale_part_file() {
        let dir = tempdir().unwrap();
        let config = test_config(dir.path().to_path_buf());
        let payload = b"{\"text\":\"a\"}\n".to_vec();
        let (base_url, server) = spawn_one_shot_http(payload.clone());
        let candidate = format!("url::{base_url}/datasets/org/ds/resolve/main/train/part-x.ndjson");
        let target = HuggingFaceRowSource::candidate_target_path(&config, &candidate);
        let temp_target = target.with_extension("part");
        fs::create_dir_all(temp_target.parent().unwrap()).unwrap();
        fs::write(&temp_target, b"stale").unwrap();

        let out = HuggingFaceRowSource::download_and_materialize_shard(&config, &candidate, None)
            .unwrap();
        server.join().unwrap();

        assert_eq!(out, target);
        assert_eq!(fs::read(&target).unwrap(), payload);
    }

    #[test]
    fn download_next_remote_shard_skips_when_max_rows_already_reached() {
        let dir = tempdir().unwrap();
        let mut config = test_config(dir.path().to_path_buf());
        config.max_rows = Some(0);
        let source = test_source(config);
        let payload = b"{\"text\":\"x\"}\n".to_vec();
        let (base_url, server) = spawn_one_shot_http(payload);
        let candidate =
            format!("url::{base_url}/datasets/org/ds/resolve/main/train/part-200.ndjson");

        {
            let mut state = source.state.lock().unwrap();
            state.remote_candidates = Some(vec![candidate]);
            state.next_remote_idx = 0;
            state.materialized_rows = 0;
        }

        assert!(source.download_next_remote_shard().unwrap());
        server.join().unwrap();
        let state = source.state.lock().unwrap();
        assert_eq!(state.materialized_rows, 0);
        assert!(state.shards.is_empty());
    }

    #[test]
    fn download_next_remote_shard_skips_zero_row_download() {
        let dir = tempdir().unwrap();
        let config = test_config(dir.path().to_path_buf());
        let source = test_source(config);
        let payload = Vec::<u8>::new();
        let (base_url, server) = spawn_one_shot_http(payload);
        let candidate =
            format!("url::{base_url}/datasets/org/ds/resolve/main/train/part-empty.ndjson");

        {
            let mut state = source.state.lock().unwrap();
            state.remote_candidates = Some(vec![candidate]);
            state.next_remote_idx = 0;
        }

        assert!(source.download_next_remote_shard().unwrap());
        server.join().unwrap();
        let state = source.state.lock().unwrap();
        assert_eq!(state.materialized_rows, 0);
        assert!(state.shards.is_empty());
    }

    #[test]
    fn read_row_batch_errors_when_parquet_reader_cannot_open_file() {
        let dir = tempdir().unwrap();
        let config = test_config(dir.path().to_path_buf());
        let source = test_source(config);
        {
            let mut state = source.state.lock().unwrap();
            state.materialized_rows = 1;
            state.total_rows = Some(1);
            state.shards = vec![ShardIndex {
                path: dir.path().join("missing.parquet"),
                global_start: 0,
                row_count: 1,
                is_parquet: true,
                parquet_row_groups: vec![(0, 1)],
                checkpoints: Vec::new(),
            }];
        }

        let mut out = Vec::new();
        let err = source.read_row_batch(&[0], &mut out, Some(1));
        assert!(err.is_err());
    }

    #[test]
    fn refresh_exercises_large_total_progress_branch() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("rows.jsonl");
        let line = b"{\"id\":\"r\",\"text\":\"v\"}\n";
        let mut bytes = Vec::with_capacity(line.len() * 10_000);
        for _ in 0..10_000 {
            bytes.extend_from_slice(line);
        }
        fs::write(&path, bytes).unwrap();

        let mut config = test_config(dir.path().to_path_buf());
        config.checkpoint_stride = 256;
        config.refresh_batch_multiplier = 1;
        let source = test_source(config.clone());
        let shard = HuggingFaceRowSource::index_single_shard(&config, &path, 0)
            .unwrap()
            .unwrap();

        {
            let mut state = source.state.lock().unwrap();
            state.materialized_rows = 10_000;
            state.total_rows = Some(10_000);
            state.shards = vec![shard];
        }

        let snapshot = source.refresh(None, Some(1)).unwrap();
        assert_eq!(snapshot.records.len(), 1);
    }
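
    // Editor's note: 10_000 rows is exactly the `should_report` threshold in
    // `refresh` (`total >= 10_000 || max >= 1_024`), so this fixture is the
    // smallest input that drives the periodic progress-logging branch.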

    #[test]
    fn shard_size_bytes_returns_zero_for_missing_path() {
        let dir = tempdir().unwrap();
        let missing = dir.path().join("missing.file");
        assert_eq!(HuggingFaceRowSource::shard_size_bytes(&missing), 0);
    }

    #[test]
    fn load_persisted_shard_sequence_errors_on_invalid_json() {
        let dir = tempdir().unwrap();
        let config = test_config(dir.path().to_path_buf());
        let path = HuggingFaceRowSource::shard_sequence_state_path(&config);
        fs::create_dir_all(path.parent().unwrap()).unwrap();
        fs::write(&path, b"{not-valid-json").unwrap();

        let loaded = HuggingFaceRowSource::load_persisted_shard_sequence(&config, None);
        assert!(loaded.is_err());
    }

    #[test]
    fn rotate_candidates_deterministically_is_noop_for_singleton() {
        let dir = tempdir().unwrap();
        let config = test_config(dir.path().to_path_buf());
        let mut candidates = vec!["one".to_string()];
        HuggingFaceRowSource::rotate_candidates_deterministically(&config, &mut candidates);
        assert_eq!(candidates, vec!["one".to_string()]);
    }

    #[test]
    fn extract_split_row_count_returns_none_when_missing_entries() {
        let payload = json!({"size": {"configs": [{"config": "other", "splits": []}]}});
        let rows = HuggingFaceRowSource::extract_split_row_count_from_size_response(
            &payload, "default", "train",
        );
        assert!(rows.is_none());
    }

    #[test]
    fn candidates_from_parquet_manifest_json_returns_empty_without_entries() {
        let dir = tempdir().unwrap();
        let config = test_config(dir.path().to_path_buf());
        let payload = json!({"other": []});
        let (candidates, sizes) =
            HuggingFaceRowSource::candidates_from_parquet_manifest_json(&config, &payload).unwrap();
        assert!(candidates.is_empty());
        assert!(sizes.is_empty());
    }

    #[test]
    fn read_line_at_errors_on_unexpected_eof_while_reading_target_row() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("rows.jsonl");
        fs::write(&path, b"{\"text\":\"a\"}\n").unwrap();
        let mut config = test_config(dir.path().to_path_buf());
        config.checkpoint_stride = 1;
        let source = test_source(config.clone());
        let mut shard = HuggingFaceRowSource::index_single_shard(&config, &path, 0)
            .unwrap()
            .unwrap();
        let end = fs::metadata(&path).unwrap().len();
        shard.checkpoints = vec![0, end];

        let err = source.read_line_at(&shard, 1);
        assert!(err.is_err());
    }

    #[test]
    fn load_persisted_shard_sequence_returns_none_when_state_missing() {
        let dir = tempdir().unwrap();
        let config = test_config(dir.path().to_path_buf());
        let loaded = HuggingFaceRowSource::load_persisted_shard_sequence(&config, None).unwrap();
        assert!(loaded.is_none());
    }

    #[test]
    fn persist_shard_sequence_clamps_next_index_on_write() {
        let dir = tempdir().unwrap();
        let config = test_config(dir.path().to_path_buf());
        let source = test_source(config.clone());
        let state = SourceState {
            materialized_rows: 0,
            total_rows: None,
            shards: Vec::new(),
            remote_candidates: Some(vec!["a".into(), "b".into()]),
            remote_candidate_sizes: HashMap::new(),
            next_remote_idx: 99,
        };

        source.persist_shard_sequence_locked(&state).unwrap();
        let raw =
            fs::read_to_string(HuggingFaceRowSource::shard_sequence_state_path(&config)).unwrap();
        let parsed: Value = serde_json::from_str(&raw).unwrap();
        assert_eq!(
            parsed.get("next_remote_idx").and_then(Value::as_u64),
            Some(2)
        );
    }

    #[test]
    fn materialize_local_file_replaces_target_when_size_differs() {
        let dir = tempdir().unwrap();
        let config = test_config(dir.path().to_path_buf());
        let src = dir.path().join("src.ndjson");
        let dst = dir.path().join("dst.ndjson");
        fs::write(&src, b"newer\n").unwrap();
        fs::write(&dst, b"old\n").unwrap();

        HuggingFaceRowSource::materialize_local_file(&config, &src, &dst).unwrap();
        assert_eq!(fs::read(&dst).unwrap(), b"newer\n");
    }

    #[test]
    fn row_to_record_preserves_explicit_timestamp() {
        let dir = tempdir().unwrap();
        let config = test_config(dir.path().to_path_buf());
        let source = test_source(config);
        let ts = Utc::now();
        let row = RowView {
            row_id: Some("r1".into()),
            timestamp: Some(ts),
            text_fields: vec![RowTextField {
                name: "text".into(),
                text: "alpha".into(),
            }],
        };

        let record = source.row_to_record(&row, 0).unwrap().unwrap();
        assert_eq!(record.created_at, ts);
        assert_eq!(record.updated_at, ts);
    }

    #[test]
    fn parse_row_text_columns_accept_numeric_values() {
        let dir = tempdir().unwrap();
        let mut config = test_config(dir.path().to_path_buf());
        config.text_columns = vec!["score".into()];
        let source = test_source(config);

        let row = source.parse_row(0, &json!({"score": 123})).unwrap();
        assert_eq!(row.text_fields.len(), 1);
        assert_eq!(row.text_fields[0].text, "123");
    }

    #[test]
    fn len_hint_returns_zero_when_max_rows_is_zero() {
        let dir = tempdir().unwrap();
        let mut config = test_config(dir.path().to_path_buf());
        config.max_rows = Some(0);
        let source = test_source(config);
        assert_eq!(source.len_hint(), Some(0));
    }
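
    // Editor's sketch (not in the original suite): the uncapped counterpart
    // of the test above. The expected value mirrors the arithmetic verified
    // later in `configure_sampler_updates_len_hint_headroom_via_trait_methods`:
    // materialized rows (5) plus headroom (3 * cache_capacity = 30).
    #[test]
    fn sketch_len_hint_adds_expansion_headroom_when_uncapped() {
        let dir = tempdir().unwrap();
        let source = test_source(test_config(dir.path().to_path_buf()));
        {
            let mut state = source.state.lock().unwrap();
            state.materialized_rows = 5;
            state.total_rows = Some(100);
        }
        assert_eq!(source.len_hint(), Some(35));
    }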

    #[test]
    fn refresh_limit_none_reads_up_to_total() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("rows.jsonl");
        fs::write(
            &path,
            b"{\"id\":\"r1\",\"text\":\"a\"}\n{\"id\":\"r2\",\"text\":\"b\"}\n",
        )
        .unwrap();
        let mut config = test_config(dir.path().to_path_buf());
        config.checkpoint_stride = 1;
        config.refresh_batch_multiplier = 1;
        let source = test_source(config.clone());
        let shard = HuggingFaceRowSource::index_single_shard(&config, &path, 0)
            .unwrap()
            .unwrap();
        {
            let mut state = source.state.lock().unwrap();
            state.materialized_rows = 2;
            state.total_rows = Some(2);
            state.shards = vec![shard];
        }

        let snapshot = source.refresh(None, None).unwrap();
        assert_eq!(snapshot.records.len(), 2);
    }

    #[test]
    fn read_row_batch_skips_unavailable_indices_without_error() {
        let dir = tempdir().unwrap();
        let config = test_config(dir.path().to_path_buf());
        let source = test_source(config);
        {
            let mut state = source.state.lock().unwrap();
            state.materialized_rows = 0;
            state.total_rows = Some(0);
            state.remote_candidates = Some(Vec::new());
        }

        let mut out = Vec::new();
        source.read_row_batch(&[0, 1], &mut out, Some(2)).unwrap();
        assert!(out.is_empty());
    }

    #[test]
    fn candidate_target_path_maps_remote_urls_under_manifest_root() {
        let dir = tempdir().unwrap();
        let config = test_config(dir.path().to_path_buf());
        let candidate =
            "url::https://huggingface.co/datasets/org/ds/resolve/main/train/part-000.parquet";
        let target = HuggingFaceRowSource::candidate_target_path(&config, candidate);
        assert!(target.ends_with("_parquet_manifest/main/train/part-000.parquet"));
    }

    #[test]
    fn candidate_target_path_keeps_local_candidates_relative() {
        let dir = tempdir().unwrap();
        let config = test_config(dir.path().to_path_buf());
        let candidate = "train/part-001.ndjson";
        let target = HuggingFaceRowSource::candidate_target_path(&config, candidate);
        assert_eq!(target, config.snapshot_dir.join(candidate));
    }
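
    // Editor's summary of the three `candidate_target_path` behaviors covered
    // above and in the earlier fallback test: `url::` candidates whose URL
    // contains a `resolve/<rev>/` segment are cached under the
    // `_parquet_manifest` root mirroring the path after `resolve/`; `url::`
    // candidates without that segment fall back to
    // `_parquet_manifest/parquet/unknown.parquet`; plain relative candidates
    // resolve inside `snapshot_dir` unchanged.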

    #[test]
    fn target_matches_expected_size_validates_when_expected_is_provided() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("payload.bin");
        fs::write(&path, vec![0u8; 5]).unwrap();

        assert!(HuggingFaceRowSource::target_matches_expected_size(
            &path,
            Some(5)
        ));
        assert!(!HuggingFaceRowSource::target_matches_expected_size(
            &path,
            Some(4)
        ));
        assert!(HuggingFaceRowSource::target_matches_expected_size(
            &path, None
        ));
    }

    #[test]
    fn parquet_row_group_map_and_index_single_shard_cover_success_path() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("rows.parquet");
        write_parquet_fixture(&path, &[("r1", "alpha"), ("r2", "beta"), ("r3", "gamma")]);
        let config = test_config(dir.path().to_path_buf());

        let (total_rows, groups) =
            HuggingFaceRowSource::parquet_row_group_map(&config, &path).unwrap();
        assert_eq!(total_rows, 3);
        assert!(!groups.is_empty());

        let shard = HuggingFaceRowSource::index_single_shard(&config, &path, 0)
            .unwrap()
            .unwrap();
        assert!(shard.is_parquet);
        assert_eq!(shard.row_count, 3);
        assert!(shard.checkpoints.is_empty());
    }

    #[test]
    fn read_row_batch_reads_parquet_rows_and_uses_cache_on_repeat() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("rows.parquet");
        write_parquet_fixture(&path, &[("r10", "ten"), ("r11", "eleven")]);

        let config = test_config(dir.path().to_path_buf());
        let source = test_source(config.clone());
        let shard = HuggingFaceRowSource::index_single_shard(&config, &path, 0)
            .unwrap()
            .unwrap();
        {
            let mut state = source.state.lock().unwrap();
            state.materialized_rows = 2;
            state.total_rows = Some(2);
            state.shards = vec![shard];
        }

        let mut first = Vec::new();
        source.read_row_batch(&[0, 1], &mut first, None).unwrap();
        assert_eq!(first.len(), 2);
        assert!(first.iter().any(|record| record.id.ends_with("::r10")));

        let mut second = Vec::new();
        source.read_row_batch(&[0, 1], &mut second, None).unwrap();
        assert_eq!(second.len(), 2);
    }

    #[test]
    fn ensure_row_available_resumes_persisted_sequence_and_bootstraps() {
        let dir = tempdir().unwrap();
        let config = test_config(dir.path().to_path_buf());
        let source = test_source(config.clone());

        let payload =
            b"{\"id\":\"r1\",\"text\":\"alpha\"}\n{\"id\":\"r2\",\"text\":\"beta\"}\n".to_vec();
        let (base_url, server) = spawn_one_shot_http(payload);
        let candidate =
            format!("url::{base_url}/datasets/org/ds/resolve/main/train/persisted.ndjson");

        let state_path = HuggingFaceRowSource::shard_sequence_state_path(&config);
        fs::create_dir_all(state_path.parent().unwrap()).unwrap();
        fs::write(
            &state_path,
            serde_json::to_vec_pretty(&json!({
                "version": 1,
                "source_id": config.source_id,
                "dataset": config.dataset,
                "config": config.config,
                "split": config.split,
                "sampler_seed": null,
                "candidates": [candidate],
                "candidate_sizes": {},
                "next_remote_idx": 0
            }))
            .unwrap(),
        )
        .unwrap();

        assert!(source.ensure_row_available(0).unwrap());
        server.join().unwrap();

        let state = source.state.lock().unwrap();
        assert_eq!(state.materialized_rows, 2);
        assert_eq!(state.next_remote_idx, 1);
        assert_eq!(state.shards.len(), 1);
    }

    #[test]
    fn configure_sampler_updates_len_hint_headroom_via_trait_methods() {
        let dir = tempdir().unwrap();
        let mut config = test_config(dir.path().to_path_buf());
        config.cache_capacity = 10;
        config.remote_expansion_headroom_multiplier = 3;
        let source = test_source(config);
        {
            let mut state = source.state.lock().unwrap();
            state.materialized_rows = 5;
            state.total_rows = Some(100);
        }

        assert_eq!(source.reported_record_count().unwrap(), 35);

        let sampler = SamplerConfig {
            ingestion_max_records: 2,
            ..SamplerConfig::default()
        };
        source.configure_sampler(&sampler);

        assert_eq!(source.reported_record_count().unwrap(), 11);
    }
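
    // Worked numbers for the assertions above (editor's note): reported count
    // = materialized rows + headroom. Before `configure_sampler` the headroom
    // base is cache_capacity, so 5 + 3 * 10 = 35; afterwards the base is the
    // sampler's ingestion_max_records, so 5 + 3 * 2 = 11. Both stay under the
    // remote total of 100, so no cap applies here.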
3641

3642
    #[test]
3643
    fn refresh_can_bootstrap_from_persisted_remote_sequence() {
1✔
3644
        let dir = tempdir().unwrap();
1✔
3645
        let config = test_config(dir.path().to_path_buf());
1✔
3646
        let source = test_source(config.clone());
1✔
3647

3648
        let payload = b"{\"id\":\"rr\",\"text\":\"hello\"}\n".to_vec();
1✔
3649
        let (base_url, server) = spawn_one_shot_http(payload);
1✔
3650
        let candidate =
1✔
3651
            format!("url::{base_url}/datasets/org/ds/resolve/main/train/refresh.ndjson");
1✔
3652

3653
        let state_path = HuggingFaceRowSource::shard_sequence_state_path(&config);
1✔
3654
        fs::create_dir_all(state_path.parent().unwrap()).unwrap();
1✔
3655
        fs::write(
1✔
3656
            &state_path,
1✔
3657
            serde_json::to_vec_pretty(&json!({
1✔
3658
                "version": 1,
1✔
3659
                "source_id": config.source_id,
1✔
3660
                "dataset": config.dataset,
1✔
3661
                "config": config.config,
1✔
3662
                "split": config.split,
1✔
3663
                "sampler_seed": null,
1✔
3664
                "candidates": [candidate],
1✔
3665
                "candidate_sizes": {},
1✔
3666
                "next_remote_idx": 0
1✔
3667
            }))
1✔
3668
            .unwrap(),
1✔
3669
        )
3670
        .unwrap();
1✔
3671

3672
        let snapshot = source.refresh(None, Some(1)).unwrap();
1✔
3673
        server.join().unwrap();
1✔
3674

3675
        assert_eq!(snapshot.records.len(), 1);
1✔
3676
        assert!(snapshot.records[0].id.contains("hf_test::rr"));
1✔
3677
    }
1✔
3678

    #[test]
    fn download_next_remote_shard_trims_rows_to_max_rows_limit() {
        let dir = tempdir().unwrap();
        let mut config = test_config(dir.path().to_path_buf());
        config.max_rows = Some(1);
        let source = test_source(config);
        let payload = b"{\"text\":\"a\"}\n{\"text\":\"b\"}\n".to_vec();
        let (base_url, server) = spawn_one_shot_http(payload);
        let candidate = format!("url::{base_url}/datasets/org/ds/resolve/main/train/trim.ndjson");

        {
            let mut state = source.state.lock().unwrap();
            state.remote_candidates = Some(vec![candidate]);
            state.next_remote_idx = 0;
            state.materialized_rows = 0;
        }

        assert!(source.download_next_remote_shard().unwrap());
        server.join().unwrap();

        let state = source.state.lock().unwrap();
        assert_eq!(state.materialized_rows, 1);
        assert_eq!(state.shards.len(), 1);
        assert_eq!(state.shards[0].row_count, 1);
    }

    #[test]
    fn build_shard_index_skips_empty_files_and_keeps_non_empty() {
        let dir = tempdir().unwrap();
        fs::write(dir.path().join("a.ndjson"), b"").unwrap();
        fs::write(dir.path().join("b.ndjson"), b"{\"text\":\"x\"}\n").unwrap();
        let config = test_config(dir.path().to_path_buf());

        let (shards, discovered) = HuggingFaceRowSource::build_shard_index(&config).unwrap();
        assert_eq!(discovered, 1);
        assert_eq!(shards.len(), 1);
        assert_eq!(shards[0].row_count, 1);
    }

    #[test]
    fn resolve_remote_candidates_from_siblings_falls_back_when_split_filter_misses() {
        let dir = tempdir().unwrap();
        let mut config = test_config(dir.path().to_path_buf());
        config.split = "train".to_string();
        let accepted = HuggingFaceRowSource::normalized_shard_extensions(&config);
        let siblings = vec![
            "validation/file-a.ndjson".to_string(),
            "test/file-b.ndjson".to_string(),
        ];

        let (candidates, sizes) = HuggingFaceRowSource::resolve_remote_candidates_from_siblings(
            &config, &siblings, &accepted,
        )
        .unwrap();

        assert!(sizes.is_empty());
        assert_eq!(candidates.len(), 2);
    }

    #[test]
    fn resolve_remote_candidates_from_siblings_errors_for_parquet_only_when_not_accepted() {
        let dir = tempdir().unwrap();
        let mut config = test_config(dir.path().to_path_buf());
        config.shard_extensions = vec!["ndjson".to_string()];
        let accepted = HuggingFaceRowSource::normalized_shard_extensions(&config);
        let siblings = vec!["train/only.parquet".to_string()];

        let result = HuggingFaceRowSource::resolve_remote_candidates_from_siblings(
            &config, &siblings, &accepted,
        );
        assert!(result.is_err());
    }

    #[test]
    fn resolve_remote_candidates_from_siblings_returns_empty_when_no_matches_and_no_parquet() {
        let dir = tempdir().unwrap();
        let config = test_config(dir.path().to_path_buf());
        let accepted = HuggingFaceRowSource::normalized_shard_extensions(&config);
        let siblings = vec!["train/notes.txt".to_string()];

        let (candidates, sizes) = HuggingFaceRowSource::resolve_remote_candidates_from_siblings(
            &config, &siblings, &accepted,
        )
        .unwrap();
        assert!(candidates.is_empty());
        assert!(sizes.is_empty());
    }

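    // The size endpoint reports 10 rows; the configured `max_rows = Some(3)`
    // must clamp the parsed global row count.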
    #[test]
    fn parse_global_row_count_response_applies_max_rows() {
        let dir = tempdir().unwrap();
        let mut config = test_config(dir.path().to_path_buf());
        config.max_rows = Some(3);
        let body = serde_json::to_string(&json!({
            "size": {
                "splits": [
                    {"config": "default", "split": "train", "num_rows": 10}
                ]
            }
        }))
        .unwrap();

        let rows = HuggingFaceRowSource::parse_global_row_count_response(&config, &body)
            .unwrap()
            .unwrap();
        assert_eq!(rows, 3);
    }

    #[test]
    fn parse_global_row_count_response_errors_on_invalid_json() {
        let dir = tempdir().unwrap();
        let config = test_config(dir.path().to_path_buf());
        let parsed = HuggingFaceRowSource::parse_global_row_count_response(&config, "{bad-json");
        assert!(parsed.is_err());
    }

    #[test]
    fn parse_parquet_manifest_response_errors_on_invalid_json() {
        let dir = tempdir().unwrap();
        let config = test_config(dir.path().to_path_buf());
        let parsed = HuggingFaceRowSource::parse_parquet_manifest_response(&config, "{bad-json");
        assert!(parsed.is_err());
    }

    #[test]
    fn parse_parquet_manifest_response_returns_candidates() {
        let dir = tempdir().unwrap();
        let config = test_config(dir.path().to_path_buf());
        let body = serde_json::to_string(&json!({
            "parquet_files": [
                {"url": "https://host/datasets/x/resolve/main/train/0.parquet", "size": 5}
            ]
        }))
        .unwrap();

        let (candidates, sizes) =
            HuggingFaceRowSource::parse_parquet_manifest_response(&config, &body).unwrap();
        assert_eq!(candidates.len(), 1);
        assert_eq!(sizes.len(), 1);
    }

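    // `url::`-prefixed candidates are fetched over HTTP and written into the
    // local snapshot directory byte-for-byte.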
    #[test]
    fn download_and_materialize_shard_downloads_url_candidate() {
        let dir = tempdir().unwrap();
        let config = test_config(dir.path().to_path_buf());
        let payload = b"{\"text\":\"a\"}\n{\"text\":\"b\"}\n".to_vec();
        let (base_url, server) = spawn_one_shot_http(payload.clone());
        let candidate =
            format!("url::{base_url}/datasets/org/ds/resolve/main/train/part-000.ndjson");

        let target =
            HuggingFaceRowSource::download_and_materialize_shard(&config, &candidate, None)
                .unwrap();

        server.join().unwrap();
        assert!(target.exists());
        assert_eq!(fs::read(&target).unwrap(), payload);
    }

    #[test]
    fn download_and_materialize_shard_replaces_incomplete_existing_target() {
        let dir = tempdir().unwrap();
        let config = test_config(dir.path().to_path_buf());
        let payload = b"{\"text\":\"a\"}\n".to_vec();
        let (base_url, server) = spawn_one_shot_http(payload.clone());
        let candidate =
            format!("url::{base_url}/datasets/org/ds/resolve/main/train/part-009.ndjson");

        let target = HuggingFaceRowSource::candidate_target_path(&config, &candidate);
        fs::create_dir_all(target.parent().unwrap()).unwrap();
        fs::write(&target, b"bad").unwrap();

        let refreshed = HuggingFaceRowSource::download_and_materialize_shard(
            &config,
            &candidate,
            Some(payload.len() as u64),
        )
        .unwrap();

        server.join().unwrap();
        assert_eq!(refreshed, target);
        assert_eq!(fs::read(&target).unwrap(), payload);
    }

    #[test]
    fn download_next_remote_shard_materializes_and_indexes_rows() {
        let dir = tempdir().unwrap();
        let config = test_config(dir.path().to_path_buf());
        let source = test_source(config);
        let payload = b"{\"text\":\"a\"}\n{\"text\":\"b\"}\n".to_vec();
        let (base_url, server) = spawn_one_shot_http(payload);
        let candidate =
            format!("url::{base_url}/datasets/org/ds/resolve/main/train/part-001.ndjson");

        {
            let mut state = source.state.lock().unwrap();
            state.remote_candidates = Some(vec![candidate.clone()]);
            state.remote_candidate_sizes.insert(candidate, 24);
            state.next_remote_idx = 0;
        }

        assert!(source.download_next_remote_shard().unwrap());
        server.join().unwrap();

        let state = source.state.lock().unwrap();
        assert_eq!(state.materialized_rows, 2);
        assert_eq!(state.shards.len(), 1);
        assert_eq!(state.next_remote_idx, 1);
    }

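    // Asking for a row index beyond the materialized frontier should pull
    // down the next remote candidate on demand.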
    #[test]
    fn ensure_row_available_triggers_lazy_download_for_remote_candidates() {
        let dir = tempdir().unwrap();
        let config = test_config(dir.path().to_path_buf());
        let source = test_source(config);
        let payload = b"{\"text\":\"x\"}\n{\"text\":\"y\"}\n".to_vec();
        let (base_url, server) = spawn_one_shot_http(payload);
        let candidate =
            format!("url::{base_url}/datasets/org/ds/resolve/main/train/part-002.ndjson");

        {
            let mut state = source.state.lock().unwrap();
            state.materialized_rows = 0;
            state.remote_candidates = Some(vec![candidate.clone()]);
            state.remote_candidate_sizes.insert(candidate, 24);
            state.next_remote_idx = 0;
        }

        assert!(source.ensure_row_available(0).unwrap());
        server.join().unwrap();

        let state = source.state.lock().unwrap();
        assert!(state.materialized_rows >= 1);
        assert_eq!(state.next_remote_idx, 1);
    }

    #[test]
    fn extract_split_row_count_reads_split_entries() {
        let payload = json!({
            "size": {
                "splits": [
                    {"config": "default", "split": "train", "num_rows": 123u64},
                    {"config": "default", "split": "validation", "num_rows": 45u64}
                ]
            }
        });

        let count = HuggingFaceRowSource::extract_split_row_count_from_size_response(
            &payload,
            "default",
            "validation",
        );
        assert_eq!(count, Some(45));
    }

    #[test]
    fn extract_split_row_count_reads_config_fallback_and_dataset_total() {
        let payload = json!({
            "size": {
                "configs": [
                    {
                        "config": "default",
                        "splits": [{"name": "test", "num_rows": 77u64}],
                        "num_rows": 200u64
                    }
                ],
                "dataset": {"num_rows": 999u64}
            }
        });

        let split_count = HuggingFaceRowSource::extract_split_row_count_from_size_response(
            &payload, "default", "test",
        );
        assert_eq!(split_count, Some(77));

        let empty_split_count = HuggingFaceRowSource::extract_split_row_count_from_size_response(
            &payload, "default", "",
        );
        assert_eq!(empty_split_count, Some(200));
    }

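    // With an explicit sampler seed, two sources with different ids derive
    // the same candidate seed; without one the seeds diverge per source_id.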
    #[test]
    fn shard_candidate_seed_uses_sampler_seed_when_provided() {
        let dir = tempdir().unwrap();
        let mut a = test_config(dir.path().join("a"));
        let mut b = test_config(dir.path().join("b"));
        a.source_id = "source_a".to_string();
        b.source_id = "source_b".to_string();

        let with_seed_a = HuggingFaceRowSource::shard_candidate_seed(&a, 100, Some(42));
        let with_seed_b = HuggingFaceRowSource::shard_candidate_seed(&b, 100, Some(42));
        assert_eq!(with_seed_a, with_seed_b);

        let without_seed_a = HuggingFaceRowSource::shard_candidate_seed(&a, 100, None);
        let without_seed_b = HuggingFaceRowSource::shard_candidate_seed(&b, 100, None);
        assert_ne!(without_seed_a, without_seed_b);
    }

    #[test]
    fn expansion_headroom_uses_sampler_ingestion_max_records_when_configured() {
        let dir = tempdir().unwrap();
        let config = test_config(dir.path().to_path_buf());
        let source = test_source(config);

        assert_eq!(source.effective_expansion_headroom_rows(), 30);

        let sampler = SamplerConfig {
            ingestion_max_records: 7,
            ..SamplerConfig::default()
        };
        source.configure_sampler(&sampler);
        assert_eq!(source.effective_expansion_headroom_rows(), 21);
    }

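    // The persisted shard sequence is bound to the sampler seed: reloading
    // with the same seed restores it, a different seed rejects it.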
    #[test]
    fn persisted_shard_sequence_roundtrip_respects_sampler_seed() {
        let dir = tempdir().unwrap();
        let config = test_config(dir.path().to_path_buf());
        let source = test_source(config.clone());

        {
            let sampler = SamplerConfig {
                seed: 4242,
                ..SamplerConfig::default()
            };
            source.configure_sampler(&sampler);
        }

        let mut state = SourceState {
            materialized_rows: 0,
            total_rows: None,
            shards: Vec::new(),
            remote_candidates: Some(vec![
                "url::https://x/resolve/main/train/000.parquet".to_string(),
                "url::https://x/resolve/main/train/001.parquet".to_string(),
            ]),
            remote_candidate_sizes: HashMap::new(),
            next_remote_idx: 1,
        };
        state.remote_candidate_sizes.insert(
            "url::https://x/resolve/main/train/000.parquet".to_string(),
            10,
        );

        source.persist_shard_sequence_locked(&state).unwrap();

        let restored =
            HuggingFaceRowSource::load_persisted_shard_sequence(&config, Some(4242)).unwrap();
        assert!(restored.is_some());
        let restored = restored.unwrap();
        assert_eq!(restored.next_remote_idx, 1);
        assert_eq!(restored.candidates.len(), 2);

        let rejected =
            HuggingFaceRowSource::load_persisted_shard_sequence(&config, Some(9999)).unwrap();
        assert!(rejected.is_none());
    }

    #[test]
    fn value_to_text_handles_scalar_and_structured_values() {
        assert_eq!(HuggingFaceRowSource::value_to_text(&json!(null)), None);
        assert_eq!(HuggingFaceRowSource::value_to_text(&json!("   ")), None);
        assert_eq!(
            HuggingFaceRowSource::value_to_text(&json!("hello")),
            Some("hello".into())
        );
        assert_eq!(
            HuggingFaceRowSource::value_to_text(&json!(true)),
            Some("true".into())
        );
        assert_eq!(
            HuggingFaceRowSource::value_to_text(&json!(3.5)),
            Some("3.5".into())
        );
        assert_eq!(
            HuggingFaceRowSource::value_to_text(&json!([1, 2])),
            Some("[1,2]".into())
        );
    }

    #[test]
    fn parse_row_auto_detects_text_fields_and_skips_id() {
        let dir = tempdir().unwrap();
        let mut config = test_config(dir.path().to_path_buf());
        config.id_column = Some("id".into());
        let source = test_source(config);

        let row = source
            .parse_row(
                5,
                &json!({
                    "id": "row-5",
                    "title": "Anchor text",
                    "body": "Context text",
                    "flag": true
                }),
            )
            .unwrap();

        assert_eq!(row.row_id.as_deref(), Some("row-5"));
        assert!(row.text_fields.iter().any(|f| f.name == "title"));
        assert!(row.text_fields.iter().any(|f| f.name == "body"));
        assert!(row.text_fields.iter().all(|f| f.name != "id"));
    }

    #[test]
    fn parse_row_with_required_columns_errors_when_missing() {
        let dir = tempdir().unwrap();
        let mut config = test_config(dir.path().to_path_buf());
        config.anchor_column = Some("anchor".into());
        config.positive_column = Some("positive".into());
        config.context_columns = vec!["context".into()];
        let source = test_source(config);

        let err = source.parse_row(0, &json!({"anchor": "x", "context": "z"}));
        assert!(err.is_err());
    }

    #[test]
    fn parse_row_errors_when_payload_is_not_object() {
        let dir = tempdir().unwrap();
        let config = test_config(dir.path().to_path_buf());
        let source = test_source(config);

        let err = source.parse_row(0, &json!("not-an-object"));
        assert!(err.is_err());
    }

    #[test]
    fn row_to_record_builds_expected_sections() {
        let dir = tempdir().unwrap();
        let config = test_config(dir.path().to_path_buf());
        let source = test_source(config);
        let row = RowView {
            row_id: Some("abc".into()),
            timestamp: Some(Utc::now()),
            text_fields: vec![
                RowTextField {
                    name: "title".into(),
                    text: "anchor".into(),
                },
                RowTextField {
                    name: "pos".into(),
                    text: "positive".into(),
                },
                RowTextField {
                    name: "ctx".into(),
                    text: "extra".into(),
                },
            ],
        };

        let record = source.row_to_record(&row, 1).unwrap().unwrap();
        assert_eq!(record.sections.len(), 3);
        assert_eq!(record.sections[0].role, SectionRole::Anchor);
        assert_eq!(record.sections[1].role, SectionRole::Context);
        assert_eq!(record.id, "hf_test::abc");
    }

    #[test]
    fn effective_refresh_batch_target_uses_multiplier_floor_of_one() {
        let dir = tempdir().unwrap();
        let mut config = test_config(dir.path().to_path_buf());
        config.refresh_batch_multiplier = 0;
        let source = test_source(config);
        assert_eq!(source.effective_refresh_batch_target(7), 7);
    }

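    // `locate_shard` resolves a global row index to a shard and in-shard
    // offset; `recompute_shard_offsets` rebases `global_start` from zero and
    // derives `materialized_rows` from the per-shard row counts.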
    #[test]
    fn locate_shard_and_recompute_offsets_work() {
        let mut shards = vec![
            ShardIndex {
                path: PathBuf::from("a"),
                global_start: 10,
                row_count: 3,
                is_parquet: false,
                parquet_row_groups: Vec::new(),
                checkpoints: vec![0],
            },
            ShardIndex {
                path: PathBuf::from("b"),
                global_start: 20,
                row_count: 2,
                is_parquet: false,
                parquet_row_groups: Vec::new(),
                checkpoints: vec![0],
            },
        ];
        let hit = HuggingFaceRowSource::locate_shard(&shards, 11).unwrap();
        assert_eq!(hit.1, 1);

        let mut state = SourceState {
            materialized_rows: 0,
            total_rows: None,
            shards: std::mem::take(&mut shards),
            remote_candidates: None,
            remote_candidate_sizes: HashMap::new(),
            next_remote_idx: 0,
        };
        HuggingFaceRowSource::recompute_shard_offsets(&mut state);
        assert_eq!(state.shards[0].global_start, 0);
        assert_eq!(state.shards[1].global_start, 3);
        assert_eq!(state.materialized_rows, 5);
    }

    #[test]
    fn len_hint_covers_known_and_empty_paths() {
        let dir = tempdir().unwrap();
        let mut config = test_config(dir.path().to_path_buf());
        config.max_rows = Some(9);
        let source = test_source(config);

        {
            let mut state = source.state.lock().unwrap();
            state.materialized_rows = 5;
            state.total_rows = Some(100);
        }
        assert_eq!(source.len_hint(), Some(9));

        {
            let mut state = source.state.lock().unwrap();
            state.materialized_rows = 0;
            state.total_rows = Some(0);
        }
        assert_eq!(source.len_hint(), Some(0));
    }

    #[test]
    fn len_hint_defaults_to_one_when_unknown_and_not_exhausted() {
        let dir = tempdir().unwrap();
        let config = test_config(dir.path().to_path_buf());
        let source = test_source(config);
        assert_eq!(source.len_hint(), Some(1));
    }

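    // With `checkpoint_stride = 1` every row gets its own checkpoint, so
    // `read_line_at` can seek straight to the requested row; clearing the
    // checkpoint table makes the lookup fail.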
    #[test]
    fn read_line_at_reads_expected_row_with_checkpoints() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("rows.jsonl");
        let mut file = File::create(&path).unwrap();
        file.write_all(b"{\"text\":\"a\"}\n").unwrap();
        file.write_all(b"{\"text\":\"b\"}\n").unwrap();
        file.write_all(b"{\"text\":\"c\"}\n").unwrap();

        let mut config = test_config(dir.path().to_path_buf());
        config.checkpoint_stride = 1;
        let source = test_source(config.clone());
        let shard = HuggingFaceRowSource::index_single_shard(&config, &path, 0)
            .unwrap()
            .unwrap();

        let line = source.read_line_at(&shard, 2).unwrap();
        assert!(line.contains("\"c\""));
    }

    #[test]
    fn read_line_at_errors_when_checkpoint_is_missing() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("rows.jsonl");
        fs::write(&path, b"{\"text\":\"a\"}\n").unwrap();

        let mut config = test_config(dir.path().to_path_buf());
        config.checkpoint_stride = 1;
        let source = test_source(config.clone());
        let mut shard = HuggingFaceRowSource::index_single_shard(&config, &path, 0)
            .unwrap()
            .unwrap();
        shard.checkpoints.clear();

        let err = source.read_line_at(&shard, 0);
        assert!(err.is_err());
    }

    #[test]
    fn load_persisted_shard_sequence_clamps_next_index_to_candidate_len() {
        let dir = tempdir().unwrap();
        let config = test_config(dir.path().to_path_buf());
        let state_path = HuggingFaceRowSource::shard_sequence_state_path(&config);
        fs::create_dir_all(state_path.parent().unwrap()).unwrap();
        fs::write(
            &state_path,
            serde_json::to_vec_pretty(&json!({
                "version": 1,
                "source_id": config.source_id,
                "dataset": config.dataset,
                "config": config.config,
                "split": config.split,
                "sampler_seed": null,
                "candidates": ["url::http://x/resolve/main/train/000.ndjson"],
                "candidate_sizes": {},
                "next_remote_idx": 99
            }))
            .unwrap(),
        )
        .unwrap();

        let loaded = HuggingFaceRowSource::load_persisted_shard_sequence(&config, None)
            .unwrap()
            .unwrap();
        assert_eq!(loaded.next_remote_idx, 1);
    }

    #[test]
    fn materialize_local_file_copies_and_is_idempotent_when_size_matches() {
        let dir = tempdir().unwrap();
        let config = test_config(dir.path().to_path_buf());
        let src = dir.path().join("src.ndjson");
        let dst = dir.path().join("nested/dst.ndjson");

        fs::write(&src, b"line\n").unwrap();
        HuggingFaceRowSource::materialize_local_file(&config, &src, &dst).unwrap();
        let first = fs::read(&dst).unwrap();
        HuggingFaceRowSource::materialize_local_file(&config, &src, &dst).unwrap();
        let second = fs::read(&dst).unwrap();
        assert_eq!(first, second);
    }

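    // A 10-byte cap over two 8-byte shards forces eviction; the shard that
    // was just written must survive while the older one is removed.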
    #[test]
    fn enforce_disk_cap_evicts_old_manifest_shards() {
        let dir = tempdir().unwrap();
        let mut config = test_config(dir.path().to_path_buf());
        config.local_disk_cap_bytes = Some(10);
        config.min_resident_shards = 0;
        let source = test_source(config);

        let manifest_root = source.manifest_cache_root();
        fs::create_dir_all(&manifest_root).unwrap();
        let evict_path = manifest_root.join("a.parquet");
        let keep_path = manifest_root.join("b.parquet");
        fs::write(&evict_path, vec![1u8; 8]).unwrap();
        fs::write(&keep_path, vec![2u8; 8]).unwrap();

        let mut state = SourceState {
            materialized_rows: 16,
            total_rows: None,
            shards: vec![
                ShardIndex {
                    path: evict_path.clone(),
                    global_start: 0,
                    row_count: 8,
                    is_parquet: true,
                    parquet_row_groups: vec![(0, 8)],
                    checkpoints: Vec::new(),
                },
                ShardIndex {
                    path: keep_path.clone(),
                    global_start: 8,
                    row_count: 8,
                    is_parquet: true,
                    parquet_row_groups: vec![(0, 8)],
                    checkpoints: Vec::new(),
                },
            ],
            remote_candidates: None,
            remote_candidate_sizes: HashMap::new(),
            next_remote_idx: 0,
        };

        let evicted = source
            .enforce_disk_cap_locked(&mut state, &keep_path)
            .unwrap();
        assert!(evicted);
        assert!(!evict_path.exists());
        assert!(keep_path.exists());
        assert_eq!(state.shards.len(), 1);
    }

    #[test]
    fn enforce_disk_cap_errors_when_min_resident_prevents_eviction() {
        let dir = tempdir().unwrap();
        let mut config = test_config(dir.path().to_path_buf());
        config.local_disk_cap_bytes = Some(4);
        config.min_resident_shards = 1;
        let source = test_source(config);

        let manifest_root = source.manifest_cache_root();
        fs::create_dir_all(&manifest_root).unwrap();
        let protected = manifest_root.join("only.parquet");
        fs::write(&protected, vec![1u8; 8]).unwrap();

        let mut state = SourceState {
            materialized_rows: 8,
            total_rows: None,
            shards: vec![ShardIndex {
                path: protected.clone(),
                global_start: 0,
                row_count: 8,
                is_parquet: true,
                parquet_row_groups: vec![(0, 8)],
                checkpoints: Vec::new(),
            }],
            remote_candidates: None,
            remote_candidate_sizes: HashMap::new(),
            next_remote_idx: 0,
        };

        let err = source.enforce_disk_cap_locked(&mut state, &protected);
        assert!(err.is_err());
        assert!(!protected.exists());
    }

    #[test]
    fn build_shard_index_discovers_local_jsonl_shards() {
        let dir = tempdir().unwrap();
        let root = dir.path().to_path_buf();
        fs::write(root.join("a.jsonl"), b"{\"text\":\"a\"}\n").unwrap();
        fs::write(root.join("b.ndjson"), b"{\"text\":\"b\"}\n").unwrap();

        let config = test_config(root.clone());
        let (shards, discovered) = HuggingFaceRowSource::build_shard_index(&config).unwrap();
        assert_eq!(discovered, 2);
        assert_eq!(shards.len(), 2);
    }

    #[test]
    fn index_single_shard_returns_none_for_empty_file() {
        let dir = tempdir().unwrap();
        let config = test_config(dir.path().to_path_buf());
        let path = dir.path().join("empty.jsonl");
        fs::write(&path, b"").unwrap();
        let shard = HuggingFaceRowSource::index_single_shard(&config, &path, 0).unwrap();
        assert!(shard.is_none());
    }

    #[test]
    fn refresh_reads_local_rows_and_advances_cursor() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("rows.jsonl");
        fs::write(
            &path,
            b"{\"id\":\"r1\",\"text\":\"alpha\"}\n{\"id\":\"r2\",\"text\":\"beta\"}\n{\"id\":\"r3\",\"text\":\"gamma\"}\n",
        )
        .unwrap();

        let mut config = test_config(dir.path().to_path_buf());
        config.checkpoint_stride = 1;
        config.refresh_batch_multiplier = 1;
        let source = test_source(config.clone());
        let shard = HuggingFaceRowSource::index_single_shard(&config, &path, 0)
            .unwrap()
            .unwrap();
        {
            let mut state = source.state.lock().unwrap();
            state.materialized_rows = shard.row_count;
            state.total_rows = Some(shard.row_count);
            state.shards = vec![shard];
        }

        let snapshot = source.refresh(None, Some(2)).unwrap();
        assert_eq!(snapshot.records.len(), 2);
        assert!(snapshot.cursor.revision > 0);
    }

    #[test]
    fn reported_record_count_uses_len_hint_for_local_state() {
        let dir = tempdir().unwrap();
        let config = test_config(dir.path().to_path_buf());
        let source = test_source(config);
        {
            let mut state = source.state.lock().unwrap();
            state.materialized_rows = 4;
            state.total_rows = Some(4);
        }
        assert_eq!(source.reported_record_count().unwrap(), 4);
    }

    #[test]
    fn rotate_candidates_deterministically_preserves_membership() {
        let dir = tempdir().unwrap();
        let config = test_config(dir.path().to_path_buf());
        let original = vec!["a".to_string(), "b".to_string(), "c".to_string()];
        let mut rotated = original.clone();
        HuggingFaceRowSource::rotate_candidates_deterministically(&config, &mut rotated);
        let mut sorted_original = original;
        let mut sorted_rotated = rotated;
        sorted_original.sort();
        sorted_rotated.sort();
        assert_eq!(sorted_rotated, sorted_original);
    }

    #[test]
    fn parse_row_supports_row_wrapped_payload_and_text_columns() {
        let dir = tempdir().unwrap();
        let mut config = test_config(dir.path().to_path_buf());
        config.text_columns = vec!["headline".into(), "body".into()];
        config.id_column = Some("rid".into());
        let source = test_source(config);

        let parsed = source
            .parse_row(
                0,
                &json!({"row": {"rid": "r-1", "headline": "h", "body": "b"}}),
            )
            .unwrap();

        assert_eq!(parsed.row_id.as_deref(), Some("r-1"));
        assert_eq!(parsed.text_fields.len(), 2);
        assert_eq!(parsed.text_fields[0].name, "headline");
    }

    #[test]
    fn row_to_record_returns_none_for_empty_fields() {
        let dir = tempdir().unwrap();
        let config = test_config(dir.path().to_path_buf());
        let source = test_source(config);
        let row = RowView {
            row_id: Some("x".into()),
            timestamp: None,
            text_fields: Vec::new(),
        };
        assert!(source.row_to_record(&row, 0).unwrap().is_none());
    }

    #[test]
    fn ensure_row_available_handles_materialized_max_and_exhausted_candidates() {
        let dir = tempdir().unwrap();
        let mut config = test_config(dir.path().to_path_buf());
        config.max_rows = Some(2);
        let source = test_source(config);
        {
            let mut state = source.state.lock().unwrap();
            state.materialized_rows = 1;
            state.remote_candidates = Some(vec![]);
            state.next_remote_idx = 0;
        }

        assert!(source.ensure_row_available(0).unwrap());
        assert!(!source.ensure_row_available(3).unwrap());
        assert!(!source.ensure_row_available(1).unwrap());
    }

    #[test]
    fn read_row_batch_uses_cached_rows_and_respects_limit() {
        let dir = tempdir().unwrap();
        let config = test_config(dir.path().to_path_buf());
        let source = test_source(config.clone());

        {
            let mut state = source.state.lock().unwrap();
            state.materialized_rows = 2;
            state.total_rows = Some(2);
        }

        let row0 = RowView {
            row_id: Some("r0".into()),
            timestamp: Some(Utc::now()),
            text_fields: vec![RowTextField {
                name: "text".into(),
                text: "alpha".into(),
            }],
        };
        let row1 = RowView {
            row_id: Some("r1".into()),
            timestamp: Some(Utc::now()),
            text_fields: vec![RowTextField {
                name: "text".into(),
                text: "beta".into(),
            }],
        };
        {
            let mut cache = source.cache.lock().unwrap();
            cache.insert(0, row0, config.cache_capacity);
            cache.insert(1, row1, config.cache_capacity);
        }

        let mut out = Vec::new();
        source.read_row_batch(&[0, 1], &mut out, Some(1)).unwrap();
        assert_eq!(out.len(), 1);
    }

    #[test]
    fn read_row_batch_errors_on_invalid_json_line() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("broken.jsonl");
        fs::write(&path, b"not-json\n").unwrap();

        let mut config = test_config(dir.path().to_path_buf());
        config.checkpoint_stride = 1;
        let source = test_source(config.clone());
        let shard = HuggingFaceRowSource::index_single_shard(&config, &path, 0)
            .unwrap()
            .unwrap();
        {
            let mut state = source.state.lock().unwrap();
            state.materialized_rows = 1;
            state.total_rows = Some(1);
            state.shards = vec![shard];
        }

        let mut out = Vec::new();
        let result = source.read_row_batch(&[0], &mut out, Some(1));
        assert!(result.is_err());
    }

    #[test]
    fn build_shard_index_errors_when_no_matching_extensions() {
        let dir = tempdir().unwrap();
        fs::write(dir.path().join("data.txt"), b"x\n").unwrap();
        let config = test_config(dir.path().to_path_buf());
        let result = HuggingFaceRowSource::build_shard_index(&config);
        assert!(result.is_err());
    }

    #[test]
    fn build_shard_index_honors_max_rows() {
        let dir = tempdir().unwrap();
        fs::write(
            dir.path().join("rows.jsonl"),
            b"{\"text\":\"1\"}\n{\"text\":\"2\"}\n{\"text\":\"3\"}\n",
        )
        .unwrap();
        let mut config = test_config(dir.path().to_path_buf());
        config.max_rows = Some(2);

        let (_, discovered) = HuggingFaceRowSource::build_shard_index(&config).unwrap();
        assert_eq!(discovered, 2);
    }

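    // An empty source yields an empty snapshot at revision 0; a cursor
    // revision past the end should wrap around instead of failing.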
    #[test]
    fn refresh_handles_empty_total_and_cursor_wrap() {
        let dir = tempdir().unwrap();
        let config = test_config(dir.path().to_path_buf());
        let source = test_source(config.clone());

        {
            let mut state = source.state.lock().unwrap();
            state.materialized_rows = 0;
            state.total_rows = Some(0);
        }
        let empty = source.refresh(None, Some(5)).unwrap();
        assert!(empty.records.is_empty());
        assert_eq!(empty.cursor.revision, 0);

        let path = dir.path().join("rows.jsonl");
        fs::write(
            &path,
            b"{\"id\":\"a\",\"text\":\"A\"}\n{\"id\":\"b\",\"text\":\"B\"}\n",
        )
        .unwrap();
        let mut cfg2 = config;
        cfg2.checkpoint_stride = 1;
        let source2 = test_source(cfg2.clone());
        let shard = HuggingFaceRowSource::index_single_shard(&cfg2, &path, 0)
            .unwrap()
            .unwrap();
        {
            let mut state = source2.state.lock().unwrap();
            state.materialized_rows = 2;
            state.total_rows = Some(2);
            state.shards = vec![shard];
        }
        let cursor = SourceCursor {
            last_seen: Utc::now(),
            revision: 99,
        };
        let snapshot = source2.refresh(Some(&cursor), Some(1)).unwrap();
        assert_eq!(snapshot.records.len(), 1);
    }

    #[test]
    fn new_rejects_zero_checkpoint_stride() {
        let dir = tempdir().unwrap();
        let mut config = test_config(dir.path().to_path_buf());
        config.checkpoint_stride = 0;
        let result = HuggingFaceRowSource::new(config);
        assert!(result.is_err());
    }

    #[test]
    fn parse_global_row_count_response_returns_none_when_split_missing() {
        let dir = tempdir().unwrap();
        let config = test_config(dir.path().to_path_buf());
        let body = r#"{
            "size": {
                "splits": [
                    {"config":"main","split":"test","num_rows":7}
                ]
            }
        }"#;

        let parsed = HuggingFaceRowSource::parse_global_row_count_response(&config, body).unwrap();
        assert_eq!(parsed, None);
    }

    #[test]
    fn extract_split_row_count_uses_config_num_rows_when_split_empty() {
        let payload = serde_json::json!({
            "size": {
                "configs": [
                    {
                        "config": "main",
                        "num_rows": 123,
                        "splits": [
                            {"split": "train", "num_rows": 999}
                        ]
                    }
                ]
            }
        });

        let rows =
            HuggingFaceRowSource::extract_split_row_count_from_size_response(&payload, "main", "");
        assert_eq!(rows, Some(123));
    }

    #[test]
    fn extract_split_row_count_uses_dataset_num_rows_when_split_empty() {
        let payload = serde_json::json!({
            "size": {
                "dataset": {
                    "num_rows": 77
                }
            }
        });

        let rows =
            HuggingFaceRowSource::extract_split_row_count_from_size_response(&payload, "main", "");
        assert_eq!(rows, Some(77));
    }
}