• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

golang-migrate / migrate / 19729155206

27 Nov 2025 07:58AM UTC coverage: 54.432% (+0.4%) from 54.037%
19729155206

Pull #1322

github

leonklingele
chore: bring back unused util.go file as removing it is a breaking change
Pull Request #1322: chore: remove dependency on "hashicorp/go-multierror"

1 of 68 new or added lines in 22 files covered. (1.47%)

19 existing lines in 2 files now uncovered.

4378 of 8043 relevant lines covered (54.43%)

48.59 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

70.31
/database/postgres/postgres.go
1
//go:build go1.9
2

3
package postgres
4

5
import (
6
        "context"
7
        "database/sql"
8
        "errors"
9
        "fmt"
10
        "io"
11
        nurl "net/url"
12
        "regexp"
13
        "strconv"
14
        "strings"
15
        "sync/atomic"
16
        "time"
17

18
        "github.com/golang-migrate/migrate/v4"
19
        "github.com/golang-migrate/migrate/v4/database"
20
        "github.com/golang-migrate/migrate/v4/database/multistmt"
21
        "github.com/lib/pq"
22
)
23

24
func init() {
2✔
25
        db := Postgres{}
2✔
26
        database.Register("postgres", &db)
2✔
27
        database.Register("postgresql", &db)
2✔
28
}
2✔
29

30
var (
31
        multiStmtDelimiter = []byte(";")
32

33
        DefaultMigrationsTable       = "schema_migrations"
34
        DefaultMultiStatementMaxSize = 10 * 1 << 20 // 10 MB
35
)
36

37
var (
38
        ErrNilConfig      = fmt.Errorf("no config")
39
        ErrNoDatabaseName = fmt.Errorf("no database name")
40
        ErrNoSchema       = fmt.Errorf("no schema")
41
        ErrDatabaseDirty  = fmt.Errorf("database is dirty")
42
)
43

44
type Config struct {
45
        MigrationsTable       string
46
        MigrationsTableQuoted bool
47
        MultiStatementEnabled bool
48
        DatabaseName          string
49
        SchemaName            string
50
        migrationsSchemaName  string
51
        migrationsTableName   string
52
        StatementTimeout      time.Duration
53
        MultiStatementMaxSize int
54
}
55

56
type Postgres struct {
57
        // Locking and unlocking need to use the same connection
58
        conn     *sql.Conn
59
        db       *sql.DB
60
        isLocked atomic.Bool
61

62
        // Open and WithInstance need to guarantee that config is never nil
63
        config *Config
64
}
65

66
func WithConnection(ctx context.Context, conn *sql.Conn, config *Config) (*Postgres, error) {
530✔
67
        if config == nil {
530✔
68
                return nil, ErrNilConfig
×
69
        }
×
70

71
        if err := conn.PingContext(ctx); err != nil {
530✔
72
                return nil, err
×
73
        }
×
74

75
        if config.DatabaseName == "" {
840✔
76
                query := `SELECT CURRENT_DATABASE()`
310✔
77
                var databaseName string
310✔
78
                if err := conn.QueryRowContext(ctx, query).Scan(&databaseName); err != nil {
310✔
79
                        return nil, &database.Error{OrigErr: err, Query: []byte(query)}
×
80
                }
×
81

82
                if len(databaseName) == 0 {
310✔
83
                        return nil, ErrNoDatabaseName
×
84
                }
×
85

86
                config.DatabaseName = databaseName
310✔
87
        }
88

89
        if config.SchemaName == "" {
1,060✔
90
                query := `SELECT CURRENT_SCHEMA()`
530✔
91
                var schemaName sql.NullString
530✔
92
                if err := conn.QueryRowContext(ctx, query).Scan(&schemaName); err != nil {
530✔
93
                        return nil, &database.Error{OrigErr: err, Query: []byte(query)}
×
94
                }
×
95

96
                if !schemaName.Valid {
530✔
97
                        return nil, ErrNoSchema
×
98
                }
×
99

100
                config.SchemaName = schemaName.String
530✔
101
        }
102

103
        if len(config.MigrationsTable) == 0 {
1,020✔
104
                config.MigrationsTable = DefaultMigrationsTable
490✔
105
        }
490✔
106

107
        config.migrationsSchemaName = config.SchemaName
530✔
108
        config.migrationsTableName = config.MigrationsTable
530✔
109
        if config.MigrationsTableQuoted {
560✔
110
                re := regexp.MustCompile(`"(.*?)"`)
30✔
111
                result := re.FindAllStringSubmatch(config.MigrationsTable, -1)
30✔
112
                config.migrationsTableName = result[len(result)-1][1]
30✔
113
                if len(result) == 2 {
50✔
114
                        config.migrationsSchemaName = result[0][1]
20✔
115
                } else if len(result) > 2 {
40✔
116
                        return nil, fmt.Errorf("\"%s\" MigrationsTable contains too many dot characters", config.MigrationsTable)
10✔
117
                }
10✔
118
        }
119

120
        px := &Postgres{
520✔
121
                conn:   conn,
520✔
122
                config: config,
520✔
123
        }
520✔
124

520✔
125
        if err := px.ensureVersionTable(); err != nil {
540✔
126
                return nil, err
20✔
127
        }
20✔
128

129
        return px, nil
500✔
130
}
131

132
func WithInstance(instance *sql.DB, config *Config) (database.Driver, error) {
520✔
133
        ctx := context.Background()
520✔
134

520✔
135
        if err := instance.Ping(); err != nil {
520✔
136
                return nil, err
×
137
        }
×
138

139
        conn, err := instance.Conn(ctx)
520✔
140
        if err != nil {
520✔
141
                return nil, err
×
142
        }
×
143

144
        px, err := WithConnection(ctx, conn, config)
520✔
145
        if err != nil {
550✔
146
                return nil, err
30✔
147
        }
30✔
148
        px.db = instance
490✔
149
        return px, nil
490✔
150
}
151

152
func (p *Postgres) Open(url string) (database.Driver, error) {
230✔
153
        purl, err := nurl.Parse(url)
230✔
154
        if err != nil {
230✔
155
                return nil, err
×
156
        }
×
157

158
        db, err := sql.Open("postgres", migrate.FilterCustomQuery(purl).String())
230✔
159
        if err != nil {
230✔
160
                return nil, err
×
161
        }
×
162

163
        migrationsTable := purl.Query().Get("x-migrations-table")
230✔
164
        migrationsTableQuoted := false
230✔
165
        if s := purl.Query().Get("x-migrations-table-quoted"); len(s) > 0 {
270✔
166
                migrationsTableQuoted, err = strconv.ParseBool(s)
40✔
167
                if err != nil {
40✔
168
                        return nil, fmt.Errorf("Unable to parse option x-migrations-table-quoted: %w", err)
×
169
                }
×
170
        }
171
        if (len(migrationsTable) > 0) && (migrationsTableQuoted) && ((migrationsTable[0] != '"') || (migrationsTable[len(migrationsTable)-1] != '"')) {
240✔
172
                return nil, fmt.Errorf("x-migrations-table must be quoted (for instance '\"migrate\".\"schema_migrations\"') when x-migrations-table-quoted is enabled, current value is: %s", migrationsTable)
10✔
173
        }
10✔
174

175
        statementTimeoutString := purl.Query().Get("x-statement-timeout")
220✔
176
        statementTimeout := 0
220✔
177
        if statementTimeoutString != "" {
220✔
178
                statementTimeout, err = strconv.Atoi(statementTimeoutString)
×
179
                if err != nil {
×
180
                        return nil, err
×
181
                }
×
182
        }
183

184
        multiStatementMaxSize := DefaultMultiStatementMaxSize
220✔
185
        if s := purl.Query().Get("x-multi-statement-max-size"); len(s) > 0 {
220✔
186
                multiStatementMaxSize, err = strconv.Atoi(s)
×
187
                if err != nil {
×
188
                        return nil, err
×
189
                }
×
190
                if multiStatementMaxSize <= 0 {
×
191
                        multiStatementMaxSize = DefaultMultiStatementMaxSize
×
192
                }
×
193
        }
194

195
        multiStatementEnabled := false
220✔
196
        if s := purl.Query().Get("x-multi-statement"); len(s) > 0 {
230✔
197
                multiStatementEnabled, err = strconv.ParseBool(s)
10✔
198
                if err != nil {
10✔
199
                        return nil, fmt.Errorf("Unable to parse option x-multi-statement: %w", err)
×
200
                }
×
201
        }
202

203
        px, err := WithInstance(db, &Config{
220✔
204
                DatabaseName:          purl.Path,
220✔
205
                MigrationsTable:       migrationsTable,
220✔
206
                MigrationsTableQuoted: migrationsTableQuoted,
220✔
207
                StatementTimeout:      time.Duration(statementTimeout) * time.Millisecond,
220✔
208
                MultiStatementEnabled: multiStatementEnabled,
220✔
209
                MultiStatementMaxSize: multiStatementMaxSize,
220✔
210
        })
220✔
211

220✔
212
        if err != nil {
250✔
213
                return nil, err
30✔
214
        }
30✔
215

216
        return px, nil
190✔
217
}
218

219
func (p *Postgres) Close() error {
170✔
220
        connErr := p.conn.Close()
170✔
221
        var dbErr error
170✔
222
        if p.db != nil {
330✔
223
                dbErr = p.db.Close()
160✔
224
        }
160✔
225

226
        if connErr != nil || dbErr != nil {
170✔
227
                return fmt.Errorf("conn: %v, db: %v", connErr, dbErr)
×
228
        }
×
229
        return nil
170✔
230
}
231

232
// https://www.postgresql.org/docs/9.6/static/explicit-locking.html#ADVISORY-LOCKS
233
func (p *Postgres) Lock() error {
670✔
234
        return database.CasRestoreOnErr(&p.isLocked, false, true, database.ErrLocked, func() error {
1,310✔
235
                aid, err := database.GenerateAdvisoryLockId(p.config.DatabaseName, p.config.migrationsSchemaName, p.config.migrationsTableName)
640✔
236
                if err != nil {
640✔
237
                        return err
×
238
                }
×
239

240
                // This will wait indefinitely until the lock can be acquired.
241
                query := `SELECT pg_advisory_lock($1)`
640✔
242
                if _, err := p.conn.ExecContext(context.Background(), query, aid); err != nil {
640✔
243
                        return &database.Error{OrigErr: err, Err: "try lock failed", Query: []byte(query)}
×
244
                }
×
245

246
                return nil
640✔
247
        })
248
}
249

250
func (p *Postgres) Unlock() error {
640✔
251
        return database.CasRestoreOnErr(&p.isLocked, true, false, database.ErrNotLocked, func() error {
1,280✔
252
                aid, err := database.GenerateAdvisoryLockId(p.config.DatabaseName, p.config.migrationsSchemaName, p.config.migrationsTableName)
640✔
253
                if err != nil {
640✔
254
                        return err
×
255
                }
×
256

257
                query := `SELECT pg_advisory_unlock($1)`
640✔
258
                if _, err := p.conn.ExecContext(context.Background(), query, aid); err != nil {
640✔
259
                        return &database.Error{OrigErr: err, Query: []byte(query)}
×
260
                }
×
261
                return nil
640✔
262
        })
263
}
264

265
func (p *Postgres) Run(migration io.Reader) error {
300✔
266
        if p.config.MultiStatementEnabled {
310✔
267
                var err error
10✔
268
                if e := multistmt.Parse(migration, multiStmtDelimiter, p.config.MultiStatementMaxSize, func(m []byte) bool {
30✔
269
                        if err = p.runStatement(m); err != nil {
20✔
270
                                return false
×
271
                        }
×
272
                        return true
20✔
273
                }); e != nil {
×
274
                        return e
×
275
                }
×
276
                return err
10✔
277
        }
278
        migr, err := io.ReadAll(migration)
290✔
279
        if err != nil {
290✔
280
                return err
×
281
        }
×
282
        return p.runStatement(migr)
290✔
283
}
284

285
func (p *Postgres) runStatement(statement []byte) error {
310✔
286
        ctx := context.Background()
310✔
287
        if p.config.StatementTimeout != 0 {
310✔
288
                var cancel context.CancelFunc
×
289
                ctx, cancel = context.WithTimeout(ctx, p.config.StatementTimeout)
×
290
                defer cancel()
×
291
        }
×
292
        query := string(statement)
310✔
293
        if strings.TrimSpace(query) == "" {
310✔
294
                return nil
×
295
        }
×
296
        if _, err := p.conn.ExecContext(ctx, query); err != nil {
320✔
297
                if pgErr, ok := err.(*pq.Error); ok {
20✔
298
                        var line uint
10✔
299
                        var col uint
10✔
300
                        var lineColOK bool
10✔
301
                        if pgErr.Position != "" {
20✔
302
                                if pos, err := strconv.ParseUint(pgErr.Position, 10, 64); err == nil {
20✔
303
                                        line, col, lineColOK = computeLineFromPos(query, int(pos))
10✔
304
                                }
10✔
305
                        }
306
                        message := fmt.Sprintf("migration failed: %s", pgErr.Message)
10✔
307
                        if lineColOK {
20✔
308
                                message = fmt.Sprintf("%s (column %d)", message, col)
10✔
309
                        }
10✔
310
                        if pgErr.Detail != "" {
10✔
311
                                message = fmt.Sprintf("%s, %s", message, pgErr.Detail)
×
312
                        }
×
313
                        return database.Error{OrigErr: err, Err: message, Query: statement, Line: line}
10✔
314
                }
315
                return database.Error{OrigErr: err, Err: "migration failed", Query: statement}
×
316
        }
317
        return nil
300✔
318
}
319

320
func computeLineFromPos(s string, pos int) (line uint, col uint, ok bool) {
74✔
321
        // replace crlf with lf
74✔
322
        s = strings.Replace(s, "\r\n", "\n", -1)
74✔
323
        // pg docs: pos uses index 1 for the first character, and positions are measured in characters not bytes
74✔
324
        runes := []rune(s)
74✔
325
        if pos > len(runes) {
82✔
326
                return 0, 0, false
8✔
327
        }
8✔
328
        sel := runes[:pos]
66✔
329
        line = uint(runesCount(sel, newLine) + 1)
66✔
330
        col = uint(pos - 1 - runesLastIndex(sel, newLine))
66✔
331
        return line, col, true
66✔
332
}
333

334
const newLine = '\n'
335

336
func runesCount(input []rune, target rune) int {
66✔
337
        var count int
66✔
338
        for _, r := range input {
1,404✔
339
                if r == target {
1,442✔
340
                        count++
104✔
341
                }
104✔
342
        }
343
        return count
66✔
344
}
345

346
func runesLastIndex(input []rune, target rune) int {
66✔
347
        for i := len(input) - 1; i >= 0; i-- {
780✔
348
                if input[i] == target {
770✔
349
                        return i
56✔
350
                }
56✔
351
        }
352
        return -1
10✔
353
}
354

355
func (p *Postgres) SetVersion(version int, dirty bool) error {
380✔
356
        tx, err := p.conn.BeginTx(context.Background(), &sql.TxOptions{})
380✔
357
        if err != nil {
380✔
358
                return &database.Error{OrigErr: err, Err: "transaction start failed"}
×
359
        }
×
360

361
        query := `TRUNCATE ` + pq.QuoteIdentifier(p.config.migrationsSchemaName) + `.` + pq.QuoteIdentifier(p.config.migrationsTableName)
380✔
362
        if _, err := tx.Exec(query); err != nil {
380✔
363
                if errRollback := tx.Rollback(); errRollback != nil {
×
NEW
364
                        err = errors.Join(err, errRollback)
×
365
                }
×
366
                return &database.Error{OrigErr: err, Query: []byte(query)}
×
367
        }
368

369
        // Also re-write the schema version for nil dirty versions to prevent
370
        // empty schema version for failed down migration on the first migration
371
        // See: https://github.com/golang-migrate/migrate/issues/330
372
        if version >= 0 || (version == database.NilVersion && dirty) {
730✔
373
                query = `INSERT INTO ` + pq.QuoteIdentifier(p.config.migrationsSchemaName) + `.` + pq.QuoteIdentifier(p.config.migrationsTableName) + ` (version, dirty) VALUES ($1, $2)`
350✔
374
                if _, err := tx.Exec(query, version, dirty); err != nil {
350✔
375
                        if errRollback := tx.Rollback(); errRollback != nil {
×
NEW
376
                                err = errors.Join(err, errRollback)
×
377
                        }
×
378
                        return &database.Error{OrigErr: err, Query: []byte(query)}
×
379
                }
380
        }
381

382
        if err := tx.Commit(); err != nil {
380✔
383
                return &database.Error{OrigErr: err, Err: "transaction commit failed"}
×
384
        }
×
385

386
        return nil
380✔
387
}
388

389
func (p *Postgres) Version() (version int, dirty bool, err error) {
260✔
390
        query := `SELECT version, dirty FROM ` + pq.QuoteIdentifier(p.config.migrationsSchemaName) + `.` + pq.QuoteIdentifier(p.config.migrationsTableName) + ` LIMIT 1`
260✔
391
        err = p.conn.QueryRowContext(context.Background(), query).Scan(&version, &dirty)
260✔
392
        switch {
260✔
393
        case err == sql.ErrNoRows:
90✔
394
                return database.NilVersion, false, nil
90✔
395

396
        case err != nil:
×
397
                if e, ok := err.(*pq.Error); ok {
×
398
                        if e.Code.Name() == "undefined_table" {
×
399
                                return database.NilVersion, false, nil
×
400
                        }
×
401
                }
402
                return 0, false, &database.Error{OrigErr: err, Query: []byte(query)}
×
403

404
        default:
170✔
405
                return version, dirty, nil
170✔
406
        }
407
}
408

409
func (p *Postgres) Drop() (err error) {
40✔
410
        // select all tables in current schema
40✔
411
        query := `SELECT table_name FROM information_schema.tables WHERE table_schema=(SELECT current_schema()) AND table_type='BASE TABLE'`
40✔
412
        tables, err := p.conn.QueryContext(context.Background(), query)
40✔
413
        if err != nil {
40✔
414
                return &database.Error{OrigErr: err, Query: []byte(query)}
×
415
        }
×
416
        defer func() {
80✔
417
                if errClose := tables.Close(); errClose != nil {
40✔
NEW
418
                        err = errors.Join(err, errClose)
×
419
                }
×
420
        }()
421

422
        // delete one table after another
423
        tableNames := make([]string, 0)
40✔
424
        for tables.Next() {
110✔
425
                var tableName string
70✔
426
                if err := tables.Scan(&tableName); err != nil {
70✔
427
                        return err
×
428
                }
×
429
                if len(tableName) > 0 {
140✔
430
                        tableNames = append(tableNames, tableName)
70✔
431
                }
70✔
432
        }
433
        if err := tables.Err(); err != nil {
40✔
434
                return &database.Error{OrigErr: err, Query: []byte(query)}
×
435
        }
×
436

437
        if len(tableNames) > 0 {
80✔
438
                // delete one by one ...
40✔
439
                for _, t := range tableNames {
110✔
440
                        query = `DROP TABLE IF EXISTS ` + pq.QuoteIdentifier(t) + ` CASCADE`
70✔
441
                        if _, err := p.conn.ExecContext(context.Background(), query); err != nil {
70✔
442
                                return &database.Error{OrigErr: err, Query: []byte(query)}
×
443
                        }
×
444
                }
445
        }
446

447
        return nil
40✔
448
}
449

450
// ensureVersionTable checks if versions table exists and, if not, creates it.
451
// Note that this function locks the database, which deviates from the usual
452
// convention of "caller locks" in the Postgres type.
453
func (p *Postgres) ensureVersionTable() (err error) {
520✔
454
        if err = p.Lock(); err != nil {
520✔
455
                return err
×
456
        }
×
457

458
        defer func() {
1,040✔
459
                if e := p.Unlock(); e != nil {
520✔
NEW
460
                        err = errors.Join(err, e)
×
461
                }
×
462
        }()
463

464
        // This block checks whether the `MigrationsTable` already exists. This is useful because it allows read only postgres
465
        // users to also check the current version of the schema. Previously, even if `MigrationsTable` existed, the
466
        // `CREATE TABLE IF NOT EXISTS...` query would fail because the user does not have the CREATE permission.
467
        // Taken from https://github.com/mattes/migrate/blob/master/database/postgres/postgres.go#L258
468
        query := `SELECT COUNT(1) FROM information_schema.tables WHERE table_schema = $1 AND table_name = $2 LIMIT 1`
520✔
469
        row := p.conn.QueryRowContext(context.Background(), query, p.config.migrationsSchemaName, p.config.migrationsTableName)
520✔
470

520✔
471
        var count int
520✔
472
        err = row.Scan(&count)
520✔
473
        if err != nil {
520✔
474
                return &database.Error{OrigErr: err, Query: []byte(query)}
×
475
        }
×
476

477
        if count == 1 {
820✔
478
                return nil
300✔
479
        }
300✔
480

481
        query = `CREATE TABLE IF NOT EXISTS ` + pq.QuoteIdentifier(p.config.migrationsSchemaName) + `.` + pq.QuoteIdentifier(p.config.migrationsTableName) + ` (version bigint not null primary key, dirty boolean not null)`
220✔
482
        if _, err = p.conn.ExecContext(context.Background(), query); err != nil {
240✔
483
                return &database.Error{OrigErr: err, Query: []byte(query)}
20✔
484
        }
20✔
485

486
        return nil
200✔
487
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc