• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

golang-migrate / migrate / 16295271531

15 Jul 2025 01:55PM UTC coverage: 56.553% (+0.2%) from 56.314%
16295271531

Pull #1294

github

dsyers
lint
Pull Request #1294: Triggers

790 of 1325 new or added lines in 24 files covered. (59.62%)

4 existing lines in 4 files now uncovered.

5277 of 9331 relevant lines covered (56.55%)

55.43 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

68.68
/database/spanner/spanner.go
1
package spanner
2

3
import (
4
        "context"
5
        "errors"
6
        "fmt"
7
        "io"
8
        "log"
9
        nurl "net/url"
10
        "regexp"
11
        "strconv"
12
        "strings"
13

14
        "cloud.google.com/go/spanner"
15
        sdb "cloud.google.com/go/spanner/admin/database/apiv1"
16
        "cloud.google.com/go/spanner/spansql"
17

18
        "github.com/golang-migrate/migrate/v4"
19
        "github.com/golang-migrate/migrate/v4/database"
20

21
        adminpb "cloud.google.com/go/spanner/admin/database/apiv1/databasepb"
22
        "github.com/hashicorp/go-multierror"
23
        uatomic "go.uber.org/atomic"
24
        "google.golang.org/api/iterator"
25
)
26

27
func init() {
2✔
28
        db := Spanner{}
2✔
29
        database.Register("spanner", &db)
2✔
30
}
2✔
31

32
// DefaultMigrationsTable is used if no custom table is specified
33
const DefaultMigrationsTable = "SchemaMigrations"
34

35
const (
36
        unlockedVal = 0
37
        lockedVal   = 1
38
)
39

40
// Driver errors
41
var (
42
        ErrNilConfig      = errors.New("no config")
43
        ErrNoDatabaseName = errors.New("no database name")
44
        ErrNoSchema       = errors.New("no schema")
45
        ErrDatabaseDirty  = errors.New("database is dirty")
46
        ErrLockHeld       = errors.New("unable to obtain lock")
47
        ErrLockNotHeld    = errors.New("unable to release already released lock")
48
)
49

50
// Config used for a Spanner instance
51
type Config struct {
52
        MigrationsTable string
53
        DatabaseName    string
54
        // Whether to parse the migration DDL with spansql before
55
        // running them towards Spanner.
56
        // Parsing outputs clean DDL statements such as reformatted
57
        // and void of comments.
58
        CleanStatements bool
59

60
        Triggers map[string]func(response interface{}) error
61
}
62

63
// Spanner implements database.Driver for Google Cloud Spanner
64
type Spanner struct {
65
        db *DB
66

67
        config *Config
68

69
        lock *uatomic.Uint32
70
}
71

72
type DB struct {
73
        admin *sdb.DatabaseAdminClient
74
        data  *spanner.Client
75
}
76

77
type TriggerResponse struct {
78
        Driver  *Spanner
79
        Config  *Config
80
        Trigger string
81
        Detail  interface{}
82
}
83

84
func NewDB(admin sdb.DatabaseAdminClient, data spanner.Client) *DB {
×
85
        return &DB{
×
86
                admin: &admin,
×
87
                data:  &data,
×
88
        }
×
89
}
×
90

91
// WithInstance implements database.Driver
92
func WithInstance(instance *DB, config *Config) (database.Driver, error) {
4✔
93
        if config == nil {
4✔
94
                return nil, ErrNilConfig
×
95
        }
×
96

97
        if len(config.DatabaseName) == 0 {
4✔
98
                return nil, ErrNoDatabaseName
×
99
        }
×
100

101
        if len(config.MigrationsTable) == 0 {
8✔
102
                config.MigrationsTable = DefaultMigrationsTable
4✔
103
        }
4✔
104

105
        sx := &Spanner{
4✔
106
                db:     instance,
4✔
107
                config: config,
4✔
108
                lock:   uatomic.NewUint32(unlockedVal),
4✔
109
        }
4✔
110

4✔
111
        if err := sx.ensureVersionTable(); err != nil {
4✔
112
                return nil, err
×
113
        }
×
114

115
        return sx, nil
4✔
116
}
117

118
// Open implements database.Driver
119
func (s *Spanner) Open(url string) (database.Driver, error) {
4✔
120
        purl, err := nurl.Parse(url)
4✔
121
        if err != nil {
4✔
122
                return nil, err
×
123
        }
×
124

125
        ctx := context.Background()
4✔
126

4✔
127
        adminClient, err := sdb.NewDatabaseAdminClient(ctx)
4✔
128
        if err != nil {
4✔
129
                return nil, err
×
130
        }
×
131
        dbname := strings.Replace(migrate.FilterCustomQuery(purl).String(), "spanner://", "", 1)
4✔
132
        dataClient, err := spanner.NewClient(ctx, dbname)
4✔
133
        if err != nil {
4✔
134
                log.Fatal(err)
×
135
        }
×
136

137
        migrationsTable := purl.Query().Get("x-migrations-table")
4✔
138

4✔
139
        cleanQuery := purl.Query().Get("x-clean-statements")
4✔
140
        clean := false
4✔
141
        if cleanQuery != "" {
4✔
142
                clean, err = strconv.ParseBool(cleanQuery)
×
143
                if err != nil {
×
144
                        return nil, err
×
145
                }
×
146
        }
147

148
        db := &DB{admin: adminClient, data: dataClient}
4✔
149
        return WithInstance(db, &Config{
4✔
150
                DatabaseName:    dbname,
4✔
151
                MigrationsTable: migrationsTable,
4✔
152
                CleanStatements: clean,
4✔
153
        })
4✔
154
}
155

156
// Close implements database.Driver
157
func (s *Spanner) Close() error {
×
158
        s.db.data.Close()
×
159
        return s.db.admin.Close()
×
160
}
×
161

162
func (s *Spanner) AddTriggers(t map[string]func(response interface{}) error) {
2✔
163
        s.config.Triggers = t
2✔
164
}
2✔
165

166
func (s *Spanner) Trigger(name string, detail interface{}) error {
96✔
167
        if s.config.Triggers == nil {
132✔
168
                return nil
36✔
169
        }
36✔
170

171
        if trigger, ok := s.config.Triggers[name]; ok {
80✔
172
                return trigger(TriggerResponse{
20✔
173
                        Driver:  s,
20✔
174
                        Config:  s.config,
20✔
175
                        Trigger: name,
20✔
176
                        Detail:  detail,
20✔
177
                })
20✔
178
        }
20✔
179

180
        return nil
40✔
181
}
182

183
// Lock implements database.Driver but doesn't do anything because Spanner only
184
// enqueues the UpdateDatabaseDdlRequest.
185
func (s *Spanner) Lock() error {
14✔
186
        if swapped := s.lock.CAS(unlockedVal, lockedVal); swapped {
26✔
187
                return nil
12✔
188
        }
12✔
189
        return ErrLockHeld
2✔
190
}
191

192
// Unlock implements database.Driver but no action required, see Lock.
193
func (s *Spanner) Unlock() error {
12✔
194
        if swapped := s.lock.CAS(lockedVal, unlockedVal); swapped {
24✔
195
                return nil
12✔
196
        }
12✔
197
        return ErrLockNotHeld
×
198
}
199

200
// Run implements database.Driver
201
func (s *Spanner) Run(migration io.Reader) error {
12✔
202
        migr, err := io.ReadAll(migration)
12✔
203
        if err != nil {
12✔
204
                return err
×
205
        }
×
206

207
        if err := s.Trigger(database.TrigRunPre, struct {
12✔
208
                Query string
12✔
209
        }{Query: string(migr)}); err != nil {
12✔
NEW
210
                return &database.Error{OrigErr: err, Err: "failed to trigger RunPre"}
×
NEW
211
        }
×
212

213
        stmts := []string{string(migr)}
12✔
214
        if s.config.CleanStatements {
12✔
215
                stmts, err = cleanStatements(migr)
×
216
                if err != nil {
×
217
                        return err
×
218
                }
×
219
        }
220

221
        ctx := context.Background()
12✔
222
        op, err := s.db.admin.UpdateDatabaseDdl(ctx, &adminpb.UpdateDatabaseDdlRequest{
12✔
223
                Database:   s.config.DatabaseName,
12✔
224
                Statements: stmts,
12✔
225
        })
12✔
226

12✔
227
        if err != nil {
12✔
228
                return &database.Error{OrigErr: err, Err: "migration failed", Query: migr}
×
229
        }
×
230

231
        if err := op.Wait(ctx); err != nil {
12✔
232
                return &database.Error{OrigErr: err, Err: "migration failed", Query: migr}
×
233
        }
×
234

235
        if err := s.Trigger(database.TrigRunPost, struct {
12✔
236
                Query string
12✔
237
        }{Query: string(migr)}); err != nil {
12✔
NEW
238
                return &database.Error{OrigErr: err, Err: "failed to trigger RunPost"}
×
NEW
239
        }
×
240

241
        return nil
12✔
242
}
243

244
// SetVersion implements database.Driver
245
func (s *Spanner) SetVersion(version int, dirty bool) error {
32✔
246
        ctx := context.Background()
32✔
247

32✔
248
        if err := s.Trigger(database.TrigSetVersionPre, struct {
32✔
249
                Version int
32✔
250
                Dirty   bool
32✔
251
        }{Version: version, Dirty: dirty}); err != nil {
32✔
NEW
252
                return &database.Error{OrigErr: err, Err: "failed to trigger SetVersionPre"}
×
NEW
253
        }
×
254

255
        _, err := s.db.data.ReadWriteTransaction(ctx,
32✔
256
                func(ctx context.Context, txn *spanner.ReadWriteTransaction) error {
64✔
257
                        m := []*spanner.Mutation{
32✔
258
                                spanner.Delete(s.config.MigrationsTable, spanner.AllKeys()),
32✔
259
                                spanner.Insert(s.config.MigrationsTable,
32✔
260
                                        []string{"Version", "Dirty"},
32✔
261
                                        []interface{}{version, dirty},
32✔
262
                                )}
32✔
263
                        return txn.BufferWrite(m)
32✔
264
                })
32✔
265
        if err != nil {
32✔
266
                return &database.Error{OrigErr: err}
×
267
        }
×
268

269
        if err := s.Trigger(database.TrigSetVersionPost, struct {
32✔
270
                Version int
32✔
271
                Dirty   bool
32✔
272
        }{Version: version, Dirty: dirty}); err != nil {
32✔
NEW
273
                return &database.Error{OrigErr: err, Err: "failed to trigger SetVersionPost"}
×
NEW
274
        }
×
275

276
        return nil
32✔
277
}
278

279
// Version implements database.Driver
280
func (s *Spanner) Version() (version int, dirty bool, err error) {
16✔
281
        ctx := context.Background()
16✔
282

16✔
283
        stmt := spanner.Statement{
16✔
284
                SQL: `SELECT Version, Dirty FROM ` + s.config.MigrationsTable + ` LIMIT 1`,
16✔
285
        }
16✔
286
        iter := s.db.data.Single().Query(ctx, stmt)
16✔
287
        defer iter.Stop()
16✔
288

16✔
289
        row, err := iter.Next()
16✔
290
        switch err {
16✔
291
        case iterator.Done:
4✔
292
                return database.NilVersion, false, nil
4✔
293
        case nil:
12✔
294
                var v int64
12✔
295
                if err = row.Columns(&v, &dirty); err != nil {
12✔
296
                        return 0, false, &database.Error{OrigErr: err, Query: []byte(stmt.SQL)}
×
297
                }
×
298
                version = int(v)
12✔
299
        default:
×
300
                return 0, false, &database.Error{OrigErr: err, Query: []byte(stmt.SQL)}
×
301
        }
302

303
        return version, dirty, nil
12✔
304
}
305

306
var nameMatcher = regexp.MustCompile(`(CREATE TABLE\s(\S+)\s)|(CREATE.+INDEX\s(\S+)\s)`)
307

308
// Drop implements database.Driver. Retrieves the database schema first and
309
// creates statements to drop the indexes and tables accordingly.
310
// Note: The drop statements are created in reverse order to how they're
311
// provided in the schema. Assuming the schema describes how the database can
312
// be "build up", it seems logical to "unbuild" the database simply by going the
313
// opposite direction. More testing
314
func (s *Spanner) Drop() error {
4✔
315
        ctx := context.Background()
4✔
316
        res, err := s.db.admin.GetDatabaseDdl(ctx, &adminpb.GetDatabaseDdlRequest{
4✔
317
                Database: s.config.DatabaseName,
4✔
318
        })
4✔
319
        if err != nil {
4✔
320
                return &database.Error{OrigErr: err, Err: "drop failed"}
×
321
        }
×
322
        if len(res.Statements) == 0 {
4✔
323
                return nil
×
324
        }
×
325

326
        stmts := make([]string, 0)
4✔
327
        for i := len(res.Statements) - 1; i >= 0; i-- {
16✔
328
                s := res.Statements[i]
12✔
329
                m := nameMatcher.FindSubmatch([]byte(s))
12✔
330

12✔
331
                if len(m) == 0 {
12✔
332
                        continue
×
333
                } else if tbl := m[2]; len(tbl) > 0 {
24✔
334
                        stmts = append(stmts, fmt.Sprintf(`DROP TABLE %s`, tbl))
12✔
335
                } else if idx := m[4]; len(idx) > 0 {
12✔
336
                        stmts = append(stmts, fmt.Sprintf(`DROP INDEX %s`, idx))
×
337
                }
×
338
        }
339

340
        op, err := s.db.admin.UpdateDatabaseDdl(ctx, &adminpb.UpdateDatabaseDdlRequest{
4✔
341
                Database:   s.config.DatabaseName,
4✔
342
                Statements: stmts,
4✔
343
        })
4✔
344
        if err != nil {
4✔
345
                return &database.Error{OrigErr: err, Query: []byte(strings.Join(stmts, "; "))}
×
346
        }
×
347
        if err := op.Wait(ctx); err != nil {
4✔
348
                return &database.Error{OrigErr: err, Query: []byte(strings.Join(stmts, "; "))}
×
349
        }
×
350

351
        return nil
4✔
352
}
353

354
// ensureVersionTable checks if versions table exists and, if not, creates it.
355
// Note that this function locks the database, which deviates from the usual
356
// convention of "caller locks" in the Spanner type.
357
func (s *Spanner) ensureVersionTable() (err error) {
4✔
358
        if err = s.Lock(); err != nil {
4✔
359
                return err
×
360
        }
×
361

362
        defer func() {
8✔
363
                if e := s.Unlock(); e != nil {
4✔
364
                        if err == nil {
×
365
                                err = e
×
366
                        } else {
×
367
                                err = multierror.Append(err, e)
×
368
                        }
×
369
                }
370
        }()
371

372
        ctx := context.Background()
4✔
373
        tbl := s.config.MigrationsTable
4✔
374
        iter := s.db.data.Single().Read(ctx, tbl, spanner.AllKeys(), []string{"Version"})
4✔
375
        if err := iter.Do(func(r *spanner.Row) error { return nil }); err == nil {
4✔
NEW
376
                if err := s.Trigger(database.TrigVersionTableExists, nil); err != nil {
×
NEW
377
                        return &database.Error{OrigErr: err, Err: "failed to trigger VersionTableExists"}
×
NEW
378
                }
×
379
                return nil
×
380
        }
381

382
        if err := s.Trigger(database.TrigVersionTablePre, nil); err != nil {
4✔
NEW
383
                return &database.Error{OrigErr: err, Err: "failed to trigger VersionTablePre"}
×
NEW
384
        }
×
385

386
        stmt := fmt.Sprintf(`CREATE TABLE %s (
4✔
387
    Version INT64 NOT NULL,
4✔
388
    Dirty    BOOL NOT NULL
4✔
389
        ) PRIMARY KEY(Version)`, tbl)
4✔
390

4✔
391
        op, err := s.db.admin.UpdateDatabaseDdl(ctx, &adminpb.UpdateDatabaseDdlRequest{
4✔
392
                Database:   s.config.DatabaseName,
4✔
393
                Statements: []string{stmt},
4✔
394
        })
4✔
395

4✔
396
        if err != nil {
4✔
397
                return &database.Error{OrigErr: err, Query: []byte(stmt)}
×
398
        }
×
399
        if err := op.Wait(ctx); err != nil {
4✔
400
                return &database.Error{OrigErr: err, Query: []byte(stmt)}
×
401
        }
×
402

403
        if err := s.Trigger(database.TrigVersionTablePost, nil); err != nil {
4✔
NEW
404
                return &database.Error{OrigErr: err, Err: "failed to trigger VersionTablePost"}
×
NEW
405
        }
×
406

407
        return nil
4✔
408
}
409

410
func cleanStatements(migration []byte) ([]string, error) {
22✔
411
        // The Spanner GCP backend does not yet support comments for the UpdateDatabaseDdl RPC
22✔
412
        // (see https://issuetracker.google.com/issues/159730604) we use
22✔
413
        // spansql to parse the DDL and output valid stamements without comments
22✔
414
        ddl, err := spansql.ParseDDL("", string(migration))
22✔
415
        if err != nil {
22✔
416
                return nil, err
×
417
        }
×
418
        stmts := make([]string, 0, len(ddl.List))
22✔
419
        for _, stmt := range ddl.List {
50✔
420
                stmts = append(stmts, stmt.SQL())
28✔
421
        }
28✔
422
        return stmts, nil
22✔
423
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc