• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

vnvo / deltaforge / 20521769545

26 Dec 2025 11:37AM UTC coverage: 63.717% (+2.7%) from 60.979%
20521769545

Pull #32

github

web-flow
Merge 00270ffbb into 8977b1707
Pull Request #32: Feat/tursosource

2693 of 4143 new or added lines in 27 files covered. (65.0%)

70 existing lines in 5 files now uncovered.

4861 of 7629 relevant lines covered (63.72%)

3.92 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

76.4
/crates/schema-sensing/src/sensor.rs
1
use std::collections::{HashMap, HashSet};
2
use std::hash::{Hash, Hasher};
3

4
use schema_analysis::InferredSchema;
5
use serde::de::DeserializeSeed;
6
use tracing::{debug, info, trace, warn};
7

8
use deltaforge_config::SchemaSensingConfig;
9

10
use crate::errors::{SensorError, SensorResult};
11
use crate::json_schema::JsonSchema;
12
use crate::schema_state::{
13
    SchemaSnapshot, SensedSchemaVersion, TableSchemaState,
14
};
15

16
/// Outcome the sensor reports for a single observed event.
///
/// Every variant except [`ObserveResult::Disabled`] carries the table's
/// schema fingerprint and sequence number as they stand after the call.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ObserveResult {
    /// The sensing configuration excludes this table; nothing was recorded.
    Disabled,

    /// No schema existed for the table yet; this event created the first one.
    NewSchema {
        fingerprint: String,
        sequence: u64,
    },

    /// Folding this event into the schema changed its fingerprint.
    Evolved {
        /// Fingerprint before the event was applied.
        old_fingerprint: String,
        /// Fingerprint after the event was applied.
        new_fingerprint: String,
        /// Sequence number before the event was applied.
        old_sequence: u64,
        /// Sequence number after the event was applied.
        new_sequence: u64,
    },

    /// The event was analyzed and the fingerprint stayed the same.
    Unchanged {
        fingerprint: String,
        sequence: u64,
    },

    /// The table reached its sampling limit; the schema is treated as stable.
    Stabilized {
        fingerprint: String,
        sequence: u64,
    },

    /// Full analysis was skipped because the event's top-level structure
    /// was already in the structure cache (fast path).
    CacheHit {
        fingerprint: String,
        sequence: u64,
    },

    /// Full analysis was skipped because sampling did not select this event.
    Sampled {
        fingerprint: String,
        sequence: u64,
    },
}
45

46
/// Compute a lightweight structure fingerprint from top-level JSON keys only.
47
///
48
/// This is MUCH faster than traversing nested structures. We only hash:
49
/// - The set of top-level key names (for objects)
50
/// - The type tag (for primitives/arrays)
51
///
52
/// This is sufficient to detect most schema changes (new/removed columns)
53
/// while being O(keys) instead of O(total_nodes).
54
fn compute_structure_hash(value: &serde_json::Value) -> u64 {
15✔
55
    use std::collections::hash_map::DefaultHasher;
56
    let mut hasher = DefaultHasher::new();
15✔
57
    hash_top_level_structure(value, &mut hasher);
15✔
58
    hasher.finish()
15✔
59
}
15✔
60

61
/// Hash only top-level structure - no recursion into nested objects.
62
fn hash_top_level_structure<H: Hasher>(
15✔
63
    value: &serde_json::Value,
15✔
64
    hasher: &mut H,
15✔
65
) {
15✔
66
    match value {
15✔
NEW
67
        serde_json::Value::Null => 0u8.hash(hasher),
×
NEW
68
        serde_json::Value::Bool(_) => 1u8.hash(hasher),
×
NEW
69
        serde_json::Value::Number(_) => 2u8.hash(hasher),
×
NEW
70
        serde_json::Value::String(_) => 3u8.hash(hasher),
×
NEW
71
        serde_json::Value::Array(_) => {
×
NEW
72
            // Just mark as array - don't inspect elements
×
NEW
73
            4u8.hash(hasher);
×
NEW
74
        }
×
75
        serde_json::Value::Object(obj) => {
15✔
76
            5u8.hash(hasher);
15✔
77
            // Hash number of keys (fast structural check)
78
            obj.len().hash(hasher);
15✔
79
            // Hash sorted key names only - O(k log k) where k = number of keys
80
            let mut keys: Vec<_> = obj.keys().collect();
15✔
81
            keys.sort_unstable();
15✔
82
            for key in keys {
42✔
83
                key.hash(hasher);
27✔
84
            }
27✔
85
        }
86
    }
87
}
15✔
88

89
/// Snapshot of one table's structure-cache counters.
#[derive(Debug, Clone)]
pub struct CacheStatsEntry {
    /// Table the counters belong to.
    pub table: String,
    /// Number of distinct structure hashes currently held in the cache.
    pub cached_structures: usize,
    /// Upper bound on how many structure hashes the cache will hold.
    pub max_cache_size: usize,
    /// Lookups that found the structure already cached.
    pub cache_hits: u64,
    /// Lookups that did not find the structure in the cache.
    pub cache_misses: u64,
}
98

99
/// Bounded set of structure hashes already seen for one table.
///
/// Used as a fast path: an event whose structure hash is already present
/// can skip full schema analysis.
struct StructureCache {
    /// Structure hashes remembered so far.
    seen: HashSet<u64>,
    /// Once `seen` holds this many hashes, new ones are no longer stored.
    max_size: usize,
    /// Lookups that found their hash in `seen`.
    hits: u64,
    /// Lookups that did not find their hash in `seen`.
    misses: u64,
}

impl StructureCache {
    /// Create an empty cache that will remember at most `max_size` hashes.
    fn new(max_size: usize) -> Self {
        let seen = HashSet::new();
        Self {
            seen,
            max_size,
            hits: 0,
            misses: 0,
        }
    }

    /// Report whether `hash` was already cached, remembering it if not.
    ///
    /// Returns `true` on a hit. On a miss the hash is stored only while the
    /// cache is below `max_size`; a full cache keeps reporting misses for
    /// hashes it has no room for. The hit/miss counters are updated either
    /// way.
    fn check_and_insert(&mut self, hash: u64) -> bool {
        let known = self.seen.contains(&hash);
        if known {
            self.hits += 1;
        } else {
            self.misses += 1;
            let has_room = self.seen.len() < self.max_size;
            if has_room {
                self.seen.insert(hash);
            }
        }
        known
    }

    /// Forget every cached hash. The hit/miss counters are left untouched.
    fn clear(&mut self) {
        self.seen.clear();
    }

    /// Return `(cached hashes, max_size, hits, misses)`.
    fn stats(&self) -> (usize, usize, u64, u64) {
        let cached = self.seen.len();
        (cached, self.max_size, self.hits, self.misses)
    }
}
137

138
/// Universal schema sensor with performance optimizations.
139
///
140
/// Infers and tracks schema from JSON payloads across all tables.
141
/// Uses `schema_analysis` for robust type inference.
142
///
143
/// Performance features:
144
/// - Structure caching: Skip payloads with identical key structure
145
/// - Sampling: After warmup, only analyze a fraction of events
146
/// - Stabilization: Stop after max_sample_size events per table
147
pub struct SchemaSensor {
148
    /// Configuration
149
    config: SchemaSensingConfig,
150

151
    /// Per-table schema state
152
    schemas: HashMap<String, TableSchemaState>,
153

154
    /// Per-table structure cache for fast-path skipping
155
    structure_caches: HashMap<String, StructureCache>,
156
}
157

158
impl SchemaSensor {
159
    pub fn new(config: SchemaSensingConfig) -> Self {
9✔
160
        Self {
9✔
161
            config,
9✔
162
            schemas: HashMap::new(),
9✔
163
            structure_caches: HashMap::new(),
9✔
164
        }
9✔
165
    }
9✔
166

167
    /// Create a sensor with sensing enabled (for testing).
168
    pub fn enabled() -> Self {
2✔
169
        Self::new(SchemaSensingConfig {
2✔
170
            enabled: true,
2✔
171
            ..Default::default()
2✔
172
        })
2✔
173
    }
2✔
174

175
    /// Check if sensing is enabled.
NEW
176
    pub fn is_enabled(&self) -> bool {
×
NEW
177
        self.config.enabled
×
NEW
178
    }
×
179

180
    /// Observe a pre-parsed JSON value (optimized path).
181
    ///
182
    /// This is the preferred method when you already have a parsed Value,
183
    /// as it can use structure caching without re-parsing.
184
    pub fn observe_value(
24✔
185
        &mut self,
24✔
186
        table: &str,
24✔
187
        value: &serde_json::Value,
24✔
188
    ) -> SensorResult<ObserveResult> {
24✔
189
        // Check if sensing is enabled for this table
190
        if !self.config.should_sense_table(table) {
24✔
NEW
191
            return Ok(ObserveResult::Disabled);
×
192
        }
24✔
193

194
        // Get current event count for sampling decision
195
        let event_count =
24✔
196
            self.schemas.get(table).map(|s| s.event_count).unwrap_or(0);
24✔
197

198
        // Check if we've hit the stabilization limit
199
        if let Some(state) = self.schemas.get(table) {
24✔
200
            if state.stabilized {
19✔
NEW
201
                return Ok(ObserveResult::Stabilized {
×
NEW
202
                    fingerprint: state.fingerprint.clone(),
×
NEW
203
                    sequence: state.sequence,
×
NEW
204
                });
×
205
            }
19✔
206
        }
5✔
207

208
        // Structure cache check (fast path) - O(keys) for top-level only
209
        if self.config.sampling.structure_cache {
24✔
210
            let structure_hash = compute_structure_hash(value);
9✔
211
            let cache = self
9✔
212
                .structure_caches
9✔
213
                .entry(table.to_string())
9✔
214
                .or_insert_with(|| {
9✔
215
                    StructureCache::new(
4✔
216
                        self.config.sampling.structure_cache_size,
4✔
217
                    )
218
                });
4✔
219

220
            if cache.check_and_insert(structure_hash) {
9✔
221
                // Cache hit - skip full sensing
222
                if let Some(state) = self.schemas.get_mut(table) {
3✔
223
                    state.record_observation();
3✔
224

225
                    // Check stabilization even on cache hit
226
                    let max_samples = self.config.deep_inspect.max_sample_size;
3✔
227
                    if max_samples > 0 && state.event_count >= max_samples as u64 && !state.stabilized {
3✔
NEW
228
                        state.mark_stabilized();
×
NEW
229
                        return Ok(ObserveResult::Stabilized {
×
NEW
230
                            fingerprint: state.fingerprint.clone(),
×
NEW
231
                            sequence: state.sequence,
×
NEW
232
                        });
×
233
                    }
3✔
234

235
                    return Ok(ObserveResult::CacheHit {
3✔
236
                        fingerprint: state.fingerprint.clone(),
3✔
237
                        sequence: state.sequence,
3✔
238
                    });
3✔
NEW
239
                }
×
240
                // No schema yet - fall through to full sensing
241
            }
6✔
242
        }
15✔
243

244
        // Sampling check (after warmup)
245
        if !self.config.should_sample(event_count) {
21✔
246
            if let Some(state) = self.schemas.get_mut(table) {
5✔
247
                state.record_observation();
5✔
248
                return Ok(ObserveResult::Sampled {
5✔
249
                    fingerprint: state.fingerprint.clone(),
5✔
250
                    sequence: state.sequence,
5✔
251
                });
5✔
NEW
252
            }
×
253
        }
16✔
254

255
        // Full sensing path - serialize and process
256
        let json = serde_json::to_vec(value)
16✔
257
            .map_err(|e| SensorError::Serialization(e.to_string()))?;
16✔
258
        self.observe_bytes(table, &json, event_count)
16✔
259
    }
24✔
260

261
    /// Observe raw JSON bytes.
262
    pub fn observe(
8✔
263
        &mut self,
8✔
264
        table: &str,
8✔
265
        json: &[u8],
8✔
266
    ) -> SensorResult<ObserveResult> {
8✔
267
        // Check if sensing is enabled for this table
268
        if !self.config.should_sense_table(table) {
8✔
269
            return Ok(ObserveResult::Disabled);
1✔
270
        }
7✔
271

272
        let event_count =
7✔
273
            self.schemas.get(table).map(|s| s.event_count).unwrap_or(0);
7✔
274

275
        // Check stabilization
276
        if let Some(state) = self.schemas.get(table) {
7✔
277
            if state.stabilized {
4✔
278
                return Ok(ObserveResult::Stabilized {
1✔
279
                    fingerprint: state.fingerprint.clone(),
1✔
280
                    sequence: state.sequence,
1✔
281
                });
1✔
282
            }
3✔
283
        }
3✔
284

285
        // For bytes path with structure caching, we need to parse once
286
        // and reuse for both cache check and sensing
287
        if self.config.sampling.structure_cache {
6✔
288
            if let Ok(value) = serde_json::from_slice::<serde_json::Value>(json)
6✔
289
            {
290
                let structure_hash = compute_structure_hash(&value);
6✔
291
                let cache = self
6✔
292
                    .structure_caches
6✔
293
                    .entry(table.to_string())
6✔
294
                    .or_insert_with(|| {
6✔
295
                        StructureCache::new(
3✔
296
                            self.config.sampling.structure_cache_size,
3✔
297
                        )
298
                    });
3✔
299

300
                if cache.check_and_insert(structure_hash) {
6✔
301
                    // Cache hit
302
                    if let Some(state) = self.schemas.get_mut(table) {
2✔
303
                        state.record_observation();
2✔
304

305
                        // Check stabilization even on cache hit
306
                        let max_samples = self.config.deep_inspect.max_sample_size;
2✔
307
                        if max_samples > 0 && state.event_count >= max_samples as u64 && !state.stabilized {
2✔
308
                            state.mark_stabilized();
1✔
309
                            return Ok(ObserveResult::Stabilized {
1✔
310
                                fingerprint: state.fingerprint.clone(),
1✔
311
                                sequence: state.sequence,
1✔
312
                            });
1✔
313
                        }
1✔
314

315
                        return Ok(ObserveResult::CacheHit {
1✔
316
                            fingerprint: state.fingerprint.clone(),
1✔
317
                            sequence: state.sequence,
1✔
318
                        });
1✔
NEW
319
                    }
×
320
                }
4✔
321

322
                // Cache miss - check sampling
323
                if !self.config.should_sample(event_count) {
4✔
NEW
324
                    if let Some(state) = self.schemas.get_mut(table) {
×
NEW
325
                        state.record_observation();
×
NEW
326
                        return Ok(ObserveResult::Sampled {
×
NEW
327
                            fingerprint: state.fingerprint.clone(),
×
NEW
328
                            sequence: state.sequence,
×
NEW
329
                        });
×
NEW
330
                    }
×
331
                }
4✔
332

333
                // Need full sensing - use already-parsed JSON bytes
334
                return self.observe_bytes(table, json, event_count);
4✔
NEW
335
            }
×
NEW
336
        }
×
337

338
        // No cache path - just check sampling
NEW
339
        if !self.config.should_sample(event_count) {
×
NEW
340
            if let Some(state) = self.schemas.get_mut(table) {
×
NEW
341
                state.record_observation();
×
NEW
342
                return Ok(ObserveResult::Sampled {
×
NEW
343
                    fingerprint: state.fingerprint.clone(),
×
NEW
344
                    sequence: state.sequence,
×
NEW
345
                });
×
NEW
346
            }
×
NEW
347
        }
×
348

NEW
349
        self.observe_bytes(table, json, event_count)
×
350
    }
8✔
351

352
    /// Internal: perform full schema sensing on bytes.
353
    fn observe_bytes(
20✔
354
        &mut self,
20✔
355
        table: &str,
20✔
356
        json: &[u8],
20✔
357
        event_count: u64,
20✔
358
    ) -> SensorResult<ObserveResult> {
20✔
359
        // Check if we already have state for this table
360
        if let Some(state) = self.schemas.get_mut(table) {
20✔
361
            // Check sampling limit for stabilization
362
            let max_samples = self.config.deep_inspect.max_sample_size;
12✔
363
            if max_samples > 0 && event_count >= max_samples as u64 {
12✔
NEW
364
                state.mark_stabilized();
×
NEW
365
                info!(
×
366
                    table = %table,
367
                    events = state.event_count,
NEW
368
                    "schema stabilized after sampling limit"
×
369
                );
NEW
370
                return Ok(ObserveResult::Stabilized {
×
NEW
371
                    fingerprint: state.fingerprint.clone(),
×
NEW
372
                    sequence: state.sequence,
×
NEW
373
                });
×
374
            }
12✔
375

376
            // Expand schema with new observation
377
            let old_fingerprint = state.fingerprint.clone();
12✔
378
            let old_sequence = state.sequence;
12✔
379

380
            // Use DeserializeSeed to expand the existing schema
381
            let mut de = serde_json::Deserializer::from_slice(json);
12✔
382
            if let Err(e) = state.inferred.deserialize(&mut de) {
12✔
NEW
383
                warn!(table = %table, error = %e, "failed to expand schema");
×
NEW
384
                state.record_observation();
×
NEW
385
                return Ok(ObserveResult::Unchanged {
×
NEW
386
                    fingerprint: state.fingerprint.clone(),
×
NEW
387
                    sequence: state.sequence,
×
NEW
388
                });
×
389
            }
12✔
390

391
            state.record_observation();
12✔
392
            state.update_fingerprint();
12✔
393

394
            if state.fingerprint != old_fingerprint {
12✔
395
                debug!(
3✔
396
                    table = %table,
397
                    old_fp = %old_fingerprint,
398
                    new_fp = %state.fingerprint,
399
                    sequence = state.sequence,
NEW
400
                    "schema evolved"
×
401
                );
402

403
                // Clear structure cache on evolution
404
                if let Some(cache) = self.structure_caches.get_mut(table) {
3✔
405
                    cache.clear();
3✔
406
                }
3✔
407

408
                Ok(ObserveResult::Evolved {
3✔
409
                    old_fingerprint,
3✔
410
                    new_fingerprint: state.fingerprint.clone(),
3✔
411
                    old_sequence,
3✔
412
                    new_sequence: state.sequence,
3✔
413
                })
3✔
414
            } else {
415
                trace!(table = %table, events = state.event_count, "schema unchanged");
9✔
416
                Ok(ObserveResult::Unchanged {
9✔
417
                    fingerprint: state.fingerprint.clone(),
9✔
418
                    sequence: state.sequence,
9✔
419
                })
9✔
420
            }
421
        } else {
422
            // First event for this table - create initial schema
423
            let inferred: InferredSchema = serde_json::from_slice(json)?;
8✔
424
            let state = TableSchemaState::new(table.to_string(), inferred);
8✔
425

426
            let result = ObserveResult::NewSchema {
8✔
427
                fingerprint: state.fingerprint.clone(),
8✔
428
                sequence: state.sequence,
8✔
429
            };
8✔
430

431
            info!(
8✔
432
                table = %table,
433
                fingerprint = %state.fingerprint,
NEW
434
                "new schema discovered"
×
435
            );
436

437
            self.schemas.insert(table.to_string(), state);
8✔
438
            Ok(result)
8✔
439
        }
440
    }
20✔
441

442
    /// Get schema version info for a table.
443
    pub fn get_version(&self, table: &str) -> Option<SensedSchemaVersion> {
1✔
444
        self.schemas.get(table).map(|s| s.version())
1✔
445
    }
1✔
446

447
    /// Get full schema snapshot for a table.
NEW
448
    pub fn get_snapshot(&self, table: &str) -> Option<SchemaSnapshot> {
×
NEW
449
        self.schemas.get(table).map(SchemaSnapshot::from)
×
NEW
450
    }
×
451

452
    /// Get all table names being tracked.
NEW
453
    pub fn tables(&self) -> Vec<&str> {
×
NEW
454
        self.schemas.keys().map(|s| s.as_str()).collect()
×
NEW
455
    }
×
456

457
    /// Get snapshots for all tables.
NEW
458
    pub fn all_snapshots(&self) -> Vec<SchemaSnapshot> {
×
NEW
459
        self.schemas.values().map(SchemaSnapshot::from).collect()
×
NEW
460
    }
×
461

462
    /// Get the raw inferred schema for a table.
NEW
463
    pub fn get_schema(&self, table: &str) -> Option<&schema_analysis::Schema> {
×
NEW
464
        self.schemas.get(table).map(|s| s.schema())
×
NEW
465
    }
×
466

467
    /// Get event count for a table.
468
    pub fn event_count(&self, table: &str) -> u64 {
1✔
469
        self.schemas.get(table).map(|s| s.event_count).unwrap_or(0)
1✔
470
    }
1✔
471

472
    /// Check if a table's schema has stabilized.
473
    pub fn is_stabilized(&self, table: &str) -> bool {
1✔
474
        self.schemas
1✔
475
            .get(table)
1✔
476
            .map(|s| s.stabilized)
1✔
477
            .unwrap_or(false)
1✔
478
    }
1✔
479

480
    /// Get cache statistics for a table.
481
    pub fn cache_stats(&self, table: &str) -> Option<CacheStatsEntry> {
2✔
482
        self.structure_caches.get(table).map(|c| {
2✔
483
            let (cached, max_size, hits, misses) = c.stats();
2✔
484
            CacheStatsEntry {
2✔
485
                table: table.to_string(),
2✔
486
                cached_structures: cached,
2✔
487
                max_cache_size: max_size,
2✔
488
                cache_hits: hits,
2✔
489
                cache_misses: misses,
2✔
490
            }
2✔
491
        })
2✔
492
    }
2✔
493

494
    /// Get cache statistics for all tables.
NEW
495
    pub fn all_cache_stats(&self) -> Vec<CacheStatsEntry> {
×
NEW
496
        self.structure_caches
×
NEW
497
            .iter()
×
NEW
498
            .map(|(table, cache)| {
×
NEW
499
                let (cached, max_size, hits, misses) = cache.stats();
×
NEW
500
                CacheStatsEntry {
×
NEW
501
                    table: table.clone(),
×
NEW
502
                    cached_structures: cached,
×
NEW
503
                    max_cache_size: max_size,
×
NEW
504
                    cache_hits: hits,
×
NEW
505
                    cache_misses: misses,
×
NEW
506
                }
×
NEW
507
            })
×
NEW
508
            .collect()
×
NEW
509
    }
×
510

511
    /// Reset tracking for a table.
NEW
512
    pub fn reset_table(&mut self, table: &str) {
×
NEW
513
        self.schemas.remove(table);
×
NEW
514
        self.structure_caches.remove(table);
×
NEW
515
    }
×
516

517
    /// Reset all tracking.
NEW
518
    pub fn reset_all(&mut self) {
×
NEW
519
        self.schemas.clear();
×
NEW
520
        self.structure_caches.clear();
×
NEW
521
    }
×
522

523
    /// Get configuration.
NEW
524
    pub fn config(&self) -> &SchemaSensingConfig {
×
NEW
525
        &self.config
×
NEW
526
    }
×
527

528
    /// Export a table's schema as JSON Schema.
NEW
529
    pub fn to_json_schema(&self, table: &str) -> Option<JsonSchema> {
×
NEW
530
        self.schemas
×
NEW
531
            .get(table)
×
NEW
532
            .map(|s| crate::json_schema::to_json_schema(s.schema()))
×
NEW
533
    }
×
534

535
    /// Export all schemas as JSON Schema.
NEW
536
    pub fn all_json_schemas(&self) -> HashMap<String, JsonSchema> {
×
NEW
537
        self.schemas
×
NEW
538
            .iter()
×
NEW
539
            .map(|(table, state)| {
×
NEW
540
                (
×
NEW
541
                    table.clone(),
×
NEW
542
                    crate::json_schema::to_json_schema(state.schema()),
×
NEW
543
                )
×
NEW
544
            })
×
NEW
545
            .collect()
×
NEW
546
    }
×
547
}
548

549
#[cfg(test)]
mod tests {
    use super::*;
    use deltaforge_config::SamplingConfig;

    /// Sensor with the structure cache enabled, using `sample_rate: 1` and a
    /// 1000-event warmup that these short tests never exceed, so outcomes
    /// are driven by the cache rather than by sampling.
    fn cache_enabled_sensor() -> SchemaSensor {
        SchemaSensor::new(SchemaSensingConfig {
            enabled: true,
            sampling: SamplingConfig {
                structure_cache: true,
                structure_cache_size: 100,
                warmup_events: 1000,
                sample_rate: 1,
            },
            ..Default::default()
        })
    }

    #[test]
    fn test_disabled_sensor() {
        let mut sensor = SchemaSensor::new(SchemaSensingConfig::default());
        let result = sensor.observe("users", b"{}").unwrap();
        assert_eq!(result, ObserveResult::Disabled);
    }

    #[test]
    fn test_new_schema() {
        let mut sensor = SchemaSensor::enabled();
        let json = br#"{"id": 1, "name": "Alice"}"#;

        let result = sensor.observe("users", json).unwrap();
        assert!(matches!(result, ObserveResult::NewSchema { .. }));

        let version = sensor.get_version("users").unwrap();
        assert_eq!(version.sequence, 1);
    }

    #[test]
    fn test_structure_cache_hit() {
        let mut sensor = cache_enabled_sensor();

        // First event - new schema
        let json1 = serde_json::json!({"id": 1, "name": "Alice"});
        let result1 = sensor.observe_value("users", &json1).unwrap();
        assert!(matches!(result1, ObserveResult::NewSchema { .. }));

        // Second event with same structure - cache hit
        let json2 = serde_json::json!({"id": 2, "name": "Bob"});
        let result2 = sensor.observe_value("users", &json2).unwrap();
        assert!(matches!(result2, ObserveResult::CacheHit { .. }));

        // Event count should still increase
        assert_eq!(sensor.event_count("users"), 2);
    }

    #[test]
    fn test_structure_cache_miss_different_keys() {
        let mut sensor = cache_enabled_sensor();

        let json1 = serde_json::json!({"id": 1, "name": "Alice"});
        sensor.observe_value("users", &json1).unwrap();

        // Different top-level keys - cache miss, triggers evolution
        let json2 = serde_json::json!({"id": 2, "name": "Bob", "email": "bob@example.com"});
        let result2 = sensor.observe_value("users", &json2).unwrap();
        assert!(matches!(result2, ObserveResult::Evolved { .. }));
    }

    #[test]
    fn test_structure_cache_hit_nested_changes() {
        // With top-level-only hashing, nested structure changes don't cause cache miss
        let mut sensor = cache_enabled_sensor();

        let json1 = serde_json::json!({"id": 1, "data": {"a": 1}});
        sensor.observe_value("users", &json1).unwrap();

        // Same top-level keys, different nested structure - still cache HIT
        // (This is intentional for performance - we trade some accuracy for speed)
        let json2 = serde_json::json!({"id": 2, "data": {"a": 1, "b": 2}});
        let result2 = sensor.observe_value("users", &json2).unwrap();
        assert!(matches!(result2, ObserveResult::CacheHit { .. }));
    }

    #[test]
    fn test_sampling_after_warmup() {
        let config = SchemaSensingConfig {
            enabled: true,
            sampling: SamplingConfig {
                warmup_events: 5,
                sample_rate: 2,         // 50% sampling
                structure_cache: false, // Disable for this test
                ..Default::default()
            },
            ..Default::default()
        };
        let mut sensor = SchemaSensor::new(config);

        // Warmup phase - all events processed
        for i in 0..5 {
            let json = serde_json::json!({"id": i});
            let result = sensor.observe_value("users", &json).unwrap();
            assert!(!matches!(result, ObserveResult::Sampled { .. }));
        }

        // After warmup - sampling kicks in
        let mut sampled_count = 0;
        let mut processed_count = 0;
        for i in 5..15 {
            let json = serde_json::json!({"id": i});
            let result = sensor.observe_value("users", &json).unwrap();
            match result {
                ObserveResult::Sampled { .. } => sampled_count += 1,
                ObserveResult::Unchanged { .. } => processed_count += 1,
                // The cache is off and every payload has the same shape, so
                // anything else means the sensor misbehaved; fail loudly
                // instead of silently ignoring it.
                other => panic!("unexpected result after warmup: {:?}", other),
            }
        }

        // With 50% sampling, roughly half should be sampled
        assert!(sampled_count > 0, "some events should be sampled");
        assert!(processed_count > 0, "some events should be processed");
    }

    #[test]
    fn test_cache_cleared_on_evolution() {
        let mut sensor = cache_enabled_sensor();

        // Build up cache
        let json1 = serde_json::json!({"id": 1});
        sensor.observe_value("users", &json1).unwrap();
        let json2 = serde_json::json!({"id": 2});
        sensor.observe_value("users", &json2).unwrap();

        let stats = sensor.cache_stats("users").unwrap();
        assert_eq!(stats.cached_structures, 1); // Only one unique structure
        assert_eq!(stats.cache_hits, 1); // json2 was a cache hit
        assert_eq!(stats.cache_misses, 1); // json1 was a miss

        // Evolution should clear cache
        let json3 = serde_json::json!({"id": 3, "extra": "field"});
        let result = sensor.observe_value("users", &json3).unwrap();
        assert!(matches!(result, ObserveResult::Evolved { .. }));

        let stats = sensor.cache_stats("users").unwrap();
        assert_eq!(stats.cached_structures, 0); // Cache cleared
    }

    #[test]
    fn test_schema_evolution() {
        let mut sensor = SchemaSensor::enabled();

        let json1 = br#"{"id": 1, "name": "Alice"}"#;
        sensor.observe("users", json1).unwrap();

        let json2 = br#"{"id": 2, "name": "Bob", "email": "bob@example.com"}"#;
        let result = sensor.observe("users", json2).unwrap();

        match result {
            ObserveResult::Evolved {
                old_sequence,
                new_sequence,
                ..
            } => {
                assert_eq!(old_sequence, 1);
                assert_eq!(new_sequence, 2);
            }
            _ => panic!("expected schema evolution, got {:?}", result),
        }
    }

    #[test]
    fn test_stabilization() {
        let config = SchemaSensingConfig {
            enabled: true,
            deep_inspect: deltaforge_config::DeepInspectConfig {
                max_sample_size: 3,
                ..Default::default()
            },
            ..Default::default()
        };

        let mut sensor = SchemaSensor::new(config);

        sensor.observe("users", br#"{"id": 1}"#).unwrap();
        sensor.observe("users", br#"{"id": 2}"#).unwrap();
        sensor.observe("users", br#"{"id": 3}"#).unwrap();

        let result = sensor.observe("users", br#"{"id": 4}"#).unwrap();
        assert!(matches!(result, ObserveResult::Stabilized { .. }));
        assert!(sensor.is_stabilized("users"));
    }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc