• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

vnvo / deltaforge / 20841599773

09 Jan 2026 04:50AM UTC coverage: 51.06% (-1.1%) from 52.164%
20841599773

push

github

vnvo
chore: fixing clippy warnings

54 of 262 new or added lines in 4 files covered. (20.61%)

590 existing lines in 7 files now uncovered.

4674 of 9154 relevant lines covered (51.06%)

2.55 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

11.87
/crates/sources/src/postgres/postgres_schema_loader.rs
1
//! PostgreSQL schema loader with wildcard expansion and registry integration.
2
//!
3
//! Provides schema preloading at startup with support for:
4
//! - Wildcard table patterns (e.g., `public.*`, `%.audit_log`)
5
//! - Full schema loading from information_schema and pg_catalog (columns, primary key, replica identity, OID)
6
//! - Schema registry integration with fingerprinting
7
//! - On-demand reload capability
8

9
use std::collections::HashMap;
10
use std::sync::Arc;
11
use std::time::Instant;
12

13
use schema_registry::{InMemoryRegistry, SourceSchema};
14
use tokio::sync::RwLock;
15
use tokio_postgres::NoTls;
16
use tracing::{debug, info, warn};
17

18
use deltaforge_core::{SourceError, SourceResult};
19

20
use super::postgres_helpers::redact_password;
21
use super::postgres_table_schema::{PostgresColumn, PostgresTableSchema};
22
use crate::schema_loader::{
23
    LoadedSchema as ApiLoadedSchema, SchemaListEntry, SourceSchemaLoader,
24
};
25

26
/// Loaded schema with metadata.
27
#[derive(Debug, Clone)]
28
pub struct LoadedSchema {
29
    pub schema: PostgresTableSchema,
30
    pub registry_version: i32,
31
    pub fingerprint: Arc<str>,
32
    pub sequence: u64,
33
    pub column_names: Arc<Vec<String>>,
34
}
35

36
/// Schema loader with caching and registry integration.
37
#[derive(Clone)]
38
pub struct PostgresSchemaLoader {
39
    dsn: String,
40
    cache: Arc<RwLock<HashMap<(String, String), LoadedSchema>>>,
41
    registry: Arc<InMemoryRegistry>,
42
    tenant: String,
43
}
44

45
impl PostgresSchemaLoader {
46
    /// Create a new schema loader.
47
    pub fn new(
×
48
        dsn: &str,
×
49
        registry: Arc<InMemoryRegistry>,
×
50
        tenant: &str,
×
51
    ) -> Self {
×
52
        info!(
×
53
            "creating postgres schema loader for {}",
×
54
            redact_password(dsn)
×
55
        );
56
        Self {
×
57
            dsn: dsn.to_string(),
×
58
            cache: Arc::new(RwLock::new(HashMap::new())),
×
59
            registry,
×
60
            tenant: tenant.to_string(),
×
61
        }
×
62
    }
×
63

64
    /// Get a database connection.
65
    async fn connect(&self) -> SourceResult<tokio_postgres::Client> {
×
66
        let (client, conn) = tokio_postgres::connect(&self.dsn, NoTls)
×
67
            .await
×
68
            .map_err(|e| SourceError::Connect {
×
69
                details: format!("postgres connect: {}", e).into(),
×
70
            })?;
×
71

UNCOV
72
        tokio::spawn(async move {
×
73
            if let Err(e) = conn.await {
×
74
                tracing::error!("postgres connection error: {}", e);
×
75
            }
×
76
        });
×
77

UNCOV
78
        Ok(client)
×
79
    }
×
80

UNCOV
81
    pub fn current_sequence(&self) -> u64 {
×
82
        self.registry.current_sequence()
×
83
    }
×
84

85
    /// Expand wildcard patterns and preload all matching schemas.
UNCOV
86
    pub async fn preload(
×
UNCOV
87
        &self,
×
UNCOV
88
        patterns: &[String],
×
UNCOV
89
    ) -> SourceResult<Vec<(String, String)>> {
×
UNCOV
90
        let t0 = Instant::now();
×
UNCOV
91
        let tables = self.expand_patterns(patterns).await?;
×
92

UNCOV
93
        info!(
×
94
            dsn = redact_password(&self.dsn),
×
95
            patterns = ?patterns,
96
            matched_tables = tables.len(),
×
97
            "expanded table patterns"
×
98
        );
99

UNCOV
100
        for (schema, table) in &tables {
×
101
            if let Err(e) = self.load_schema(schema, table).await {
×
102
                warn!(schema = %schema, table = %table, error = %e, "failed to preload schema");
×
UNCOV
103
            }
×
104
        }
105

UNCOV
106
        info!(
×
UNCOV
107
            tables_loaded = tables.len(),
×
108
            elapsed_ms = t0.elapsed().as_millis(),
×
109
            "schema preload complete"
×
110
        );
111
        Ok(tables)
×
UNCOV
112
    }
×
113

114
    /// Expand wildcard patterns to actual table list.
115
    pub async fn expand_patterns(
×
116
        &self,
×
117
        patterns: &[String],
×
118
    ) -> SourceResult<Vec<(String, String)>> {
×
UNCOV
119
        let client = self.connect().await?;
×
UNCOV
120
        let mut results = Vec::new();
×
121

122
        if patterns.is_empty() {
×
UNCOV
123
            let rows = client
×
UNCOV
124
                .query(
×
125
                    "SELECT table_schema, table_name FROM information_schema.tables \
×
126
                     WHERE table_type = 'BASE TABLE' \
×
127
                     AND table_schema NOT IN ('pg_catalog', 'information_schema', 'pg_toast')",
×
128
                    &[],
×
129
                )
×
130
                .await
×
UNCOV
131
                .map_err(query_error)?;
×
132

133
            for row in rows {
×
134
                results.push((row.get(0), row.get(1)));
×
135
            }
×
136
            return Ok(results);
×
137
        }
×
138

139
        for pattern in patterns {
×
140
            let (schema_pattern, table_pattern) = parse_pattern(pattern);
×
141
            let query = build_pattern_query(&schema_pattern, &table_pattern);
×
142
            let rows = client.query(&query, &[]).await.map_err(query_error)?;
×
143

UNCOV
144
            for row in rows {
×
145
                let entry: (String, String) = (row.get(0), row.get(1));
×
146
                if !results.contains(&entry) {
×
147
                    results.push(entry);
×
148
                }
×
149
            }
150
        }
151

UNCOV
152
        Ok(results)
×
153
    }
×
154

155
    /// Load full schema for a table.
156
    pub async fn load_schema(
×
157
        &self,
×
UNCOV
158
        schema: &str,
×
159
        table: &str,
×
160
    ) -> SourceResult<LoadedSchema> {
×
161
        self.load_schema_at_checkpoint(schema, table, None).await
×
162
    }
×
163

164
    /// Load schema with optional checkpoint for registry correlation.
UNCOV
165
    pub async fn load_schema_at_checkpoint(
×
UNCOV
166
        &self,
×
UNCOV
167
        schema: &str,
×
168
        table: &str,
×
169
        checkpoint: Option<&[u8]>,
×
UNCOV
170
    ) -> SourceResult<LoadedSchema> {
×
UNCOV
171
        let key = (schema.to_string(), table.to_string());
×
172

173
        if let Some(cached) = self.cache.read().await.get(&key) {
×
174
            debug!(schema = %schema, table = %table, "schema cache hit");
×
175
            return Ok(cached.clone());
×
176
        }
×
177

178
        let t0 = Instant::now();
×
UNCOV
179
        let pg_schema = self.fetch_schema(schema, table).await?;
×
180
        let fingerprint = pg_schema.fingerprint();
×
181
        let column_names: Arc<Vec<String>> = Arc::new(
×
182
            pg_schema.columns.iter().map(|c| c.name.clone()).collect(),
×
183
        );
184

185
        let schema_json = serde_json::to_value(&pg_schema)
×
186
            .map_err(|e| SourceError::Other(e.into()))?;
×
187

188
        let version = self
×
189
            .registry
×
190
            .register_with_checkpoint(
×
UNCOV
191
                &self.tenant,
×
192
                schema,
×
193
                table,
×
194
                &fingerprint,
×
UNCOV
195
                &schema_json,
×
196
                checkpoint,
×
197
            )
×
198
            .await
×
199
            .map_err(SourceError::Other)?;
×
200

UNCOV
201
        let loaded = LoadedSchema {
×
UNCOV
202
            schema: pg_schema,
×
UNCOV
203
            registry_version: version,
×
204
            fingerprint: fingerprint.into(),
×
205
            sequence: self.registry.current_sequence(),
×
UNCOV
206
            column_names,
×
UNCOV
207
        };
×
208

209
        self.cache.write().await.insert(key, loaded.clone());
×
210

211
        let elapsed = t0.elapsed();
×
212
        if elapsed.as_millis() > 200 {
×
213
            warn!(schema = %schema, table = %table, ms = elapsed.as_millis(), "slow schema load");
×
214
        } else {
215
            debug!(schema = %schema, table = %table, version, ms = elapsed.as_millis(), "schema loaded");
×
216
        }
217

218
        Ok(loaded)
×
219
    }
×
220

221
    /// Force reload schema from database (bypasses cache).
222
    pub async fn reload_schema(
×
223
        &self,
×
224
        schema: &str,
×
225
        table: &str,
×
226
    ) -> SourceResult<LoadedSchema> {
×
227
        self.cache
×
UNCOV
228
            .write()
×
UNCOV
229
            .await
×
230
            .remove(&(schema.to_string(), table.to_string()));
×
231
        self.load_schema(schema, table).await
×
232
    }
×
233

234
    /// Reload all schemas matching patterns.
235
    pub async fn reload_all(
×
236
        &self,
×
237
        patterns: &[String],
×
UNCOV
238
    ) -> SourceResult<Vec<(String, String)>> {
×
239
        self.cache.write().await.clear();
×
UNCOV
240
        self.preload(patterns).await
×
UNCOV
241
    }
×
242

243
    /// Get cached schema (without loading from DB).
UNCOV
244
    pub fn get_cached(
×
UNCOV
245
        &self,
×
246
        schema: &str,
×
247
        table: &str,
×
248
    ) -> Option<LoadedSchema> {
×
249
        self.cache.try_read().ok().and_then(|c| {
×
250
            c.get(&(schema.to_string(), table.to_string())).cloned()
×
UNCOV
251
        })
×
252
    }
×
253

254
    /// Fetch full schema from information_schema.
255
    async fn fetch_schema(
×
UNCOV
256
        &self,
×
UNCOV
257
        schema_name: &str,
×
258
        table_name: &str,
×
259
    ) -> SourceResult<PostgresTableSchema> {
×
UNCOV
260
        let client = self.connect().await?;
×
261

262
        let col_rows = client
×
263
            .query(
×
264
                r#"
×
265
                SELECT 
×
UNCOV
266
                    c.column_name, c.data_type, c.udt_name, c.is_nullable,
×
267
                    c.ordinal_position, c.column_default, c.character_maximum_length,
×
UNCOV
268
                    c.numeric_precision, c.numeric_scale, c.is_identity,
×
UNCOV
269
                    c.identity_generation, c.is_generated
×
270
                FROM information_schema.columns c
×
271
                WHERE c.table_schema = $1 AND c.table_name = $2
×
UNCOV
272
                ORDER BY c.ordinal_position
×
UNCOV
273
                "#,
×
274
                &[&schema_name, &table_name],
×
275
            )
×
276
            .await
×
277
            .map_err(query_error)?;
×
278

UNCOV
279
        if col_rows.is_empty() {
×
280
            return Err(SourceError::Schema {
×
281
                details: format!(
×
282
                    "table {}.{} not found",
×
283
                    schema_name, table_name
×
UNCOV
284
                )
×
UNCOV
285
                .into(),
×
286
            });
×
287
        }
×
288

289
        let columns: Vec<PostgresColumn> =
×
NEW
290
            col_rows.iter().map(build_column).collect();
×
291

UNCOV
292
        let pk_rows = client
×
UNCOV
293
            .query(
×
294
                r#"
×
295
                SELECT a.attname
×
296
                FROM pg_index i
×
297
                JOIN pg_attribute a ON a.attrelid = i.indrelid AND a.attnum = ANY(i.indkey)
×
298
                WHERE i.indrelid = ($1 || '.' || $2)::regclass AND i.indisprimary
×
299
                ORDER BY array_position(i.indkey, a.attnum)
×
300
                "#,
×
301
                &[&schema_name, &table_name],
×
302
            )
×
303
            .await
×
304
            .map_err(query_error)?;
×
305

306
        let primary_key: Vec<String> =
×
307
            pk_rows.iter().map(|r| r.get(0)).collect();
×
308

309
        let identity_row = client
×
310
            .query_opt(
×
311
                r#"
×
312
                SELECT relreplident FROM pg_class c
×
313
                JOIN pg_namespace n ON n.oid = c.relnamespace
×
314
                WHERE n.nspname = $1 AND c.relname = $2
×
315
                "#,
×
316
                &[&schema_name, &table_name],
×
317
            )
×
UNCOV
318
            .await
×
319
            .map_err(query_error)?;
×
320

321
        let replica_identity = identity_row.map(|r| {
×
322
            match r.get::<_, i8>(0) as u8 as char {
×
323
                'd' => "default",
×
324
                'n' => "nothing",
×
325
                'f' => "full",
×
326
                'i' => "index",
×
327
                _ => "unknown",
×
328
            }
329
            .to_string()
×
330
        });
×
331

332
        let oid = client
×
333
            .query_opt(
×
334
                "SELECT ($1 || '.' || $2)::regclass::oid",
×
335
                &[&schema_name, &table_name],
×
336
            )
×
337
            .await
×
338
            .map_err(query_error)?
×
339
            .map(|r| r.get::<_, u32>(0));
×
340

341
        Ok(PostgresTableSchema {
×
342
            columns,
×
343
            primary_key,
×
UNCOV
344
            replica_identity,
×
345
            oid,
×
346
            schema_name: Some(schema_name.to_string()),
×
347
        })
×
UNCOV
348
    }
×
349

350
    /// Get column names only (for backward compatibility).
UNCOV
351
    pub async fn column_names(
×
352
        &self,
×
353
        schema: &str,
×
354
        table: &str,
×
355
    ) -> SourceResult<Arc<Vec<String>>> {
×
356
        Ok(Arc::clone(
×
UNCOV
357
            &self.load_schema(schema, table).await?.column_names,
×
358
        ))
359
    }
×
360

361
    #[allow(dead_code)]
362
    pub(crate) fn from_static(
×
363
        cols: HashMap<(String, String), Arc<Vec<String>>>,
×
364
    ) -> Self {
×
365
        let cache: HashMap<(String, String), LoadedSchema> = cols
×
366
            .into_iter()
×
367
            .map(|((schema, table), col_names)| {
×
368
                let columns: Vec<PostgresColumn> = col_names
×
369
                    .iter()
×
370
                    .enumerate()
×
371
                    .map(|(i, name)| {
×
372
                        PostgresColumn::new(name, "text", true, i as i32 + 1)
×
373
                    })
×
374
                    .collect();
×
375
                let pg_schema = PostgresTableSchema::new(columns);
×
376
                let fingerprint = pg_schema.fingerprint();
×
377
                let loaded = LoadedSchema {
×
UNCOV
378
                    schema: pg_schema,
×
379
                    registry_version: 1,
×
380
                    fingerprint: fingerprint.into(),
×
381
                    sequence: 0,
×
UNCOV
382
                    column_names: col_names,
×
UNCOV
383
                };
×
384
                ((schema, table), loaded)
×
385
            })
×
386
            .collect();
×
387

388
        Self {
×
389
            dsn: "host=localhost".to_string(),
×
390
            cache: Arc::new(RwLock::new(cache)),
×
391
            registry: Arc::new(InMemoryRegistry::new()),
×
392
            tenant: "test".to_string(),
×
393
        }
×
394
    }
×
395
}
396

397
/// Build PostgresColumn from query row.
UNCOV
398
fn build_column(row: &tokio_postgres::Row) -> PostgresColumn {
×
399
    let name: String = row.get(0);
×
400
    let data_type: String = row.get(1);
×
UNCOV
401
    let udt_name: String = row.get(2);
×
UNCOV
402
    let is_nullable: String = row.get(3);
×
403
    let ordinal: i32 = row.get(4);
×
404
    let default: Option<String> = row.get(5);
×
405
    let char_max_len: Option<i32> = row.get(6);
×
406
    let num_precision: Option<i32> = row.get(7);
×
407
    let num_scale: Option<i32> = row.get(8);
×
408
    let is_identity: String = row.get(9);
×
409
    let identity_gen: Option<String> = row.get(10);
×
410
    let is_generated: String = row.get(11);
×
411

412
    let is_array = data_type == "ARRAY";
×
413
    let effective_type = if is_array {
×
414
        format!("{}[]", udt_name.trim_start_matches('_'))
×
415
    } else {
416
        data_type.clone()
×
417
    };
418

419
    let mut col = PostgresColumn::new(
×
420
        &name,
×
421
        &effective_type,
×
422
        is_nullable == "YES",
×
423
        ordinal,
×
424
    );
425

UNCOV
426
    if let Some(def) = default {
×
UNCOV
427
        col = col.with_default(def);
×
428
    }
×
429
    if let Some(len) = char_max_len {
×
430
        col = col.with_char_max_length(len);
×
431
    }
×
432
    if let (Some(prec), Some(scale)) = (num_precision, num_scale) {
×
433
        col = col.with_numeric(prec, scale);
×
434
    }
×
UNCOV
435
    if is_identity == "YES" {
×
436
        col = col.with_identity(identity_gen.unwrap_or_default());
×
UNCOV
437
    }
×
438
    if is_generated == "ALWAYS" {
×
439
        col.is_generated = true;
×
440
    }
×
441
    if is_array {
×
442
        col = col.as_array(udt_name.trim_start_matches('_'));
×
443
    }
×
444
    col.udt_name = Some(udt_name);
×
445

UNCOV
446
    col
×
UNCOV
447
}
×
448

449
/// Split a `schema.table` pattern at its first `.`.
///
/// A pattern without a `.` is treated as a table name in the `public`
/// schema. Everything after the first `.` belongs to the table part, so
/// `a.b.c` yields `("a", "b.c")`.
fn parse_pattern(pattern: &str) -> (String, String) {
    match pattern.split_once('.') {
        Some((schema, table)) => (schema.to_owned(), table.to_owned()),
        None => (String::from("public"), pattern.to_owned()),
    }
}
455

456
fn build_pattern_query(schema_pattern: &str, table_pattern: &str) -> String {
3✔
457
    let schema_clause = match schema_pattern {
3✔
458
        "*" | "%" => "table_schema NOT IN ('pg_catalog', 'information_schema', 'pg_toast')".to_string(),
3✔
459
        s if s.contains('%') || s.contains('_') => format!("table_schema LIKE '{}'", escape_like(s)),
2✔
460
        s => format!("table_schema = '{}'", escape_sql(s)),
2✔
461
    };
462

463
    let table_clause = match table_pattern {
3✔
464
        "*" | "%" => "1=1".to_string(),
3✔
465
        t if t.contains('%') || t.contains('_') => {
2✔
466
            format!("table_name LIKE '{}'", escape_like(t))
1✔
467
        }
468
        t => format!("table_name = '{}'", escape_sql(t)),
1✔
469
    };
470

471
    format!(
3✔
472
        "SELECT table_schema, table_name FROM information_schema.tables \
3✔
473
         WHERE table_type = 'BASE TABLE' AND {} AND {}",
3✔
474
        schema_clause, table_clause
475
    )
476
}
3✔
477

478
/// Escape a value for embedding inside a single-quoted SQL string literal by
/// doubling every `'`.
fn escape_sql(s: &str) -> String {
    s.chars().fold(String::with_capacity(s.len()), |mut out, c| {
        if c == '\'' {
            out.push_str("''");
        } else {
            out.push(c);
        }
        out
    })
}
481

482
/// Escape a LIKE pattern for embedding in a single-quoted SQL literal.
///
/// `%` is passed through unchanged as the wildcard. The other LIKE
/// metacharacters are escaped so they match literally:
/// - `\` is LIKE's default escape character.
/// - `_` would otherwise match any single character, so `audit_log` would
///   also match `auditxlog`.
///
/// `'` is doubled for the string literal.
///
/// NOTE: the backslash escapes assume `standard_conforming_strings = on`
/// (the PostgreSQL default since 9.1), so that `\` reaches LIKE unchanged.
fn escape_like(s: &str) -> String {
    let mut out = String::with_capacity(s.len());
    for c in s.chars() {
        match c {
            '\\' => out.push_str("\\\\"),
            '_' => out.push_str("\\_"),
            '\'' => out.push_str("''"),
            other => out.push(other),
        }
    }
    out
}
485

486
fn query_error(e: tokio_postgres::Error) -> SourceError {
×
487
    SourceError::Other(anyhow::anyhow!("postgres query: {}", e))
×
UNCOV
488
}
×
489

490
#[async_trait::async_trait]
491
impl SourceSchemaLoader for PostgresSchemaLoader {
492
    fn source_type(&self) -> &'static str {
×
493
        "postgres"
×
494
    }
×
495

496
    async fn load(
497
        &self,
498
        schema: &str,
499
        table: &str,
UNCOV
500
    ) -> anyhow::Result<ApiLoadedSchema> {
×
501
        let loaded = self.load_schema(schema, table).await?;
502
        Ok(ApiLoadedSchema {
503
            database: schema.to_string(),
504
            table: table.to_string(),
505
            schema_json: serde_json::to_value(&loaded.schema)
506
                .unwrap_or_default(),
507
            columns: loaded.column_names.iter().cloned().collect(),
508
            primary_key: loaded.schema.primary_key.clone(),
509
            fingerprint: loaded.fingerprint.to_string(),
510
            registry_version: loaded.registry_version,
511
            loaded_at: chrono::Utc::now(),
512
        })
UNCOV
513
    }
×
514

515
    async fn reload(
516
        &self,
517
        schema: &str,
518
        table: &str,
UNCOV
519
    ) -> anyhow::Result<ApiLoadedSchema> {
×
520
        let loaded = self.reload_schema(schema, table).await?;
521
        Ok(ApiLoadedSchema {
522
            database: schema.to_string(),
523
            table: table.to_string(),
524
            schema_json: serde_json::to_value(&loaded.schema)
525
                .unwrap_or_default(),
526
            columns: loaded.column_names.iter().cloned().collect(),
527
            primary_key: loaded.schema.primary_key.clone(),
528
            fingerprint: loaded.fingerprint.to_string(),
529
            registry_version: loaded.registry_version,
530
            loaded_at: chrono::Utc::now(),
531
        })
UNCOV
532
    }
×
533

534
    async fn reload_all(
535
        &self,
536
        patterns: &[String],
UNCOV
537
    ) -> anyhow::Result<Vec<(String, String)>> {
×
538
        PostgresSchemaLoader::reload_all(self, patterns)
539
            .await
540
            .map_err(Into::into)
UNCOV
541
    }
×
542

543
    async fn list_cached(&self) -> Vec<SchemaListEntry> {
×
544
        self.cache
545
            .read()
546
            .await
547
            .iter()
548
            .map(|((schema, table), loaded)| SchemaListEntry {
549
                database: schema.clone(),
×
550
                table: table.clone(),
×
551
                column_count: loaded.schema.columns.len(),
×
UNCOV
552
                primary_key: loaded.schema.primary_key.clone(),
×
UNCOV
553
                fingerprint: loaded.fingerprint.to_string(),
×
UNCOV
554
                registry_version: loaded.registry_version,
×
UNCOV
555
            })
×
556
            .collect()
557
    }
×
558
}
559

560
#[cfg(test)]
mod tests {
    use super::*;

    /// Pattern splitting: explicit schema, wildcard table, wildcard schema,
    /// and the implicit `public` schema.
    #[test]
    fn test_parse_pattern() {
        let cases = [
            ("public.users", ("public", "users")),
            ("myschema.*", ("myschema", "*")),
            ("%.audit", ("%", "audit")),
            ("orders", ("public", "orders")),
        ];
        for &(input, (schema, table)) in &cases {
            assert_eq!(
                parse_pattern(input),
                (schema.to_string(), table.to_string()),
                "pattern {:?}",
                input
            );
        }
    }

    /// Generated query fragments for exact, all-tables and LIKE patterns.
    #[test]
    fn test_build_pattern_query() {
        let cases = [
            (("public", "users"), ["table_schema = 'public'", "table_name = 'users'"]),
            (("public", "*"), ["table_schema = 'public'", "1=1"]),
            (("%", "audit%"), ["table_schema NOT IN", "table_name LIKE 'audit%'"]),
        ];
        for &((schema, table), fragments) in &cases {
            let query = build_pattern_query(schema, table);
            for fragment in fragments {
                assert!(
                    query.contains(fragment),
                    "query for {}.{} lacks {:?}: {}",
                    schema,
                    table,
                    fragment,
                    query
                );
            }
        }
    }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc