• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

supabase / cli / 12005516982

25 Nov 2024 07:49AM UTC coverage: 59.57% (+0.02%) from 59.552%
12005516982

Pull #2904

github

sweatybridge
chore: refactor terminate backend query
Pull Request #2904: fix(reset): ensure _supabase connections disconnect before reset

29 of 31 new or added lines in 3 files covered. (93.55%)

11 existing lines in 2 files now uncovered.

6396 of 10737 relevant lines covered (59.57%)

6.07 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

76.67
/internal/db/reset/reset.go
1
package reset
2

3
import (
4
        "context"
5
        _ "embed"
6
        "fmt"
7
        "io"
8
        "os"
9
        "strconv"
10
        "strings"
11
        "time"
12

13
        "github.com/cenkalti/backoff/v4"
14
        "github.com/docker/docker/api/types"
15
        "github.com/docker/docker/api/types/container"
16
        "github.com/docker/docker/api/types/network"
17
        "github.com/docker/docker/errdefs"
18
        "github.com/go-errors/errors"
19
        "github.com/jackc/pgconn"
20
        "github.com/jackc/pgerrcode"
21
        "github.com/jackc/pgx/v4"
22
        "github.com/spf13/afero"
23
        "github.com/supabase/cli/internal/db/start"
24
        "github.com/supabase/cli/internal/gen/keys"
25
        "github.com/supabase/cli/internal/migration/apply"
26
        "github.com/supabase/cli/internal/migration/repair"
27
        "github.com/supabase/cli/internal/seed/buckets"
28
        "github.com/supabase/cli/internal/utils"
29
        "github.com/supabase/cli/pkg/migration"
30
)
31

32
func Run(ctx context.Context, version string, config pgconn.Config, fsys afero.Fs, options ...func(*pgx.ConnConfig)) error {
5✔
33
        if len(version) > 0 {
5✔
34
                if _, err := strconv.Atoi(version); err != nil {
×
35
                        return errors.New(repair.ErrInvalidVersion)
×
36
                }
×
37
                if _, err := repair.GetMigrationFile(version, fsys); err != nil {
×
38
                        return err
×
39
                }
×
40
        }
41
        if !utils.IsLocalDatabase(config) {
7✔
42
                msg := "Do you want to reset the remote database?"
2✔
43
                if shouldReset, err := utils.NewConsole().PromptYesNo(ctx, msg, false); err != nil {
2✔
44
                        return err
×
45
                } else if !shouldReset {
3✔
46
                        return errors.New(context.Canceled)
1✔
47
                }
1✔
48
                return resetRemote(ctx, version, config, fsys, options...)
1✔
49
        }
50
        // Config file is loaded before parsing --linked or --local flags
51
        if err := utils.AssertSupabaseDbIsRunning(); err != nil {
4✔
52
                return err
1✔
53
        }
1✔
54
        // Reset postgres database because extensions (pg_cron, pg_net) require postgres
55
        if err := resetDatabase(ctx, version, fsys, options...); err != nil {
3✔
56
                return err
1✔
57
        }
1✔
58
        // Seed objects from supabase/buckets directory
59
        if resp, err := utils.Docker.ContainerInspect(ctx, utils.StorageId); err == nil {
2✔
60
                if resp.State.Health == nil || resp.State.Health.Status != types.Healthy {
1✔
61
                        if err := start.WaitForHealthyService(ctx, 30*time.Second, utils.StorageId); err != nil {
×
62
                                return err
×
63
                        }
×
64
                }
65
                if err := buckets.Run(ctx, "", false, fsys); err != nil {
1✔
66
                        return err
×
67
                }
×
68
        }
69
        branch := keys.GetGitBranch(fsys)
1✔
70
        fmt.Fprintln(os.Stderr, "Finished "+utils.Aqua("supabase db reset")+" on branch "+utils.Aqua(branch)+".")
1✔
71
        return nil
1✔
72
}
73

74
func resetDatabase(ctx context.Context, version string, fsys afero.Fs, options ...func(*pgx.ConnConfig)) error {
2✔
75
        fmt.Fprintln(os.Stderr, "Resetting local database"+toLogMessage(version))
2✔
76
        if utils.Config.Db.MajorVersion <= 14 {
2✔
77
                return resetDatabase14(ctx, version, fsys, options...)
×
78
        }
×
79
        return resetDatabase15(ctx, version, fsys, options...)
2✔
80
}
81

82
// toLogMessage returns the suffix appended to the "Resetting ... database"
// progress line: an ellipsis when no version is given, otherwise the target
// version.
func toLogMessage(version string) string {
	if version == "" {
		return "..."
	}
	return " to version: " + version
}
88

89
func resetDatabase14(ctx context.Context, version string, fsys afero.Fs, options ...func(*pgx.ConnConfig)) error {
×
90
        if err := recreateDatabase(ctx, options...); err != nil {
×
91
                return err
×
92
        }
×
93
        if err := initDatabase(ctx, options...); err != nil {
×
94
                return err
×
95
        }
×
96
        if err := RestartDatabase(ctx, os.Stderr); err != nil {
×
97
                return err
×
98
        }
×
99
        conn, err := utils.ConnectLocalPostgres(ctx, pgconn.Config{}, options...)
×
100
        if err != nil {
×
101
                return err
×
102
        }
×
103
        defer conn.Close(context.Background())
×
104
        return apply.MigrateAndSeed(ctx, version, conn, fsys)
×
105
}
106

107
func resetDatabase15(ctx context.Context, version string, fsys afero.Fs, options ...func(*pgx.ConnConfig)) error {
2✔
108
        if err := utils.Docker.ContainerRemove(ctx, utils.DbId, container.RemoveOptions{Force: true}); err != nil {
3✔
109
                return errors.Errorf("failed to remove container: %w", err)
1✔
110
        }
1✔
111
        if err := utils.Docker.VolumeRemove(ctx, utils.DbId, true); err != nil {
1✔
112
                return errors.Errorf("failed to remove volume: %w", err)
×
113
        }
×
114
        config := start.NewContainerConfig()
1✔
115
        hostConfig := start.NewHostConfig()
1✔
116
        networkingConfig := network.NetworkingConfig{
1✔
117
                EndpointsConfig: map[string]*network.EndpointSettings{
1✔
118
                        utils.NetId: {
1✔
119
                                Aliases: utils.DbAliases,
1✔
120
                        },
1✔
121
                },
1✔
122
        }
1✔
123
        fmt.Fprintln(os.Stderr, "Recreating database...")
1✔
124
        if _, err := utils.DockerStart(ctx, config, hostConfig, networkingConfig, utils.DbId); err != nil {
1✔
125
                return err
×
126
        }
×
127
        if err := start.WaitForHealthyService(ctx, start.HealthTimeout, utils.DbId); err != nil {
1✔
128
                return err
×
129
        }
×
130
        if err := start.SetupLocalDatabase(ctx, version, fsys, os.Stderr, options...); err != nil {
1✔
131
                return err
×
132
        }
×
133
        fmt.Fprintln(os.Stderr, "Restarting containers...")
1✔
134
        return restartServices(ctx)
1✔
135
}
136

137
func initDatabase(ctx context.Context, options ...func(*pgx.ConnConfig)) error {
3✔
138
        conn, err := utils.ConnectLocalPostgres(ctx, pgconn.Config{User: "supabase_admin"}, options...)
3✔
139
        if err != nil {
4✔
140
                return err
1✔
141
        }
1✔
142
        defer conn.Close(context.Background())
2✔
143
        return start.InitSchema14(ctx, conn)
2✔
144
}
145

146
// Recreate postgres database by connecting to template1
147
func recreateDatabase(ctx context.Context, options ...func(*pgx.ConnConfig)) error {
5✔
148
        conn, err := utils.ConnectLocalPostgres(ctx, pgconn.Config{User: "supabase_admin", Database: "template1"}, options...)
5✔
149
        if err != nil {
6✔
150
                return err
1✔
151
        }
1✔
152
        defer conn.Close(context.Background())
4✔
153
        if err := DisconnectClients(ctx, conn); err != nil {
6✔
154
                return err
2✔
155
        }
2✔
156
        // We are not dropping roles here because they are cluster level entities. Use stop && start instead.
157
        sql := migration.MigrationFile{
2✔
158
                Statements: []string{
2✔
159
                        "DROP DATABASE IF EXISTS postgres WITH (FORCE)",
2✔
160
                        "CREATE DATABASE postgres WITH OWNER postgres",
2✔
161
                        "DROP DATABASE IF EXISTS _supabase WITH (FORCE)",
2✔
162
                        "CREATE DATABASE _supabase WITH OWNER postgres",
2✔
163
                },
2✔
164
        }
2✔
165
        return sql.ExecBatch(ctx, conn)
2✔
166
}
167

168
const (
	// TERMINATE_BACKENDS terminates every backend listed in pg_stat_activity
	// for the postgres and _supabase databases.
	TERMINATE_BACKENDS = `SELECT pg_terminate_backend(pid) FROM pg_stat_activity WHERE datname IN ('postgres', '_supabase')`
	// COUNT_REPLICATION_SLOTS counts the replication slots that belong to the
	// postgres and _supabase databases.
	COUNT_REPLICATION_SLOTS = `SELECT COUNT(*) FROM pg_replication_slots WHERE database IN ('postgres', '_supabase')`
)
172

173
func DisconnectClients(ctx context.Context, conn *pgx.Conn) error {
9✔
174
        // Must be executed separately because looping in transaction is unsupported
9✔
175
        // https://dba.stackexchange.com/a/11895
9✔
176
        disconn := migration.MigrationFile{
9✔
177
                Statements: []string{
9✔
178
                        "ALTER DATABASE postgres ALLOW_CONNECTIONS false",
9✔
179
                        "ALTER DATABASE _supabase ALLOW_CONNECTIONS false",
9✔
180
                        TERMINATE_BACKENDS,
9✔
181
                },
9✔
182
        }
9✔
183
        if err := disconn.ExecBatch(ctx, conn); err != nil {
13✔
184
                var pgErr *pgconn.PgError
4✔
185
                if errors.As(err, &pgErr) && pgErr.Code != pgerrcode.InvalidCatalogName {
6✔
186
                        return errors.Errorf("failed to disconnect clients: %w", err)
2✔
187
                }
2✔
188
        }
189
        // Wait for WAL senders to drop their replication slots
190
        policy := start.NewBackoffPolicy(ctx, 10*time.Second)
7✔
191
        waitForDrop := func() error {
14✔
192
                var count int
7✔
193
                if err := conn.QueryRow(ctx, COUNT_REPLICATION_SLOTS).Scan(&count); err != nil {
9✔
194
                        err = errors.Errorf("failed to count replication slots: %w", err)
2✔
195
                        return &backoff.PermanentError{Err: err}
2✔
196
                } else if count > 0 {
7✔
NEW
197
                        return errors.Errorf("replication slots still active: %d", count)
×
NEW
198
                }
×
199
                return nil
5✔
200
        }
201
        return backoff.Retry(waitForDrop, policy)
7✔
202
}
203

204
func RestartDatabase(ctx context.Context, w io.Writer) error {
7✔
205
        fmt.Fprintln(w, "Restarting containers...")
7✔
206
        // Some extensions must be manually restarted after pg_terminate_backend
7✔
207
        // Ref: https://github.com/citusdata/pg_cron/issues/99
7✔
208
        if err := utils.Docker.ContainerRestart(ctx, utils.DbId, container.StopOptions{}); err != nil {
11✔
209
                return errors.Errorf("failed to restart container: %w", err)
4✔
210
        }
4✔
211
        if err := start.WaitForHealthyService(ctx, start.HealthTimeout, utils.DbId); err != nil {
4✔
212
                return err
1✔
213
        }
1✔
214
        return restartServices(ctx)
2✔
215
}
216

217
func restartServices(ctx context.Context) error {
3✔
218
        // No need to restart PostgREST because it automatically reconnects and listens for schema changes
3✔
219
        services := listServicesToRestart()
3✔
220
        result := utils.WaitAll(services, func(id string) error {
15✔
221
                if err := utils.Docker.ContainerRestart(ctx, id, container.StopOptions{}); err != nil && !errdefs.IsNotFound(err) {
15✔
222
                        return errors.Errorf("failed to restart %s: %w", id, err)
3✔
223
                }
3✔
224
                return nil
9✔
225
        })
226
        // Do not wait for service healthy as those services may be excluded from starting
227
        return errors.Join(result...)
3✔
228
}
229

230
func listServicesToRestart() []string {
5✔
231
        return []string{utils.StorageId, utils.GotrueId, utils.RealtimeId, utils.PoolerId}
5✔
232
}
5✔
233

234
func resetRemote(ctx context.Context, version string, config pgconn.Config, fsys afero.Fs, options ...func(*pgx.ConnConfig)) error {
5✔
235
        fmt.Fprintln(os.Stderr, "Resetting remote database"+toLogMessage(version))
5✔
236
        conn, err := utils.ConnectByConfigStream(ctx, config, io.Discard, options...)
5✔
237
        if err != nil {
7✔
238
                return err
2✔
239
        }
2✔
240
        defer conn.Close(context.Background())
3✔
241
        if err := migration.DropUserSchemas(ctx, conn); err != nil {
4✔
242
                return err
1✔
243
        }
1✔
244
        return apply.MigrateAndSeed(ctx, version, conn, fsys)
2✔
245
}
246

247
// LikeEscapeSchema rewrites each schema pattern for use in a SQL LIKE
// expression: every underscore is prefixed with a backslash so it matches
// literally, and every asterisk becomes the LIKE wildcard %.
//
// The result has one entry per input, in input order. An empty or nil input
// returns nil, in case callers distinguish a nil slice from an empty one.
// No other characters (including % and \) are altered.
func LikeEscapeSchema(schemas []string) (result []string) {
	if len(schemas) == 0 {
		return nil
	}
	// Treat _ as literal, * as any character
	replacer := strings.NewReplacer("_", `\_`, "*", "%")
	// The output length is known up front, so allocate exactly once.
	result = make([]string, 0, len(schemas))
	for _, sch := range schemas {
		result = append(result, replacer.Replace(sch))
	}
	return result
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc