• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

golang-migrate / migrate / 19912254661

03 Dec 2025 11:28PM UTC coverage: 54.624% (+0.2%) from 54.432%
19912254661

Pull #1197

github

jtwatson
Add empty lines at end of migration scripts.
Pull Request #1197: feature(spanner): Implement DML Support for Spanner

66 of 94 new or added lines in 1 file covered. (70.21%)

2 existing lines in 1 file now uncovered.

4430 of 8110 relevant lines covered (54.62%)

49.59 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

71.28
/database/spanner/spanner.go
1
package spanner
2

3
import (
4
        "context"
5
        "errors"
6
        "fmt"
7
        "io"
8
        "log"
9
        nurl "net/url"
10
        "regexp"
11
        "strconv"
12
        "strings"
13
        "sync/atomic"
14

15
        "cloud.google.com/go/spanner"
16
        sdb "cloud.google.com/go/spanner/admin/database/apiv1"
17

18
        "github.com/cloudspannerecosystem/memefish"
19
        "github.com/cloudspannerecosystem/memefish/token"
20
        "github.com/golang-migrate/migrate/v4"
21
        "github.com/golang-migrate/migrate/v4/database"
22

23
        adminpb "cloud.google.com/go/spanner/admin/database/apiv1/databasepb"
24
        "google.golang.org/api/iterator"
25
)
26

27
func init() {
2✔
28
        db := Spanner{}
2✔
29
        database.Register("spanner", &db)
2✔
30
}
2✔
31

32
// DefaultMigrationsTable is the migrations table name that WithInstance
// falls back to when Config.MigrationsTable is empty.
const DefaultMigrationsTable = "SchemaMigrations"
34

35
// Sentinel errors returned by the Spanner driver.
var (
	// ErrNilConfig is returned by WithInstance when no Config is supplied.
	ErrNilConfig = errors.New("no config")
	// ErrNoDatabaseName is returned by WithInstance when
	// Config.DatabaseName is empty.
	ErrNoDatabaseName = errors.New("no database name")
	// ErrNoSchema is part of the driver's exported error set.
	// NOTE(review): nothing in the visible code returns it — confirm it is
	// still needed by callers.
	ErrNoSchema = errors.New("no schema")
	// ErrDatabaseDirty is part of the driver's exported error set.
	// NOTE(review): nothing in the visible code returns it — confirm it is
	// still needed by callers.
	ErrDatabaseDirty = errors.New("database is dirty")
	// ErrLockHeld is returned by Lock when the driver's lock flag is
	// already set.
	ErrLockHeld = errors.New("unable to obtain lock")
	// ErrLockNotHeld is returned by Unlock when the driver's lock flag is
	// not set.
	ErrLockNotHeld = errors.New("unable to release already released lock")
)
44

45
// Config holds the settings for one Spanner driver instance.
type Config struct {
	// MigrationsTable names the table that stores the migration version.
	// WithInstance replaces an empty value with DefaultMigrationsTable.
	MigrationsTable string
	// DatabaseName identifies the target database. WithInstance rejects an
	// empty value with ErrNoDatabaseName.
	DatabaseName string
	// CleanStatements selects how Run submits a migration. When false, the
	// whole migration is sent as a single DDL statement. When true, the
	// migration is tokenized into individual statements, which are grouped
	// into DDL and DML batches and executed group by group.
	CleanStatements bool
}
55

56
// Spanner implements database.Driver for Google Cloud Spanner
57
type Spanner struct {
58
        db     *DB
59
        config *Config
60
        lock   atomic.Bool
61
}
62

63
type DB struct {
64
        admin  *sdb.DatabaseAdminClient
65
        data   *spanner.Client
66
        shared bool
67
}
68

69
func NewDB(admin sdb.DatabaseAdminClient, data spanner.Client) *DB {
×
70
        return &DB{
×
NEW
71
                admin:  &admin,
×
NEW
72
                data:   &data,
×
NEW
73
                shared: true,
×
74
        }
×
75
}
×
76

77
// WithInstance implements database.Driver
78
func WithInstance(instance *DB, config *Config) (database.Driver, error) {
8✔
79
        if config == nil {
8✔
80
                return nil, ErrNilConfig
×
81
        }
×
82

83
        if len(config.DatabaseName) == 0 {
8✔
84
                return nil, ErrNoDatabaseName
×
85
        }
×
86

87
        if len(config.MigrationsTable) == 0 {
16✔
88
                config.MigrationsTable = DefaultMigrationsTable
8✔
89
        }
8✔
90

91
        sx := &Spanner{
8✔
92
                db:     instance,
8✔
93
                config: config,
8✔
94
        }
8✔
95

8✔
96
        if err := sx.ensureVersionTable(); err != nil {
8✔
97
                return nil, err
×
98
        }
×
99

100
        return sx, nil
8✔
101
}
102

103
// Open implements database.Driver
104
func (s *Spanner) Open(url string) (database.Driver, error) {
8✔
105
        purl, err := nurl.Parse(url)
8✔
106
        if err != nil {
8✔
107
                return nil, err
×
108
        }
×
109

110
        ctx := context.Background()
8✔
111

8✔
112
        adminClient, err := sdb.NewDatabaseAdminClient(ctx)
8✔
113
        if err != nil {
8✔
114
                return nil, err
×
115
        }
×
116
        dbname := strings.Replace(migrate.FilterCustomQuery(purl).String(), "spanner://", "", 1)
8✔
117
        dataClient, err := spanner.NewClient(ctx, dbname)
8✔
118
        if err != nil {
8✔
119
                log.Fatal(err)
×
120
        }
×
121

122
        migrationsTable := purl.Query().Get("x-migrations-table")
8✔
123

8✔
124
        cleanQuery := purl.Query().Get("x-clean-statements")
8✔
125
        clean := false
8✔
126
        if cleanQuery != "" {
12✔
127
                clean, err = strconv.ParseBool(cleanQuery)
4✔
128
                if err != nil {
4✔
129
                        return nil, err
×
130
                }
×
131
        }
132

133
        db := &DB{admin: adminClient, data: dataClient}
8✔
134
        return WithInstance(db, &Config{
8✔
135
                DatabaseName:    dbname,
8✔
136
                MigrationsTable: migrationsTable,
8✔
137
                CleanStatements: clean,
8✔
138
        })
8✔
139
}
140

141
// Close implements database.Driver
142
func (s *Spanner) Close() error {
×
NEW
143
        if s.db.shared {
×
NEW
144
                return nil
×
NEW
145
        }
×
146
        s.db.data.Close()
×
147
        return s.db.admin.Close()
×
148
}
149

150
// Lock implements database.Driver but doesn't do anything because Spanner only
151
// enqueues the UpdateDatabaseDdlRequest.
152
func (s *Spanner) Lock() error {
28✔
153
        if swapped := s.lock.CompareAndSwap(false, true); swapped {
52✔
154
                return nil
24✔
155
        }
24✔
156
        return ErrLockHeld
4✔
157
}
158

159
// Unlock implements database.Driver but no action required, see Lock.
160
func (s *Spanner) Unlock() error {
24✔
161
        if swapped := s.lock.CompareAndSwap(true, false); swapped {
48✔
162
                return nil
24✔
163
        }
24✔
164
        return ErrLockNotHeld
×
165
}
166

167
// Run implements database.Driver
168
func (s *Spanner) Run(migration io.Reader) error {
18✔
169
        migr, err := io.ReadAll(migration)
18✔
170
        if err != nil {
18✔
171
                return err
×
172
        }
×
173

174
        ctx := context.Background()
18✔
175

18✔
176
        if !s.config.CleanStatements {
30✔
177
                return s.runDdl(ctx, []string{string(migr)})
12✔
178
        }
12✔
179

180
        stmtGroups, err := statementGroups(migr)
6✔
181
        if err != nil {
6✔
NEW
182
                return err
×
NEW
183
        }
×
184

185
        for _, group := range stmtGroups {
16✔
186
                switch group.typ {
10✔
187
                case statementTypeDDL:
6✔
188
                        if err := s.runDdl(ctx, group.stmts); err != nil {
6✔
NEW
189
                                return err
×
NEW
190
                        }
×
191
                case statementTypeDML:
4✔
192
                        if err := s.runDml(ctx, group.stmts); err != nil {
4✔
NEW
193
                                return err
×
NEW
194
                        }
×
NEW
195
                default:
×
NEW
196
                        return fmt.Errorf("unknown statement type: %s", group.typ)
×
197
                }
198
        }
199

200
        return nil
6✔
201
}
202

203
func (s *Spanner) runDdl(ctx context.Context, stmts []string) error {
18✔
204
        op, err := s.db.admin.UpdateDatabaseDdl(ctx, &adminpb.UpdateDatabaseDdlRequest{
18✔
205
                Database:   s.config.DatabaseName,
18✔
206
                Statements: stmts,
18✔
207
        })
18✔
208
        if err != nil {
18✔
NEW
209
                return &database.Error{OrigErr: err, Err: "migration failed", Query: []byte(strings.Join(stmts, ";\n"))}
×
210
        }
×
211

212
        if err := op.Wait(ctx); err != nil {
18✔
NEW
213
                return &database.Error{OrigErr: err, Err: "migration failed", Query: []byte(strings.Join(stmts, ";\n"))}
×
NEW
214
        }
×
215

216
        return nil
18✔
217
}
218

219
func (s *Spanner) runDml(ctx context.Context, stmts []string) error {
4✔
220
        _, err := s.db.data.ReadWriteTransaction(ctx,
4✔
221
                func(ctx context.Context, txn *spanner.ReadWriteTransaction) error {
8✔
222
                        for _, s := range stmts {
8✔
223
                                _, err := txn.Update(ctx, spanner.Statement{SQL: s})
4✔
224
                                if err != nil {
4✔
NEW
225
                                        return err
×
NEW
226
                                }
×
227
                        }
228
                        return nil
4✔
229
                })
230
        if err != nil {
4✔
NEW
231
                return &database.Error{OrigErr: err, Err: "migration failed", Query: []byte(strings.Join(stmts, ";\n"))}
×
UNCOV
232
        }
×
233

234
        return nil
4✔
235
}
236

237
// SetVersion implements database.Driver
238
func (s *Spanner) SetVersion(version int, dirty bool) error {
52✔
239
        ctx := context.Background()
52✔
240

52✔
241
        _, err := s.db.data.ReadWriteTransaction(ctx,
52✔
242
                func(ctx context.Context, txn *spanner.ReadWriteTransaction) error {
104✔
243
                        m := []*spanner.Mutation{
52✔
244
                                spanner.Delete(s.config.MigrationsTable, spanner.AllKeys()),
52✔
245
                                spanner.Insert(s.config.MigrationsTable,
52✔
246
                                        []string{"Version", "Dirty"},
52✔
247
                                        []interface{}{version, dirty},
52✔
248
                                ),
52✔
249
                        }
52✔
250
                        return txn.BufferWrite(m)
52✔
251
                })
52✔
252
        if err != nil {
52✔
253
                return &database.Error{OrigErr: err}
×
254
        }
×
255

256
        return nil
52✔
257
}
258

259
// Version implements database.Driver
260
func (s *Spanner) Version() (version int, dirty bool, err error) {
32✔
261
        ctx := context.Background()
32✔
262

32✔
263
        stmt := spanner.Statement{
32✔
264
                SQL: `SELECT Version, Dirty FROM ` + s.config.MigrationsTable + ` LIMIT 1`,
32✔
265
        }
32✔
266
        iter := s.db.data.Single().Query(ctx, stmt)
32✔
267
        defer iter.Stop()
32✔
268

32✔
269
        row, err := iter.Next()
32✔
270
        switch err {
32✔
271
        case iterator.Done:
8✔
272
                return database.NilVersion, false, nil
8✔
273
        case nil:
24✔
274
                var v int64
24✔
275
                if err = row.Columns(&v, &dirty); err != nil {
24✔
276
                        return 0, false, &database.Error{OrigErr: err, Query: []byte(stmt.SQL)}
×
277
                }
×
278
                version = int(v)
24✔
279
        default:
×
280
                return 0, false, &database.Error{OrigErr: err, Query: []byte(stmt.SQL)}
×
281
        }
282

283
        return version, dirty, nil
24✔
284
}
285

286
// nameMatcher extracts the object name from a schema statement. For a
// CREATE TABLE statement the table name is captured in group 2; for a
// CREATE ... INDEX statement the index name is captured in group 4.
var nameMatcher = regexp.MustCompile(`(CREATE TABLE\s(\S+)\s)|(CREATE.+INDEX\s(\S+)\s)`)
287

288
// Drop implements database.Driver. Retrieves the database schema first and
289
// creates statements to drop the indexes and tables accordingly.
290
// Note: The drop statements are created in reverse order to how they're
291
// provided in the schema. Assuming the schema describes how the database can
292
// be "build up", it seems logical to "unbuild" the database simply by going the
293
// opposite direction. More testing
294
func (s *Spanner) Drop() error {
8✔
295
        ctx := context.Background()
8✔
296
        res, err := s.db.admin.GetDatabaseDdl(ctx, &adminpb.GetDatabaseDdlRequest{
8✔
297
                Database: s.config.DatabaseName,
8✔
298
        })
8✔
299
        if err != nil {
8✔
300
                return &database.Error{OrigErr: err, Err: "drop failed"}
×
301
        }
×
302
        if len(res.Statements) == 0 {
8✔
303
                return nil
×
304
        }
×
305

306
        stmts := make([]string, 0)
8✔
307
        for i := len(res.Statements) - 1; i >= 0; i-- {
28✔
308
                s := res.Statements[i]
20✔
309
                m := nameMatcher.FindSubmatch([]byte(s))
20✔
310

20✔
311
                if len(m) == 0 {
20✔
312
                        continue
×
313
                } else if tbl := m[2]; len(tbl) > 0 {
40✔
314
                        stmts = append(stmts, fmt.Sprintf(`DROP TABLE %s`, tbl))
20✔
315
                } else if idx := m[4]; len(idx) > 0 {
20✔
316
                        stmts = append(stmts, fmt.Sprintf(`DROP INDEX %s`, idx))
×
317
                }
×
318
        }
319

320
        op, err := s.db.admin.UpdateDatabaseDdl(ctx, &adminpb.UpdateDatabaseDdlRequest{
8✔
321
                Database:   s.config.DatabaseName,
8✔
322
                Statements: stmts,
8✔
323
        })
8✔
324
        if err != nil {
8✔
325
                return &database.Error{OrigErr: err, Query: []byte(strings.Join(stmts, "; "))}
×
326
        }
×
327
        if err := op.Wait(ctx); err != nil {
8✔
328
                return &database.Error{OrigErr: err, Query: []byte(strings.Join(stmts, "; "))}
×
329
        }
×
330

331
        return nil
8✔
332
}
333

334
// ensureVersionTable checks if versions table exists and, if not, creates it.
335
// Note that this function locks the database, which deviates from the usual
336
// convention of "caller locks" in the Spanner type.
337
func (s *Spanner) ensureVersionTable() (err error) {
8✔
338
        if err = s.Lock(); err != nil {
8✔
339
                return err
×
340
        }
×
341

342
        defer func() {
16✔
343
                if e := s.Unlock(); e != nil {
8✔
344
                        err = errors.Join(err, e)
×
345
                }
×
346
        }()
347

348
        ctx := context.Background()
8✔
349
        tbl := s.config.MigrationsTable
8✔
350
        iter := s.db.data.Single().Read(ctx, tbl, spanner.AllKeys(), []string{"Version"})
8✔
351
        if err := iter.Do(func(r *spanner.Row) error { return nil }); err == nil {
8✔
352
                return nil
×
353
        }
×
354

355
        stmt := fmt.Sprintf(`CREATE TABLE %s (
8✔
356
    Version INT64 NOT NULL,
8✔
357
    Dirty    BOOL NOT NULL
8✔
358
        ) PRIMARY KEY(Version)`, tbl)
8✔
359

8✔
360
        op, err := s.db.admin.UpdateDatabaseDdl(ctx, &adminpb.UpdateDatabaseDdlRequest{
8✔
361
                Database:   s.config.DatabaseName,
8✔
362
                Statements: []string{stmt},
8✔
363
        })
8✔
364
        if err != nil {
8✔
UNCOV
365
                return &database.Error{OrigErr: err, Query: []byte(stmt)}
×
366
        }
×
367
        if err := op.Wait(ctx); err != nil {
8✔
368
                return &database.Error{OrigErr: err, Query: []byte(stmt)}
×
369
        }
×
370

371
        return nil
8✔
372
}
373

374
// statementType classifies a migration statement by how it must be executed.
type statementType string

// Statement categories assigned by statementGroups.
const (
	// statementTypeUnknown is the zero value: no category assigned yet.
	statementTypeUnknown statementType = ""
	// statementTypeDDL marks statements submitted as schema updates.
	statementTypeDDL statementType = "DDL"
	// statementTypeDML marks statements executed inside a transaction.
	statementTypeDML statementType = "DML"
)
381

382
type statementGroup struct {
383
        typ   statementType
384
        stmts []string
385
}
386

387
func statementGroups(migr []byte) (groups []*statementGroup, err error) {
40✔
388
        lex := &memefish.Lexer{
40✔
389
                File: &token.File{Buffer: string(migr)},
40✔
390
        }
40✔
391

40✔
392
        group := &statementGroup{}
40✔
393
        var stmtTyp statementType
40✔
394
        var stmt strings.Builder
40✔
395
        for {
1,016✔
396
                if err := lex.NextToken(); err != nil {
976✔
NEW
397
                        return nil, err
×
NEW
398
                }
×
399

400
                if stmtTyp == statementTypeUnknown {
1,064✔
401
                        switch {
88✔
402
                        case lex.Token.IsKeywordLike("INSERT") || lex.Token.IsKeywordLike("DELETE") || lex.Token.IsKeywordLike("UPDATE"):
8✔
403
                                stmtTyp = statementTypeDML
8✔
404
                        default:
80✔
405
                                stmtTyp = statementTypeDDL
80✔
406
                        }
407
                        if group.typ != stmtTyp {
142✔
408
                                if len(group.stmts) > 0 {
68✔
409
                                        groups = append(groups, group)
14✔
410
                                }
14✔
411
                                group = &statementGroup{typ: stmtTyp}
54✔
412
                        }
413
                }
414

415
                if lex.Token.Kind == token.TokenEOF || lex.Token.Kind == ";" {
1,064✔
416
                        if stmt.Len() > 0 {
148✔
417
                                group.stmts = append(group.stmts, stmt.String())
60✔
418
                        }
60✔
419
                        stmtTyp = statementTypeUnknown
88✔
420
                        stmt.Reset()
88✔
421

88✔
422
                        if lex.Token.Kind == token.TokenEOF {
128✔
423
                                if len(group.stmts) > 0 {
70✔
424
                                        groups = append(groups, group)
30✔
425
                                }
30✔
426

427
                                break
40✔
428
                        }
429

430
                        continue
48✔
431
                }
432

433
                if len(lex.Token.Comments) > 0 && strings.HasPrefix(lex.Token.Comments[0].Raw, "--") {
896✔
434
                        // standard comment Token consumes a \n, so we need to add it back
8✔
435
                        if _, err := stmt.WriteString("\n"); err != nil {
8✔
NEW
436
                                return nil, err
×
NEW
437
                        }
×
438
                }
439
                if stmt.Len() > 0 {
1,720✔
440
                        if _, err := stmt.WriteString(lex.Token.Space); err != nil {
832✔
NEW
441
                                return nil, err
×
NEW
442
                        }
×
443
                }
444
                if _, err := stmt.WriteString(lex.Token.Raw); err != nil {
888✔
NEW
445
                        return nil, err
×
NEW
446
                }
×
447
        }
448

449
        return groups, nil
40✔
450
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc