• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

supabase / etl / 24713061833

21 Apr 2026 08:47AM UTC coverage: 78.316% (+0.07%) from 78.248%
24713061833

push

github

web-flow
refactor: enable extended clippy lint set (#675)

* refactor: explicit_iter_loop

* refactor: implicit_clone

* refactor: manual_let_else

* refactor: map_unwrap_or

* refactor: match_same_arms

* refactor: clippy::redundant_closure_for_method_calls

* refactor: redundant_clone

* refactor: redundant_test_prefix

* refactor: semicolon_if_nothing_returned

* refactor: uninlined_format_args

* refactor: unnested_or_patterns

* feat: enable extra lints

* fix: rebase

* fix: removed test

* fix: after rebase

* fix: fmt

* fix: http tests

* fix: rebase

* fix: fmt

* fix: fmt

414 of 503 new or added lines in 59 files covered. (82.31%)

3292 existing lines in 120 files now uncovered.

24379 of 31129 relevant lines covered (78.32%)

1069.73 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/xtask/src/commands/postgres.rs
1
use std::{
2
    process::{Command, Stdio},
3
    thread,
4
    time::Duration,
5
};
6

7
use anyhow::{Context, Result, bail};
8
use clap::{Args, Subcommand};
9

10
use super::shared::{DEFAULT_BASE_PORT, DEFAULT_PG_SHARD_COUNT};
11

12
const COMPOSE_FILE: &str = "./scripts/docker-compose.yaml";
13

14
/// Returns the program and initial args for docker compose.
15
/// Prefers `docker compose` (v2 plugin) and falls back to `docker-compose`
16
/// (standalone).
17
fn docker_compose_command() -> (&'static str, &'static [&'static str]) {
×
18
    if Command::new("docker")
×
19
        .args(["compose", "version"])
×
20
        .stdout(Stdio::null())
×
UNCOV
21
        .stderr(Stdio::null())
×
22
        .status()
×
UNCOV
23
        .is_ok_and(|s| s.success())
×
24
    {
UNCOV
25
        ("docker", &["compose"])
×
26
    } else {
UNCOV
27
        ("docker-compose", &[])
×
28
    }
UNCOV
29
}
×
30
const DEFAULT_PG_VERSION: &str = "18";
31
const DEFAULT_PG_USER: &str = "postgres";
32
const HEALTH_CHECK_INTERVAL: Duration = Duration::from_secs(1);
33

34
#[derive(Args)]
35
pub(crate) struct PostgresArgs {
36
    #[command(subcommand)]
37
    command: PostgresCommand,
38
}
39

40
#[derive(Subcommand)]
41
enum PostgresCommand {
42
    /// Start sharded Postgres clusters for testing.
43
    Start(StartArgs),
44
}
45

46
#[derive(Args)]
47
struct StartArgs {
48
    /// Postgres image tag.
49
    #[arg(long, env = "POSTGRES_VERSION", default_value = DEFAULT_PG_VERSION)]
50
    pg_version: String,
51

52
    /// Number of Postgres clusters to start.
53
    #[arg(long, env = "NUM_LOCAL_DATABASES", default_value_t = DEFAULT_PG_SHARD_COUNT)]
54
    shards: u16,
55

56
    /// Base port for the first cluster. Additional clusters use consecutive
57
    /// ports.
58
    #[arg(long, env = "TESTS_DATABASE_START_PORT", default_value_t = DEFAULT_BASE_PORT)]
59
    base_port: u16,
60

61
    /// Postgres user for health checks.
62
    #[arg(long, env = "POSTGRES_USER", default_value = DEFAULT_PG_USER)]
63
    pg_user: String,
64
}
65

66
impl PostgresArgs {
67
    pub(crate) fn run(self) -> Result<()> {
×
UNCOV
68
        match self.command {
×
UNCOV
69
            PostgresCommand::Start(args) => args.run(),
×
70
        }
71
    }
×
72
}
73

74
impl StartArgs {
UNCOV
75
    fn run(self) -> Result<()> {
×
76
        if self.shards == 0 {
×
77
            bail!("--shards must be at least 1");
×
78
        }
×
79

80
        if self.base_port.checked_add(self.shards - 1).is_none() {
×
UNCOV
81
            bail!("--base-port + --shards exceeds the valid port range");
×
UNCOV
82
        }
×
83

UNCOV
84
        eprintln!(
×
85
            "starting {} Postgres {} clusters on ports {}..{}.",
86
            self.shards,
87
            self.pg_version,
88
            self.base_port,
89
            self.base_port + self.shards - 1,
×
90
        );
91

92
        // Start the full stack (Postgres, Lakekeeper, MinIO, etc.) on the base port.
93
        self.docker_compose_up(&self.pg_version, None, &[])?;
×
94

95
        // Start additional source-postgres containers on subsequent ports.
96
        for shard in 2..=self.shards {
×
97
            let port = self.base_port + shard - 1;
×
98
            let project = format!("etl-stack-pg-{}-shard-{shard}", self.pg_version);
×
99
            self.docker_compose_up(&self.pg_version, Some((&project, port)), &["source-postgres"])?;
×
100
        }
101

102
        // Wait for all clusters to accept connections.
103
        for shard in 1..=self.shards {
×
104
            let port = self.base_port + shard - 1;
×
105
            self.wait_for_pg(port)?;
×
106
        }
107

108
        Ok(())
×
109
    }
×
110

111
    fn docker_compose_up(
×
112
        &self,
×
113
        pg_version: &str,
×
114
        project_and_port: Option<(&str, u16)>,
×
115
        services: &[&str],
×
116
    ) -> Result<()> {
×
117
        let postgres_image =
×
118
            std::env::var("POSTGRES_IMAGE").unwrap_or_else(|_| format!("postgres:{pg_version}"));
×
119
        let (program, compose_args) = docker_compose_command();
×
120
        let mut cmd = Command::new(program);
×
121
        cmd.args(compose_args);
×
UNCOV
122
        cmd.args(["-f", COMPOSE_FILE]);
×
123
        cmd.env("POSTGRES_VERSION", pg_version);
×
124
        cmd.env("POSTGRES_IMAGE", &postgres_image);
×
125

126
        if let Some((project, port)) = project_and_port {
×
UNCOV
127
            cmd.args(["-p", project]);
×
128
            cmd.env("POSTGRES_PORT", port.to_string());
×
129
        }
×
130

131
        cmd.args(["up", "-d"]);
×
UNCOV
132
        cmd.args(services);
×
133

134
        let status = cmd.status().context("failed to run docker compose")?;
×
135

UNCOV
136
        if !status.success() {
×
137
            bail!("docker compose up failed");
×
138
        }
×
139

140
        Ok(())
×
UNCOV
141
    }
×
142

143
    fn wait_for_pg(&self, port: u16) -> Result<()> {
×
144
        loop {
145
            let status = Command::new("pg_isready")
×
146
                .args(["-h", "localhost", "-p", &port.to_string(), "-U", &self.pg_user])
×
147
                .status()
×
148
                .context("failed to run pg_isready")?;
×
149

150
            if status.success() {
×
151
                return Ok(());
×
152
            }
×
153

154
            thread::sleep(HEALTH_CHECK_INTERVAL);
×
155
        }
156
    }
×
157
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc