• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

vnvo / deltaforge / 20521769545

26 Dec 2025 11:37AM UTC coverage: 63.717% (+2.7%) from 60.979%
20521769545

Pull #32

github

web-flow
Merge 00270ffbb into 8977b1707
Pull Request #32: Feat/tursosource

2693 of 4143 new or added lines in 27 files covered. (65.0%)

70 existing lines in 5 files now uncovered.

4861 of 7629 relevant lines covered (63.72%)

3.92 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/crates/runner/src/sensing_api.rs
1
//! Sensing API - Schema sensing and drift detection endpoints.
2

3
use std::collections::HashMap;
4
use std::sync::Arc;
5

6
use rest_api::{
7
    CacheStats, ColumnDrift, DriftInfo, InferredField, InferredSchemaDetail,
8
    InferredSchemaInfo, PipelineAPIError, SensingController, TableCacheStats,
9
};
10
use schema_sensing::InferredSchemaType;
11
use serde_json::Value;
12

13
use crate::drift_detector::DriftEvent;
14
use crate::pipeline_manager::PipelineManager;
15

16
/// Newtype wrapper providing SensingController implementation.
17
#[derive(Clone)]
18
pub struct SensingApi(pub Arc<PipelineManager>);
19

20
impl SensingApi {
NEW
21
    pub fn new(manager: Arc<PipelineManager>) -> Self {
×
NEW
22
        Self(manager)
×
NEW
23
    }
×
24
}
25

26
#[async_trait::async_trait]
27
impl SensingController for SensingApi {
28
    async fn list_inferred(
29
        &self,
30
        pipeline: &str,
NEW
31
    ) -> Result<Vec<InferredSchemaInfo>, PipelineAPIError> {
×
32
        let sensor_state = self.0.get_sensor(pipeline)?;
33
        let guard = sensor_state.sensor().lock();
34

35
        // Use all_snapshots() which has all the info we need
36
        Ok(guard
37
            .all_snapshots()
38
            .into_iter()
39
            .map(|s| InferredSchemaInfo {
NEW
40
                table: s.table,
×
NEW
41
                fingerprint: s.fingerprint,
×
NEW
42
                sequence: s.sequence,
×
NEW
43
                event_count: s.event_count,
×
NEW
44
                stabilized: s.stabilized,
×
NEW
45
                first_seen: s.first_seen,
×
NEW
46
                last_seen: s.last_seen,
×
NEW
47
            })
×
48
            .collect())
NEW
49
    }
×
50

51
    async fn get_inferred(
52
        &self,
53
        pipeline: &str,
54
        table: &str,
NEW
55
    ) -> Result<InferredSchemaDetail, PipelineAPIError> {
×
56
        let sensor_state = self.0.get_sensor(pipeline)?;
57
        let guard = sensor_state.sensor().lock();
58

NEW
59
        let snapshot = guard.get_snapshot(table).ok_or_else(|| {
×
NEW
60
            PipelineAPIError::NotFound(format!("schema for table {}", table))
×
NEW
61
        })?;
×
62

63
        // Extract field info from the inferred schema
64
        let fields = extract_fields_from_schema(&snapshot.schema);
65

66
        Ok(InferredSchemaDetail {
67
            table: snapshot.table,
68
            fingerprint: snapshot.fingerprint,
69
            sequence: snapshot.sequence,
70
            event_count: snapshot.event_count,
71
            stabilized: snapshot.stabilized,
72
            fields,
73
            first_seen: snapshot.first_seen,
74
            last_seen: snapshot.last_seen,
75
        })
NEW
76
    }
×
77

78
    async fn export_json_schema(
79
        &self,
80
        pipeline: &str,
81
        table: &str,
NEW
82
    ) -> Result<Value, PipelineAPIError> {
×
83
        let sensor_state = self.0.get_sensor(pipeline)?;
84
        let guard = sensor_state.sensor().lock();
85

NEW
86
        let json_schema = guard.to_json_schema(table).ok_or_else(|| {
×
NEW
87
            PipelineAPIError::NotFound(format!("schema for table {}", table))
×
NEW
88
        })?;
×
89

90
        serde_json::to_value(json_schema)
NEW
91
            .map_err(|e| PipelineAPIError::Failed(e.into()))
×
NEW
92
    }
×
93

94
    async fn get_drift(
95
        &self,
96
        pipeline: &str,
NEW
97
    ) -> Result<Vec<DriftInfo>, PipelineAPIError> {
×
98
        let sensor_state = self.0.get_sensor(pipeline)?;
99
        let guard = sensor_state.drift_detector().lock();
100

101
        Ok(guard
102
            .all_summaries()
103
            .into_iter()
NEW
104
            .map(|summary| {
×
105
                // Get detailed drift events for this table
NEW
106
                let columns = guard
×
NEW
107
                    .tracker(&summary.table)
×
NEW
108
                    .map(|tracker| build_column_drift(tracker.drift_events()))
×
NEW
109
                    .unwrap_or_default();
×
110

NEW
111
                DriftInfo {
×
NEW
112
                    table: summary.table,
×
NEW
113
                    has_drift: summary.total_drift_events > 0,
×
NEW
114
                    columns,
×
NEW
115
                    events_analyzed: 0, // TODO: add total_events to DriftSummary
×
NEW
116
                    events_with_drift: summary.total_drift_events as u64,
×
NEW
117
                }
×
NEW
118
            })
×
119
            .collect())
NEW
120
    }
×
121

122
    async fn get_table_drift(
123
        &self,
124
        pipeline: &str,
125
        table: &str,
NEW
126
    ) -> Result<DriftInfo, PipelineAPIError> {
×
127
        let sensor_state = self.0.get_sensor(pipeline)?;
128
        let guard = sensor_state.drift_detector().lock();
129

NEW
130
        let tracker = guard.tracker(table).ok_or_else(|| {
×
NEW
131
            PipelineAPIError::NotFound(format!(
×
NEW
132
                "drift info for table {}",
×
NEW
133
                table
×
NEW
134
            ))
×
NEW
135
        })?;
×
136

137
        let summary = tracker.summary();
138
        let columns = build_column_drift(tracker.drift_events());
139

140
        Ok(DriftInfo {
141
            table: summary.table,
142
            has_drift: summary.total_drift_events > 0,
143
            columns,
144
            events_analyzed: 0,
145
            events_with_drift: summary.total_drift_events as u64,
146
        })
NEW
147
    }
×
148

149
    async fn get_cache_stats(
150
        &self,
151
        pipeline: &str,
NEW
152
    ) -> Result<CacheStats, PipelineAPIError> {
×
153
        let sensor_state = self.0.get_sensor(pipeline)?;
154
        let guard = sensor_state.sensor().lock();
155

156
        // Use all_cache_stats() which returns full statistics
157
        let stats = guard.all_cache_stats();
158

159
        let total_hits: u64 = stats.iter().map(|s| s.cache_hits).sum();
160
        let total_misses: u64 = stats.iter().map(|s| s.cache_misses).sum();
161
        let total = total_hits + total_misses;
162
        let hit_rate = if total > 0 {
163
            total_hits as f64 / total as f64
164
        } else {
165
            0.0
166
        };
167

168
        let tables = stats
169
            .into_iter()
170
            .map(|s| TableCacheStats {
NEW
171
                table: s.table,
×
NEW
172
                cached_structures: s.cached_structures,
×
NEW
173
                max_cache_size: s.max_cache_size,
×
NEW
174
                cache_hits: s.cache_hits,
×
NEW
175
                cache_misses: s.cache_misses,
×
NEW
176
            })
×
177
            .collect();
178

179
        Ok(CacheStats {
180
            tables,
181
            total_cache_hits: total_hits,
182
            total_cache_misses: total_misses,
183
            hit_rate,
184
        })
NEW
185
    }
×
186
}
187

188
// ============================================================================
189
// Schema Field Extraction
190
// ============================================================================
191

192
/// Extract field information from schema_analysis::Schema.
NEW
193
fn extract_fields_from_schema(
×
NEW
194
    schema: &InferredSchemaType,
×
NEW
195
) -> Vec<InferredField> {
×
NEW
196
    match schema {
×
NEW
197
        InferredSchemaType::Struct { fields, .. } => fields
×
NEW
198
            .iter()
×
NEW
199
            .map(|(name, field)| {
×
NEW
200
                let types = field
×
NEW
201
                    .schema
×
NEW
202
                    .as_ref()
×
NEW
203
                    .map(|s| vec![schema_type_name(s)])
×
NEW
204
                    .unwrap_or_default();
×
205

NEW
206
                let (array_element_types, nested_field_count) = field
×
NEW
207
                    .schema
×
NEW
208
                    .as_ref()
×
NEW
209
                    .map(extract_nested_info)
×
NEW
210
                    .unwrap_or((None, None));
×
211

NEW
212
                InferredField {
×
NEW
213
                    name: name.clone(),
×
NEW
214
                    types,
×
NEW
215
                    nullable: field.status.may_be_null,
×
NEW
216
                    optional: field.status.may_be_missing,
×
NEW
217
                    array_element_types,
×
NEW
218
                    nested_field_count,
×
NEW
219
                }
×
NEW
220
            })
×
NEW
221
            .collect(),
×
NEW
222
        _ => vec![],
×
223
    }
NEW
224
}
×
225

226
/// Get human-readable type name for a schema.
NEW
227
fn schema_type_name(schema: &InferredSchemaType) -> String {
×
NEW
228
    match schema {
×
NEW
229
        InferredSchemaType::Null(_) => "null".to_string(),
×
NEW
230
        InferredSchemaType::Boolean(_) => "boolean".to_string(),
×
NEW
231
        InferredSchemaType::Integer(_) => "integer".to_string(),
×
NEW
232
        InferredSchemaType::Float(_) => "number".to_string(),
×
NEW
233
        InferredSchemaType::String(_) => "string".to_string(),
×
NEW
234
        InferredSchemaType::Bytes(_) => "bytes".to_string(),
×
NEW
235
        InferredSchemaType::Sequence { .. } => "array".to_string(),
×
NEW
236
        InferredSchemaType::Struct { .. } => "object".to_string(),
×
NEW
237
        InferredSchemaType::Union { variants } => {
×
NEW
238
            let types: Vec<_> = variants.iter().map(schema_type_name).collect();
×
NEW
239
            types.join(" | ")
×
240
        }
241
    }
NEW
242
}
×
243

244
/// Extract array element types and nested field count.
NEW
245
fn extract_nested_info(
×
NEW
246
    schema: &InferredSchemaType,
×
NEW
247
) -> (Option<Vec<String>>, Option<usize>) {
×
NEW
248
    match schema {
×
NEW
249
        InferredSchemaType::Sequence { field, .. } => {
×
NEW
250
            let element_types =
×
NEW
251
                field.schema.as_ref().map(|s| vec![schema_type_name(s)]);
×
NEW
252
            (element_types, None)
×
253
        }
NEW
254
        InferredSchemaType::Struct { fields, .. } => (None, Some(fields.len())),
×
NEW
255
        _ => (None, None),
×
256
    }
NEW
257
}
×
258

259
// ============================================================================
260
// Drift Column Aggregation
261
// ============================================================================
262

263
/// Build ColumnDrift entries from DriftEvents, grouped by column.
NEW
264
fn build_column_drift(events: &[DriftEvent]) -> Vec<ColumnDrift> {
×
NEW
265
    let mut by_column: HashMap<String, Vec<&DriftEvent>> = HashMap::new();
×
266

NEW
267
    for event in events {
×
NEW
268
        by_column
×
NEW
269
            .entry(event.column.clone())
×
NEW
270
            .or_default()
×
NEW
271
            .push(event);
×
NEW
272
    }
×
273

NEW
274
    by_column
×
NEW
275
        .into_iter()
×
NEW
276
        .map(|(column, events)| {
×
NEW
277
            let expected_type = events
×
NEW
278
                .first()
×
NEW
279
                .map(|e| e.expected.clone())
×
NEW
280
                .unwrap_or_default();
×
281

NEW
282
            let observed_types: Vec<String> = events
×
NEW
283
                .iter()
×
NEW
284
                .map(|e| e.observed.clone())
×
NEW
285
                .collect::<std::collections::HashSet<_>>()
×
NEW
286
                .into_iter()
×
NEW
287
                .collect();
×
288

NEW
289
            let mismatch_count: u64 = events.iter().map(|e| e.count).sum();
×
290

NEW
291
            let examples: Vec<String> = events
×
NEW
292
                .iter()
×
NEW
293
                .flat_map(|e| e.samples.iter().cloned())
×
NEW
294
                .take(5)
×
NEW
295
                .collect();
×
296

NEW
297
            ColumnDrift {
×
NEW
298
                column,
×
NEW
299
                expected_type,
×
NEW
300
                observed_types,
×
NEW
301
                mismatch_count,
×
NEW
302
                examples,
×
NEW
303
            }
×
NEW
304
        })
×
NEW
305
        .collect()
×
NEW
306
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc