• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

golang-migrate / migrate / 19823253020

01 Dec 2025 12:49PM UTC coverage: 54.627% (+0.2%) from 54.432%
19823253020

Pull #1340

github

jferrl
feat(spanner): add DML statement support in migration files
Pull Request #1340: feat(spanner): add DML statement support in migration files

66 of 83 new or added lines in 1 file covered. (79.52%)

2 existing lines in 1 file now uncovered.

4427 of 8104 relevant lines covered (54.63%)

48.39 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

71.74
/database/spanner/spanner.go
1
package spanner
2

3
import (
4
        "context"
5
        "errors"
6
        "fmt"
7
        "io"
8
        "log"
9
        nurl "net/url"
10
        "regexp"
11
        "strconv"
12
        "strings"
13
        "sync/atomic"
14

15
        "cloud.google.com/go/spanner"
16
        sdb "cloud.google.com/go/spanner/admin/database/apiv1"
17
        "cloud.google.com/go/spanner/spansql"
18

19
        "github.com/golang-migrate/migrate/v4"
20
        "github.com/golang-migrate/migrate/v4/database"
21

22
        adminpb "cloud.google.com/go/spanner/admin/database/apiv1/databasepb"
23
        "google.golang.org/api/iterator"
24
)
25

26
func init() {
2✔
27
        db := Spanner{}
2✔
28
        database.Register("spanner", &db)
2✔
29
}
2✔
30

31
// DefaultMigrationsTable is used if no custom table is specified
32
const DefaultMigrationsTable = "SchemaMigrations"
33

34
type statementKind int
35

36
const (
37
        statementKindDDL statementKind = iota + 1
38
        statementKindDML
39
        statementKindPartitionedDML
40
)
41

42
// ErrNilConfig is returned when no configuration is provided.
43
var ErrNilConfig = errors.New("no config")
44

45
// ErrNoDatabaseName is returned when the database name is not specified in the configuration.
46
var ErrNoDatabaseName = errors.New("no database name")
47

48
// ErrNoSchema is returned when no schema is available.
49
var ErrNoSchema = errors.New("no schema")
50

51
// ErrDatabaseDirty is returned when the database has a dirty migration state.
52
var ErrDatabaseDirty = errors.New("database is dirty")
53

54
// ErrLockHeld is returned when attempting to acquire a lock that is already held.
55
var ErrLockHeld = errors.New("unable to obtain lock")
56

57
// ErrLockNotHeld is returned when attempting to release a lock that is not held.
58
var ErrLockNotHeld = errors.New("unable to release already released lock")
59

60
// ErrMixedStatements is returned when a migration file contains a mix of DDL,
61
// DML (INSERT), and partitioned DML (UPDATE or DELETE) statements.
62
var ErrMixedStatements = errors.New("DDL, DML (INSERT), and partitioned DML (UPDATE or DELETE) must not be combined in the same migration file")
63

64
// ErrEmptyMigration is returned when a migration file is empty or contains only whitespace.
65
var ErrEmptyMigration = errors.New("empty migration")
66

67
// ErrInvalidDMLStatementKind is returned when an unrecognized DML statement type is encountered.
68
var ErrInvalidDMLStatementKind = errors.New("invalid DML statement kind")
69

70
// Config holds the configuration for a Spanner database driver instance.
71
type Config struct {
72
        // MigrationsTable is the name of the table used to track migration versions.
73
        // If empty, DefaultMigrationsTable is used.
74
        MigrationsTable string
75

76
        // DatabaseName is the fully qualified Spanner database name
77
        // (e.g., "projects/{project}/instances/{instance}/databases/{database}").
78
        DatabaseName string
79

80
        // Deprecated: CleanStatements is no longer needed. Migration statements are
81
        // now automatically parsed to detect their type (DDL, DML, PartitionedDML)
82
        // and comments are stripped during parsing. This field is ignored.
83
        CleanStatements bool
84
}
85

86
// Spanner implements database.Driver for Google Cloud Spanner.
87
// It supports DDL statements (CREATE, ALTER, DROP), DML statements (INSERT),
88
// and partitioned DML statements (UPDATE, DELETE) in migration files.
89
type Spanner struct {
90
        db *DB
91

92
        config *Config
93

94
        lock atomic.Bool
95
}
96

97
// DB holds the Spanner client connections for both administrative operations
98
// (schema changes) and data operations (queries and mutations).
99
type DB struct {
100
        admin *sdb.DatabaseAdminClient
101
        data  *spanner.Client
102
}
103

104
// NewDB creates a new DB instance with the provided admin and data clients.
105
// The admin client is used for DDL operations, while the data client is used
106
// for DML operations and queries.
107
func NewDB(admin sdb.DatabaseAdminClient, data spanner.Client) *DB {
×
108
        return &DB{
×
109
                admin: &admin,
×
110
                data:  &data,
×
111
        }
×
112
}
×
113

114
// WithInstance creates a new Spanner driver using an existing DB instance.
115
// It validates the configuration and ensures the migrations version table exists.
116
func WithInstance(instance *DB, config *Config) (database.Driver, error) {
4✔
117
        if config == nil {
4✔
118
                return nil, ErrNilConfig
×
119
        }
×
120

121
        if len(config.DatabaseName) == 0 {
4✔
122
                return nil, ErrNoDatabaseName
×
123
        }
×
124

125
        if len(config.MigrationsTable) == 0 {
8✔
126
                config.MigrationsTable = DefaultMigrationsTable
4✔
127
        }
4✔
128

129
        sx := &Spanner{
4✔
130
                db:     instance,
4✔
131
                config: config,
4✔
132
        }
4✔
133

4✔
134
        if err := sx.ensureVersionTable(); err != nil {
4✔
135
                return nil, err
×
136
        }
×
137

138
        return sx, nil
4✔
139
}
140

141
// Open parses the connection URL and creates a new Spanner driver instance.
142
// The URL format is: spanner://projects/{project}/instances/{instance}/databases/{database}
143
//
144
// Supported query parameters:
145
//   - x-migrations-table: Custom name for the migrations tracking table
146
//   - x-clean-statements: Deprecated, this parameter is ignored. Statements are now always parsed.
147
func (s *Spanner) Open(url string) (database.Driver, error) {
4✔
148
        purl, err := nurl.Parse(url)
4✔
149
        if err != nil {
4✔
150
                return nil, err
×
151
        }
×
152

153
        ctx := context.Background()
4✔
154

4✔
155
        adminClient, err := sdb.NewDatabaseAdminClient(ctx)
4✔
156
        if err != nil {
4✔
157
                return nil, err
×
158
        }
×
159
        dbname := strings.Replace(migrate.FilterCustomQuery(purl).String(), "spanner://", "", 1)
4✔
160
        dataClient, err := spanner.NewClient(ctx, dbname)
4✔
161
        if err != nil {
4✔
162
                log.Fatal(err)
×
163
        }
×
164

165
        migrationsTable := purl.Query().Get("x-migrations-table")
4✔
166

4✔
167
        cleanQuery := purl.Query().Get("x-clean-statements")
4✔
168
        clean := false
4✔
169
        if cleanQuery != "" {
4✔
170
                clean, err = strconv.ParseBool(cleanQuery)
×
171
                if err != nil {
×
172
                        return nil, err
×
173
                }
×
174
        }
175

176
        db := &DB{admin: adminClient, data: dataClient}
4✔
177
        return WithInstance(db, &Config{
4✔
178
                DatabaseName:    dbname,
4✔
179
                MigrationsTable: migrationsTable,
4✔
180
                CleanStatements: clean,
4✔
181
        })
4✔
182
}
183

184
// Close releases all resources held by the Spanner driver, including
185
// both the admin and data client connections.
186
func (s *Spanner) Close() error {
×
187
        s.db.data.Close()
×
188
        return s.db.admin.Close()
×
189
}
×
190

191
// Lock acquires an in-memory lock to prevent concurrent migrations.
192
// Note: This is a local lock only; Spanner DDL operations are inherently
193
// serialized by the service through UpdateDatabaseDdl request queuing.
194
func (s *Spanner) Lock() error {
14✔
195
        if swapped := s.lock.CompareAndSwap(false, true); swapped {
26✔
196
                return nil
12✔
197
        }
12✔
198
        return ErrLockHeld
2✔
199
}
200

201
// Unlock releases the in-memory lock acquired by Lock.
202
func (s *Spanner) Unlock() error {
12✔
203
        if swapped := s.lock.CompareAndSwap(true, false); swapped {
24✔
204
                return nil
12✔
205
        }
12✔
206
        return ErrLockNotHeld
×
207
}
208

209
// Run executes the migration statements read from the provided reader.
210
// It automatically detects the statement type and routes execution accordingly:
211
//   - DDL statements (CREATE, ALTER, DROP): Executed via UpdateDatabaseDdl
212
//   - DML statements (INSERT): Executed within a read-write transaction
213
//   - Partitioned DML (UPDATE, DELETE): Executed via PartitionedUpdate
214
//
215
// Migration files must not mix different statement types.
216
func (s *Spanner) Run(migration io.Reader) error {
18✔
217
        migr, err := io.ReadAll(migration)
18✔
218
        if err != nil {
18✔
219
                return err
×
220
        }
×
221

222
        ctx := context.Background()
18✔
223

18✔
224
        stmts, kind, err := parseStatements(migr)
18✔
225
        if err != nil {
18✔
NEW
226
                return &database.Error{OrigErr: err, Err: "failed to parse migration", Query: migr}
×
UNCOV
227
        }
×
228

229
        switch kind {
18✔
230
        case statementKindDDL:
14✔
231
                return s.runDDL(ctx, stmts, migr)
14✔
232
        case statementKindDML:
2✔
233
                return s.runDML(ctx, stmts, migr)
2✔
234
        case statementKindPartitionedDML:
2✔
235
                return s.runPartitionedDML(ctx, stmts, migr)
2✔
NEW
236
        default:
×
NEW
237
                return &database.Error{OrigErr: ErrInvalidDMLStatementKind, Err: "unknown statement kind", Query: migr}
×
238
        }
239
}
240

241
func (s *Spanner) runDDL(ctx context.Context, stmts []string, migr []byte) error {
14✔
242
        op, err := s.db.admin.UpdateDatabaseDdl(ctx, &adminpb.UpdateDatabaseDdlRequest{
14✔
243
                Database:   s.config.DatabaseName,
14✔
244
                Statements: stmts,
14✔
245
        })
14✔
246
        if err != nil {
14✔
UNCOV
247
                return &database.Error{OrigErr: err, Err: "migration failed", Query: migr}
×
248
        }
×
249

250
        if err := op.Wait(ctx); err != nil {
14✔
251
                return &database.Error{OrigErr: err, Err: "migration failed", Query: migr}
×
252
        }
×
253

254
        return nil
14✔
255
}
256

257
func (s *Spanner) runDML(ctx context.Context, stmts []string, migr []byte) error {
2✔
258
        _, err := s.db.data.ReadWriteTransaction(ctx,
2✔
259
                func(ctx context.Context, txn *spanner.ReadWriteTransaction) error {
4✔
260
                        for _, stmt := range stmts {
6✔
261
                                if _, err := txn.Update(ctx, spanner.Statement{SQL: stmt}); err != nil {
4✔
NEW
262
                                        return err
×
NEW
263
                                }
×
264
                        }
265
                        return nil
2✔
266
                })
267
        if err != nil {
2✔
NEW
268
                return &database.Error{OrigErr: err, Err: "migration failed", Query: migr}
×
NEW
269
        }
×
270
        return nil
2✔
271
}
272

273
func (s *Spanner) runPartitionedDML(ctx context.Context, stmts []string, migr []byte) error {
2✔
274
        for _, stmt := range stmts {
4✔
275
                _, err := s.db.data.PartitionedUpdate(ctx, spanner.Statement{SQL: stmt})
2✔
276
                if err != nil {
2✔
NEW
277
                        return &database.Error{OrigErr: err, Err: "migration failed", Query: migr}
×
NEW
278
                }
×
279
        }
280
        return nil
2✔
281
}
282

283
// SetVersion updates the migration version in the migrations tracking table.
284
// It atomically deletes all existing records and inserts the new version.
285
func (s *Spanner) SetVersion(version int, dirty bool) error {
44✔
286
        ctx := context.Background()
44✔
287

44✔
288
        _, err := s.db.data.ReadWriteTransaction(ctx,
44✔
289
                func(_ context.Context, txn *spanner.ReadWriteTransaction) error {
88✔
290
                        m := []*spanner.Mutation{
44✔
291
                                spanner.Delete(s.config.MigrationsTable, spanner.AllKeys()),
44✔
292
                                spanner.Insert(s.config.MigrationsTable,
44✔
293
                                        []string{"Version", "Dirty"},
44✔
294
                                        []any{version, dirty},
44✔
295
                                )}
44✔
296
                        return txn.BufferWrite(m)
44✔
297
                })
44✔
298
        if err != nil {
44✔
299
                return &database.Error{OrigErr: err}
×
300
        }
×
301

302
        return nil
44✔
303
}
304

305
// Version returns the current migration version and dirty state.
306
// If no version has been set, it returns database.NilVersion.
307
func (s *Spanner) Version() (version int, dirty bool, err error) {
16✔
308
        ctx := context.Background()
16✔
309

16✔
310
        stmt := spanner.Statement{
16✔
311
                SQL: `SELECT Version, Dirty FROM ` + s.config.MigrationsTable + ` LIMIT 1`,
16✔
312
        }
16✔
313
        iter := s.db.data.Single().Query(ctx, stmt)
16✔
314
        defer iter.Stop()
16✔
315

16✔
316
        row, err := iter.Next()
16✔
317
        switch err {
16✔
318
        case iterator.Done:
4✔
319
                return database.NilVersion, false, nil
4✔
320
        case nil:
12✔
321
                var v int64
12✔
322
                if err = row.Columns(&v, &dirty); err != nil {
12✔
323
                        return 0, false, &database.Error{OrigErr: err, Query: []byte(stmt.SQL)}
×
324
                }
×
325
                version = int(v)
12✔
326
        default:
×
327
                return 0, false, &database.Error{OrigErr: err, Query: []byte(stmt.SQL)}
×
328
        }
329

330
        return version, dirty, nil
12✔
331
}
332

333
var nameMatcher = regexp.MustCompile(`(CREATE TABLE\s(\S+)\s)|(CREATE.+INDEX\s(\S+)\s)`)
334

335
// Drop removes all tables and indexes from the database by retrieving the
336
// current schema and generating DROP statements in reverse order.
337
// This reverse order ensures that dependent objects (like interleaved tables
338
// and indexes) are dropped before their parent tables.
339
func (s *Spanner) Drop() error {
4✔
340
        ctx := context.Background()
4✔
341
        res, err := s.db.admin.GetDatabaseDdl(ctx, &adminpb.GetDatabaseDdlRequest{
4✔
342
                Database: s.config.DatabaseName,
4✔
343
        })
4✔
344
        if err != nil {
4✔
345
                return &database.Error{OrigErr: err, Err: "drop failed"}
×
346
        }
×
347
        if len(res.Statements) == 0 {
4✔
348
                return nil
×
349
        }
×
350

351
        stmts := make([]string, 0)
4✔
352
        for i := len(res.Statements) - 1; i >= 0; i-- {
16✔
353
                s := res.Statements[i]
12✔
354
                m := nameMatcher.FindSubmatch([]byte(s))
12✔
355

12✔
356
                if len(m) == 0 {
12✔
357
                        continue
×
358
                } else if tbl := m[2]; len(tbl) > 0 {
24✔
359
                        stmts = append(stmts, fmt.Sprintf(`DROP TABLE %s`, tbl))
12✔
360
                } else if idx := m[4]; len(idx) > 0 {
12✔
361
                        stmts = append(stmts, fmt.Sprintf(`DROP INDEX %s`, idx))
×
362
                }
×
363
        }
364

365
        op, err := s.db.admin.UpdateDatabaseDdl(ctx, &adminpb.UpdateDatabaseDdlRequest{
4✔
366
                Database:   s.config.DatabaseName,
4✔
367
                Statements: stmts,
4✔
368
        })
4✔
369
        if err != nil {
4✔
370
                return &database.Error{OrigErr: err, Query: []byte(strings.Join(stmts, "; "))}
×
371
        }
×
372
        if err := op.Wait(ctx); err != nil {
4✔
373
                return &database.Error{OrigErr: err, Query: []byte(strings.Join(stmts, "; "))}
×
374
        }
×
375

376
        return nil
4✔
377
}
378

379
// ensureVersionTable checks if versions table exists and, if not, creates it.
380
// Note that this function locks the database, which deviates from the usual
381
// convention of "caller locks" in the Spanner type.
382
func (s *Spanner) ensureVersionTable() (err error) {
4✔
383
        if err = s.Lock(); err != nil {
4✔
384
                return err
×
385
        }
×
386

387
        defer func() {
8✔
388
                if e := s.Unlock(); e != nil {
4✔
389
                        err = errors.Join(err, e)
×
390
                }
×
391
        }()
392

393
        ctx := context.Background()
4✔
394
        tbl := s.config.MigrationsTable
4✔
395
        iter := s.db.data.Single().Read(ctx, tbl, spanner.AllKeys(), []string{"Version"})
4✔
396
        if err := iter.Do(func(_ *spanner.Row) error { return nil }); err == nil {
4✔
397
                return nil
×
398
        }
×
399

400
        stmt := fmt.Sprintf(`CREATE TABLE %s (
4✔
401
    Version INT64 NOT NULL,
4✔
402
    Dirty    BOOL NOT NULL
4✔
403
        ) PRIMARY KEY(Version)`, tbl)
4✔
404

4✔
405
        op, err := s.db.admin.UpdateDatabaseDdl(ctx, &adminpb.UpdateDatabaseDdlRequest{
4✔
406
                Database:   s.config.DatabaseName,
4✔
407
                Statements: []string{stmt},
4✔
408
        })
4✔
409

4✔
410
        if err != nil {
4✔
411
                return &database.Error{OrigErr: err, Query: []byte(stmt)}
×
412
        }
×
413
        if err := op.Wait(ctx); err != nil {
4✔
414
                return &database.Error{OrigErr: err, Query: []byte(stmt)}
×
415
        }
×
416

417
        return nil
4✔
418
}
419

420
// parseStatements attempts to parse migration content as DDL first, then as DML.
421
// Returns the parsed statements and their kind.
422
func parseStatements(migration []byte) ([]string, statementKind, error) {
54✔
423
        content := string(migration)
54✔
424
        if strings.TrimSpace(content) == "" {
58✔
425
                return nil, 0, ErrEmptyMigration
4✔
426
        }
4✔
427

428
        // Try parsing as DDL first
429
        ddl, ddlErr := spansql.ParseDDL("", content)
50✔
430
        if ddlErr == nil && len(ddl.List) > 0 {
74✔
431
                stmts := make([]string, 0, len(ddl.List))
24✔
432
                for _, stmt := range ddl.List {
50✔
433
                        stmts = append(stmts, stmt.SQL())
26✔
434
                }
26✔
435
                return stmts, statementKindDDL, nil
24✔
436
        }
437

438
        // Try parsing as DML
439
        dml, dmlErr := spansql.ParseDML("", content)
26✔
440
        if dmlErr == nil && len(dml.List) > 0 {
52✔
441
                stmts := make([]string, 0, len(dml.List))
26✔
442
                for _, stmt := range dml.List {
64✔
443
                        stmts = append(stmts, stmt.SQL())
38✔
444
                }
38✔
445
                kind, err := inspectDMLKind(dml.List)
26✔
446
                if err != nil {
30✔
447
                        return nil, 0, err
4✔
448
                }
4✔
449
                return stmts, kind, nil
22✔
450
        }
451

NEW
452
        if ddlErr != nil {
×
NEW
453
                return nil, 0, ddlErr
×
NEW
454
        }
×
NEW
455
        return nil, 0, dmlErr
×
456
}
457

458
// inspectDMLKind determines if DML statements are regular DML (INSERT) or
459
// partitioned DML (UPDATE, DELETE). Mixed statement types are not allowed.
460
func inspectDMLKind(stmts []spansql.DMLStmt) (statementKind, error) {
38✔
461
        if len(stmts) == 0 {
38✔
NEW
462
                return statementKindDDL, nil
×
NEW
463
        }
×
464

465
        var hasDML, hasPartitionedDML bool
38✔
466
        for _, stmt := range stmts {
94✔
467
                switch stmt.(type) {
56✔
468
                case *spansql.Insert:
24✔
469
                        hasDML = true
24✔
470
                case *spansql.Update, *spansql.Delete:
32✔
471
                        hasPartitionedDML = true
32✔
NEW
472
                default:
×
NEW
473
                        return 0, fmt.Errorf("%w: unknown DML statement type %T", ErrInvalidDMLStatementKind, stmt)
×
474
                }
475
        }
476

477
        switch {
38✔
478
        case hasDML && !hasPartitionedDML:
12✔
479
                return statementKindDML, nil
12✔
480
        case !hasDML && hasPartitionedDML:
20✔
481
                return statementKindPartitionedDML, nil
20✔
482
        default:
6✔
483
                return 0, ErrMixedStatements
6✔
484
        }
485
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc