• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

supabase / cli / 19625585658

24 Nov 2025 06:38AM UTC coverage: 54.979% (-0.1%) from 55.108%
19625585658

Pull #4471

github

web-flow
Merge 7c3613a8c into 4a90b4fce
Pull Request #4471: fix(reset): ensure pgmq extension is fully rebuilt during remote resets

1 of 24 new or added lines in 1 file covered. (4.17%)

6 existing lines in 2 files now uncovered.

6531 of 11879 relevant lines covered (54.98%)

6.25 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

63.85
/internal/db/reset/reset.go
1
package reset
2

3
import (
4
        "context"
5
        _ "embed"
6
        "fmt"
7
        "io"
8
        "os"
9
        "strconv"
10
        "strings"
11
        "time"
12

13
        "github.com/cenkalti/backoff/v4"
14
        "github.com/containerd/errdefs"
15
        "github.com/docker/docker/api/types"
16
        "github.com/docker/docker/api/types/container"
17
        "github.com/docker/docker/api/types/network"
18
        "github.com/go-errors/errors"
19
        "github.com/jackc/pgconn"
20
        "github.com/jackc/pgerrcode"
21
        "github.com/jackc/pgx/v4"
22
        "github.com/spf13/afero"
23
        "github.com/supabase/cli/internal/db/start"
24
        "github.com/supabase/cli/internal/gen/keys"
25
        "github.com/supabase/cli/internal/migration/apply"
26
        "github.com/supabase/cli/internal/migration/down"
27
        "github.com/supabase/cli/internal/migration/list"
28
        "github.com/supabase/cli/internal/migration/repair"
29
        "github.com/supabase/cli/internal/seed/buckets"
30
        "github.com/supabase/cli/internal/utils"
31
        "github.com/supabase/cli/pkg/migration"
32
)
33

34
func Run(ctx context.Context, version string, last uint, config pgconn.Config, fsys afero.Fs, options ...func(*pgx.ConnConfig)) error {
5✔
35
        if len(version) > 0 {
5✔
36
                if _, err := strconv.Atoi(version); err != nil {
×
37
                        return errors.New(repair.ErrInvalidVersion)
×
38
                }
×
39
                if _, err := repair.GetMigrationFile(version, fsys); err != nil {
×
40
                        return err
×
41
                }
×
42
        } else if last > 0 {
5✔
43
                localMigrations, err := list.LoadLocalVersions(fsys)
×
44
                if err != nil {
×
45
                        return err
×
46
                }
×
47
                if total := uint(len(localMigrations)); last < total {
×
48
                        version = localMigrations[total-last-1]
×
49
                } else {
×
50
                        // Negative skips all migrations
×
51
                        version = "-"
×
52
                }
×
53
        }
54
        if !utils.IsLocalDatabase(config) {
7✔
55
                return resetRemote(ctx, version, config, fsys, options...)
2✔
56
        }
2✔
57
        // Config file is loaded before parsing --linked or --local flags
58
        if err := utils.AssertSupabaseDbIsRunning(); err != nil {
4✔
59
                return err
1✔
60
        }
1✔
61
        // Reset postgres database because extensions (pg_cron, pg_net) require postgres
62
        if err := resetDatabase(ctx, version, fsys, options...); err != nil {
3✔
63
                return err
1✔
64
        }
1✔
65
        // Seed objects from supabase/buckets directory
66
        if resp, err := utils.Docker.ContainerInspect(ctx, utils.StorageId); err == nil {
2✔
67
                if resp.State.Health == nil || resp.State.Health.Status != types.Healthy {
1✔
68
                        if err := start.WaitForHealthyService(ctx, 30*time.Second, utils.StorageId); err != nil {
×
69
                                return err
×
70
                        }
×
71
                }
72
                if err := buckets.Run(ctx, "", false, fsys); err != nil {
1✔
73
                        return err
×
74
                }
×
75
        }
76
        branch := keys.GetGitBranch(fsys)
1✔
77
        fmt.Fprintln(os.Stderr, "Finished "+utils.Aqua("supabase db reset")+" on branch "+utils.Aqua(branch)+".")
1✔
78
        return nil
1✔
79
}
80

81
func resetDatabase(ctx context.Context, version string, fsys afero.Fs, options ...func(*pgx.ConnConfig)) error {
2✔
82
        fmt.Fprintln(os.Stderr, "Resetting local database"+toLogMessage(version))
2✔
83
        if utils.Config.Db.MajorVersion <= 14 {
2✔
84
                return resetDatabase14(ctx, version, fsys, options...)
×
85
        }
×
86
        return resetDatabase15(ctx, version, fsys, options...)
2✔
87
}
88

89
func toLogMessage(version string) string {
3✔
90
        if len(version) > 0 {
3✔
91
                return " to version: " + version
×
92
        }
×
93
        return "..."
3✔
94
}
95

96
func resetDatabase14(ctx context.Context, version string, fsys afero.Fs, options ...func(*pgx.ConnConfig)) error {
×
97
        if err := recreateDatabase(ctx, options...); err != nil {
×
98
                return err
×
99
        }
×
100
        if err := initDatabase(ctx, options...); err != nil {
×
101
                return err
×
102
        }
×
103
        if err := RestartDatabase(ctx, os.Stderr); err != nil {
×
104
                return err
×
105
        }
×
106
        conn, err := utils.ConnectLocalPostgres(ctx, pgconn.Config{}, options...)
×
107
        if err != nil {
×
108
                return err
×
109
        }
×
110
        defer conn.Close(context.Background())
×
111
        return apply.MigrateAndSeed(ctx, version, conn, fsys)
×
112
}
113

114
func resetDatabase15(ctx context.Context, version string, fsys afero.Fs, options ...func(*pgx.ConnConfig)) error {
2✔
115
        if err := utils.Docker.ContainerRemove(ctx, utils.DbId, container.RemoveOptions{Force: true}); err != nil {
3✔
116
                return errors.Errorf("failed to remove container: %w", err)
1✔
117
        }
1✔
118
        if err := utils.Docker.VolumeRemove(ctx, utils.DbId, true); err != nil {
1✔
119
                return errors.Errorf("failed to remove volume: %w", err)
×
120
        }
×
121
        config := start.NewContainerConfig()
1✔
122
        hostConfig := start.NewHostConfig()
1✔
123
        networkingConfig := network.NetworkingConfig{
1✔
124
                EndpointsConfig: map[string]*network.EndpointSettings{
1✔
125
                        utils.NetId: {
1✔
126
                                Aliases: utils.DbAliases,
1✔
127
                        },
1✔
128
                },
1✔
129
        }
1✔
130
        fmt.Fprintln(os.Stderr, "Recreating database...")
1✔
131
        if _, err := utils.DockerStart(ctx, config, hostConfig, networkingConfig, utils.DbId); err != nil {
1✔
132
                return err
×
133
        }
×
134
        if err := start.WaitForHealthyService(ctx, start.HealthTimeout, utils.DbId); err != nil {
1✔
135
                return err
×
136
        }
×
137
        if err := start.SetupLocalDatabase(ctx, version, fsys, os.Stderr, options...); err != nil {
1✔
138
                return err
×
139
        }
×
140
        fmt.Fprintln(os.Stderr, "Restarting containers...")
1✔
141
        return restartServices(ctx)
1✔
142
}
143

144
func initDatabase(ctx context.Context, options ...func(*pgx.ConnConfig)) error {
3✔
145
        conn, err := utils.ConnectLocalPostgres(ctx, pgconn.Config{User: utils.SUPERUSER_ROLE}, options...)
3✔
146
        if err != nil {
4✔
147
                return err
1✔
148
        }
1✔
149
        defer conn.Close(context.Background())
2✔
150
        return start.InitSchema14(ctx, conn)
2✔
151
}
152

153
// Recreate postgres database by connecting to template1
154
func recreateDatabase(ctx context.Context, options ...func(*pgx.ConnConfig)) error {
5✔
155
        conn, err := utils.ConnectLocalPostgres(ctx, pgconn.Config{User: utils.SUPERUSER_ROLE, Database: "template1"}, options...)
5✔
156
        if err != nil {
6✔
157
                return err
1✔
158
        }
1✔
159
        defer conn.Close(context.Background())
4✔
160
        if err := DisconnectClients(ctx, conn); err != nil {
6✔
161
                return err
2✔
162
        }
2✔
163
        // We are not dropping roles here because they are cluster level entities. Use stop && start instead.
164
        sql := migration.MigrationFile{
2✔
165
                Statements: []string{
2✔
166
                        "DROP DATABASE IF EXISTS postgres WITH (FORCE)",
2✔
167
                        "CREATE DATABASE postgres WITH OWNER postgres",
2✔
168
                        "DROP DATABASE IF EXISTS _supabase WITH (FORCE)",
2✔
169
                        "CREATE DATABASE _supabase WITH OWNER postgres",
2✔
170
                },
2✔
171
        }
2✔
172
        return sql.ExecBatch(ctx, conn)
2✔
173
}
174

175
const (
176
        TERMINATE_BACKENDS      = "SELECT pg_terminate_backend(pid) FROM pg_stat_activity WHERE datname IN ('postgres', '_supabase')"
177
        COUNT_REPLICATION_SLOTS = "SELECT COUNT(*) FROM pg_replication_slots WHERE database IN ('postgres', '_supabase')"
178
)
179

180
func DisconnectClients(ctx context.Context, conn *pgx.Conn) error {
9✔
181
        // Must be executed separately because looping in transaction is unsupported
9✔
182
        // https://dba.stackexchange.com/a/11895
9✔
183
        disconn := migration.MigrationFile{
9✔
184
                Statements: []string{
9✔
185
                        "ALTER DATABASE postgres ALLOW_CONNECTIONS false",
9✔
186
                        "ALTER DATABASE _supabase ALLOW_CONNECTIONS false",
9✔
187
                        TERMINATE_BACKENDS,
9✔
188
                },
9✔
189
        }
9✔
190
        if err := disconn.ExecBatch(ctx, conn); err != nil {
13✔
191
                var pgErr *pgconn.PgError
4✔
192
                if errors.As(err, &pgErr) && pgErr.Code != pgerrcode.InvalidCatalogName {
6✔
193
                        return errors.Errorf("failed to disconnect clients: %w", err)
2✔
194
                }
2✔
195
        }
196
        // Wait for WAL senders to drop their replication slots
197
        policy := start.NewBackoffPolicy(ctx, 10*time.Second)
7✔
198
        waitForDrop := func() error {
14✔
199
                var count int
7✔
200
                if err := conn.QueryRow(ctx, COUNT_REPLICATION_SLOTS).Scan(&count); err != nil {
9✔
201
                        err = errors.Errorf("failed to count replication slots: %w", err)
2✔
202
                        return &backoff.PermanentError{Err: err}
2✔
203
                } else if count > 0 {
7✔
204
                        return errors.Errorf("replication slots still active: %d", count)
×
205
                }
×
206
                return nil
5✔
207
        }
208
        return backoff.Retry(waitForDrop, policy)
7✔
209
}
210

211
func RestartDatabase(ctx context.Context, w io.Writer) error {
7✔
212
        fmt.Fprintln(w, "Restarting containers...")
7✔
213
        // Some extensions must be manually restarted after pg_terminate_backend
7✔
214
        // Ref: https://github.com/citusdata/pg_cron/issues/99
7✔
215
        if err := utils.Docker.ContainerRestart(ctx, utils.DbId, container.StopOptions{}); err != nil {
11✔
216
                return errors.Errorf("failed to restart container: %w", err)
4✔
217
        }
4✔
218
        if err := start.WaitForHealthyService(ctx, start.HealthTimeout, utils.DbId); err != nil {
4✔
219
                return err
1✔
220
        }
1✔
221
        return restartServices(ctx)
2✔
222
}
223

224
func restartServices(ctx context.Context) error {
3✔
225
        // No need to restart PostgREST because it automatically reconnects and listens for schema changes
3✔
226
        services := listServicesToRestart()
3✔
227
        result := utils.WaitAll(services, func(id string) error {
15✔
228
                if err := utils.Docker.ContainerRestart(ctx, id, container.StopOptions{}); err != nil && !errdefs.IsNotFound(err) {
15✔
229
                        return errors.Errorf("failed to restart %s: %w", id, err)
3✔
230
                }
3✔
231
                return nil
9✔
232
        })
233
        // Do not wait for service healthy as those services may be excluded from starting
234
        return errors.Join(result...)
3✔
235
}
236

237
func listServicesToRestart() []string {
5✔
238
        return []string{utils.StorageId, utils.GotrueId, utils.RealtimeId, utils.PoolerId}
5✔
239
}
5✔
240

241
func resetRemote(ctx context.Context, version string, config pgconn.Config, fsys afero.Fs, options ...func(*pgx.ConnConfig)) error {
2✔
242
        msg := "Do you want to reset the remote database?"
2✔
243
        if shouldReset, err := utils.NewConsole().PromptYesNo(ctx, msg, false); err != nil {
2✔
244
                return err
×
245
        } else if !shouldReset {
3✔
246
                return errors.New(context.Canceled)
1✔
247
        }
1✔
248

249
        fmt.Fprintln(os.Stderr, "Resetting remote database"+toLogMessage(version))
1✔
250

1✔
251
        conn, err := utils.ConnectByConfigStream(ctx, config, io.Discard, options...)
1✔
252
        if err != nil {
2✔
253
                return err
1✔
254
        }
1✔
255
        defer conn.Close(context.Background())
×
NEW
256
        if err := resetPgmqExtension(ctx, conn); err != nil {
×
NEW
257
                return err
×
NEW
258
        }
×
UNCOV
259
        return down.ResetAll(ctx, version, conn, fsys)
×
260
}
261

NEW
262
func resetPgmqExtension(ctx context.Context, conn *pgx.Conn) error {
×
NEW
263
        var exists bool
×
NEW
264
        if err := conn.QueryRow(ctx, "SELECT EXISTS (SELECT 1 FROM pg_extension WHERE extname = 'pgmq')").Scan(&exists); err != nil {
×
NEW
265
                return errors.Errorf("failed to check pgmq extension: %w", err)
×
NEW
266
        }
×
NEW
267
        if !exists {
×
NEW
268
                return nil
×
NEW
269
        }
×
NEW
270
        if _, err := conn.Exec(ctx, "DROP EXTENSION IF EXISTS pgmq CASCADE"); err != nil {
×
NEW
271
                return errors.Errorf("failed to drop pgmq extension: %w", err)
×
NEW
272
        }
×
NEW
273
        policy := start.NewBackoffPolicy(ctx, 10*time.Second)
×
NEW
274
        createExtension := func() error {
×
NEW
275
                _, err := conn.Exec(ctx, "CREATE EXTENSION IF NOT EXISTS pgmq")
×
NEW
276
                return err
×
NEW
277
        }
×
NEW
278
        if err := backoff.Retry(createExtension, policy); err != nil {
×
NEW
279
                return errors.Errorf("failed to recreate pgmq extension: %w", err)
×
NEW
280
        }
×
NEW
281
        return nil
×
282
}
283

284
func LikeEscapeSchema(schemas []string) (result []string) {
13✔
285
        // Treat _ as literal, * as any character
13✔
286
        replacer := strings.NewReplacer("_", `\_`, "*", "%")
13✔
287
        for _, sch := range schemas {
377✔
288
                result = append(result, replacer.Replace(sch))
364✔
289
        }
364✔
290
        return result
13✔
291
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc