• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

vnvo / deltaforge / 20521769545

26 Dec 2025 11:37AM UTC coverage: 63.717% (+2.7%) from 60.979%
20521769545

Pull #32

github

web-flow
Merge 00270ffbb into 8977b1707
Pull Request #32: Feat/tursosource

2693 of 4143 new or added lines in 27 files covered. (65.0%)

70 existing lines in 5 files now uncovered.

4861 of 7629 relevant lines covered (63.72%)

3.92 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/crates/runner/src/schema_api.rs
1
//! Schema API - Database schema management endpoints.
2

3
use std::sync::Arc;
4
use std::time::Instant;
5

6
use rest_api::{
7
    ColumnInfo, PipelineAPIError, ReloadResult, SchemaController, SchemaDetail,
8
    SchemaInfo, SchemaVersionInfo, TableReloadStatus,
9
};
10
use serde_json::Value;
11

12
use crate::pipeline_manager::PipelineManager;
13

14
#[derive(Clone)]
15
pub struct SchemaApi(pub Arc<PipelineManager>);
16

17
impl SchemaApi {
NEW
18
    pub fn new(manager: Arc<PipelineManager>) -> Self {
×
NEW
19
        Self(manager)
×
NEW
20
    }
×
21
}
22

23
#[async_trait::async_trait]
24
impl SchemaController for SchemaApi {
25
    async fn list_schemas(
26
        &self,
27
        pipeline: &str,
NEW
28
    ) -> Result<Vec<SchemaInfo>, PipelineAPIError> {
×
29
        let loader = self.0.get_loader(pipeline)?;
30
        Ok(loader
31
            .list_cached()
32
            .await
33
            .into_iter()
34
            .map(|e| SchemaInfo {
NEW
35
                database: e.database,
×
NEW
36
                table: e.table,
×
NEW
37
                column_count: e.column_count,
×
NEW
38
                primary_key: e.primary_key,
×
NEW
39
                fingerprint: e.fingerprint,
×
NEW
40
                registry_version: e.registry_version,
×
NEW
41
            })
×
42
            .collect())
NEW
43
    }
×
44

45
    async fn get_schema(
46
        &self,
47
        pipeline: &str,
48
        db: &str,
49
        table: &str,
NEW
50
    ) -> Result<SchemaDetail, PipelineAPIError> {
×
51
        let loader = self.0.get_loader(pipeline)?;
52
        let loaded = loader
53
            .load(db, table)
54
            .await
55
            .map_err(PipelineAPIError::Failed)?;
56

57
        let columns = extract_columns(&loaded.schema_json);
58

59
        Ok(SchemaDetail {
60
            database: loaded.database,
61
            table: loaded.table,
62
            columns,
63
            primary_key: loaded.primary_key,
64
            engine: loaded
65
                .schema_json
66
                .get("engine")
NEW
67
                .and_then(|v| v.as_str())
×
68
                .map(String::from),
69
            charset: loaded
70
                .schema_json
71
                .get("charset")
NEW
72
                .and_then(|v| v.as_str())
×
73
                .map(String::from),
74
            collation: loaded
75
                .schema_json
76
                .get("collation")
NEW
77
                .and_then(|v| v.as_str())
×
78
                .map(String::from),
79
            fingerprint: loaded.fingerprint,
80
            registry_version: loaded.registry_version,
81
            loaded_at: loaded.loaded_at,
82
        })
NEW
83
    }
×
84

85
    async fn reload_schemas(
86
        &self,
87
        pipeline: &str,
NEW
88
    ) -> Result<ReloadResult, PipelineAPIError> {
×
89
        let (loader, patterns) = {
90
            let guard = self.0.pipelines.read();
NEW
91
            let runtime = guard.get(pipeline).ok_or_else(|| {
×
NEW
92
                PipelineAPIError::NotFound(pipeline.to_string())
×
NEW
93
            })?;
×
94
            (
95
                runtime.schema_loader.clone(),
96
                runtime.table_patterns.clone(),
97
            )
98
        };
99

NEW
100
        let loader = loader.ok_or_else(|| {
×
NEW
101
            PipelineAPIError::Failed(anyhow::anyhow!("no schema loader"))
×
NEW
102
        })?;
×
103

104
        let t0 = Instant::now();
105
        let tables = loader
106
            .reload_all(&patterns)
107
            .await
108
            .map_err(PipelineAPIError::Failed)?;
109

110
        Ok(ReloadResult {
111
            pipeline: pipeline.to_string(),
112
            tables_reloaded: tables.len(),
113
            tables: tables
114
                .iter()
115
                .map(|(db, t)| TableReloadStatus {
NEW
116
                    database: db.clone(),
×
NEW
117
                    table: t.clone(),
×
NEW
118
                    status: "ok".to_string(),
×
119
                    changed: true,
NEW
120
                    error: None,
×
NEW
121
                })
×
122
                .collect(),
123
            elapsed_ms: t0.elapsed().as_millis() as u64,
124
        })
NEW
125
    }
×
126

127
    async fn reload_table_schema(
128
        &self,
129
        pipeline: &str,
130
        db: &str,
131
        table: &str,
NEW
132
    ) -> Result<SchemaDetail, PipelineAPIError> {
×
133
        let loader = self.0.get_loader(pipeline)?;
134
        loader
135
            .reload(db, table)
136
            .await
137
            .map_err(PipelineAPIError::Failed)?;
138
        self.get_schema(pipeline, db, table).await
NEW
139
    }
×
140

141
    async fn get_schema_versions(
142
        &self,
143
        pipeline: &str,
144
        db: &str,
145
        table: &str,
NEW
146
    ) -> Result<Vec<SchemaVersionInfo>, PipelineAPIError> {
×
147
        let tenant = self
148
            .0
149
            .pipelines
150
            .read()
151
            .get(pipeline)
NEW
152
            .ok_or_else(|| PipelineAPIError::NotFound(pipeline.to_string()))?
×
153
            .spec
154
            .metadata
155
            .tenant
156
            .clone();
157

158
        Ok(self
159
            .0
160
            .registry()
161
            .list_versions(&tenant, db, table)
162
            .into_iter()
NEW
163
            .map(|v| {
×
NEW
164
                let col_count = v.schema_json
×
NEW
165
                    .get("columns")
×
NEW
166
                    .and_then(|c| c.as_array())
×
NEW
167
                    .map(|arr| arr.len())
×
NEW
168
                    .unwrap_or(0);
×
NEW
169
                SchemaVersionInfo {
×
NEW
170
                    version: v.version,
×
NEW
171
                    fingerprint: v.hash,
×
NEW
172
                    column_count: col_count,
×
NEW
173
                    registered_at: v.registered_at,
×
NEW
174
                }
×
NEW
175
            })
×
176
            .collect())
NEW
177
    }
×
178
}
179

180
/// Extract ColumnInfo from source-specific schema JSON.
NEW
181
fn extract_columns(schema_json: &Value) -> Vec<ColumnInfo> {
×
NEW
182
    let Some(cols) = schema_json.get("columns").and_then(|v| v.as_array())
×
183
    else {
NEW
184
        return vec![];
×
185
    };
186

NEW
187
    cols.iter()
×
NEW
188
        .filter_map(|c| {
×
189
            Some(ColumnInfo {
NEW
190
                name: c.get("name")?.as_str()?.to_string(),
×
NEW
191
                column_type: c
×
NEW
192
                    .get("column_type")
×
NEW
193
                    .or(c.get("declared_type"))
×
NEW
194
                    .and_then(|v| v.as_str())
×
NEW
195
                    .unwrap_or("")
×
NEW
196
                    .to_string(),
×
NEW
197
                data_type: c
×
NEW
198
                    .get("data_type")
×
NEW
199
                    .and_then(|v| v.as_str())
×
NEW
200
                    .unwrap_or("")
×
NEW
201
                    .to_string(),
×
NEW
202
                nullable: c
×
NEW
203
                    .get("nullable")
×
NEW
204
                    .and_then(|v| v.as_bool())
×
NEW
205
                    .unwrap_or(true),
×
NEW
206
                ordinal_position: c
×
NEW
207
                    .get("ordinal_position")
×
NEW
208
                    .or(c.get("column_index"))
×
NEW
209
                    .and_then(|v| v.as_u64())
×
NEW
210
                    .unwrap_or(0) as u32,
×
NEW
211
                default_value: c
×
NEW
212
                    .get("default_value")
×
NEW
213
                    .and_then(|v| v.as_str())
×
NEW
214
                    .map(String::from),
×
NEW
215
                extra: c
×
NEW
216
                    .get("extra")
×
NEW
217
                    .and_then(|v| v.as_str())
×
NEW
218
                    .map(String::from),
×
NEW
219
                is_primary_key: c
×
NEW
220
                    .get("is_primary_key")
×
NEW
221
                    .and_then(|v| v.as_bool())
×
NEW
222
                    .unwrap_or(false),
×
223
            })
NEW
224
        })
×
NEW
225
        .collect()
×
NEW
226
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc