• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

golang-migrate / migrate / 5186663860

pending completion
5186663860

Pull #932

github

longit644
Add Golang function as a source of migration
Pull Request #932: Add Golang function as a source of migration

252 of 370 new or added lines in 32 files covered. (68.11%)

4 existing lines in 4 files now uncovered.

4214 of 7231 relevant lines covered (58.28%)

61.46 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

67.57
/database/spanner/spanner.go
1
package spanner
2

3
import (
4
        "context"
5
        "errors"
6
        "fmt"
7
        "io"
8
        "log"
9
        nurl "net/url"
10
        "regexp"
11
        "strconv"
12
        "strings"
13

14
        "cloud.google.com/go/spanner"
15
        sdb "cloud.google.com/go/spanner/admin/database/apiv1"
16
        "cloud.google.com/go/spanner/spansql"
17

18
        "github.com/golang-migrate/migrate/v4"
19
        "github.com/golang-migrate/migrate/v4/database"
20
        "github.com/golang-migrate/migrate/v4/source"
21

22
        adminpb "cloud.google.com/go/spanner/admin/database/apiv1/databasepb"
23
        "github.com/hashicorp/go-multierror"
24
        uatomic "go.uber.org/atomic"
25
        "google.golang.org/api/iterator"
26
)
27

28
func init() {
2✔
29
        db := Spanner{}
2✔
30
        database.Register("spanner", &db)
2✔
31
}
2✔
32

33
// DefaultMigrationsTable is used if no custom table is specified
34
const DefaultMigrationsTable = "SchemaMigrations"
35

36
const (
37
        unlockedVal = 0
38
        lockedVal   = 1
39
)
40

41
// Driver errors
42
var (
43
        ErrNilConfig      = errors.New("no config")
44
        ErrNoDatabaseName = errors.New("no database name")
45
        ErrNoSchema       = errors.New("no schema")
46
        ErrDatabaseDirty  = errors.New("database is dirty")
47
        ErrLockHeld       = errors.New("unable to obtain lock")
48
        ErrLockNotHeld    = errors.New("unable to release already released lock")
49
)
50

51
// Config used for a Spanner instance
52
type Config struct {
53
        MigrationsTable string
54
        DatabaseName    string
55
        // Whether to parse the migration DDL with spansql before
56
        // running them towards Spanner.
57
        // Parsing outputs clean DDL statements such as reformatted
58
        // and void of comments.
59
        CleanStatements bool
60
}
61

62
// Spanner implements database.Driver for Google Cloud Spanner
63
type Spanner struct {
64
        db *DB
65

66
        config *Config
67

68
        lock *uatomic.Uint32
69
}
70

71
// DB bundles the two Spanner clients the driver works with: admin, the
// database admin client used for DDL requests (GetDatabaseDdl and
// UpdateDatabaseDdl), and data, the client used for queries, reads and
// read-write transactions on the migrations table.
type DB struct {
72
        admin *sdb.DatabaseAdminClient
73
        data  *spanner.Client
74
}
75

76
func NewDB(admin sdb.DatabaseAdminClient, data spanner.Client) *DB {
×
77
        return &DB{
×
78
                admin: &admin,
×
79
                data:  &data,
×
80
        }
×
81
}
×
82

83
// WithInstance implements database.Driver
84
func WithInstance(instance *DB, config *Config) (database.Driver, error) {
4✔
85
        if config == nil {
4✔
86
                return nil, ErrNilConfig
×
87
        }
×
88

89
        if len(config.DatabaseName) == 0 {
4✔
90
                return nil, ErrNoDatabaseName
×
91
        }
×
92

93
        if len(config.MigrationsTable) == 0 {
8✔
94
                config.MigrationsTable = DefaultMigrationsTable
4✔
95
        }
4✔
96

97
        sx := &Spanner{
4✔
98
                db:     instance,
4✔
99
                config: config,
4✔
100
                lock:   uatomic.NewUint32(unlockedVal),
4✔
101
        }
4✔
102

4✔
103
        if err := sx.ensureVersionTable(); err != nil {
4✔
104
                return nil, err
×
105
        }
×
106

107
        return sx, nil
4✔
108
}
109

110
// Open implements database.Driver
111
func (s *Spanner) Open(url string) (database.Driver, error) {
4✔
112
        purl, err := nurl.Parse(url)
4✔
113
        if err != nil {
4✔
114
                return nil, err
×
115
        }
×
116

117
        ctx := context.Background()
4✔
118

4✔
119
        adminClient, err := sdb.NewDatabaseAdminClient(ctx)
4✔
120
        if err != nil {
4✔
121
                return nil, err
×
122
        }
×
123
        dbname := strings.Replace(migrate.FilterCustomQuery(purl).String(), "spanner://", "", 1)
4✔
124
        dataClient, err := spanner.NewClient(ctx, dbname)
4✔
125
        if err != nil {
4✔
126
                log.Fatal(err)
×
127
        }
×
128

129
        migrationsTable := purl.Query().Get("x-migrations-table")
4✔
130

4✔
131
        cleanQuery := purl.Query().Get("x-clean-statements")
4✔
132
        clean := false
4✔
133
        if cleanQuery != "" {
4✔
134
                clean, err = strconv.ParseBool(cleanQuery)
×
135
                if err != nil {
×
136
                        return nil, err
×
137
                }
×
138
        }
139

140
        db := &DB{admin: adminClient, data: dataClient}
4✔
141
        return WithInstance(db, &Config{
4✔
142
                DatabaseName:    dbname,
4✔
143
                MigrationsTable: migrationsTable,
4✔
144
                CleanStatements: clean,
4✔
145
        })
4✔
146
}
147

148
// Close implements database.Driver
149
func (s *Spanner) Close() error {
×
150
        s.db.data.Close()
×
151
        return s.db.admin.Close()
×
152
}
×
153

154
// Lock implements database.Driver but doesn't do anything because Spanner only
155
// enqueues the UpdateDatabaseDdlRequest.
156
func (s *Spanner) Lock() error {
14✔
157
        if swapped := s.lock.CAS(unlockedVal, lockedVal); swapped {
26✔
158
                return nil
12✔
159
        }
12✔
160
        return ErrLockHeld
2✔
161
}
162

163
// Unlock implements database.Driver but no action required, see Lock.
164
func (s *Spanner) Unlock() error {
12✔
165
        if swapped := s.lock.CAS(lockedVal, unlockedVal); swapped {
24✔
166
                return nil
12✔
167
        }
12✔
168
        return ErrLockNotHeld
×
169
}
170

171
// Run implements database.Driver
172
func (s *Spanner) Run(migration io.Reader) error {
12✔
173
        migr, err := io.ReadAll(migration)
12✔
174
        if err != nil {
12✔
175
                return err
×
176
        }
×
177

178
        stmts := []string{string(migr)}
12✔
179
        if s.config.CleanStatements {
12✔
180
                stmts, err = cleanStatements(migr)
×
181
                if err != nil {
×
182
                        return err
×
183
                }
×
184
        }
185

186
        ctx := context.Background()
12✔
187
        op, err := s.db.admin.UpdateDatabaseDdl(ctx, &adminpb.UpdateDatabaseDdlRequest{
12✔
188
                Database:   s.config.DatabaseName,
12✔
189
                Statements: stmts,
12✔
190
        })
12✔
191

12✔
192
        if err != nil {
12✔
193
                return &database.Error{OrigErr: err, Err: "migration failed", Query: migr}
×
194
        }
×
195

196
        if err := op.Wait(ctx); err != nil {
12✔
197
                return &database.Error{OrigErr: err, Err: "migration failed", Query: migr}
×
198
        }
×
199

200
        return nil
12✔
201
}
202

203
// SetVersion implements database.Driver
204
func (s *Spanner) SetVersion(version int, dirty bool) error {
32✔
205
        ctx := context.Background()
32✔
206

32✔
207
        _, err := s.db.data.ReadWriteTransaction(ctx,
32✔
208
                func(ctx context.Context, txn *spanner.ReadWriteTransaction) error {
64✔
209
                        m := []*spanner.Mutation{
32✔
210
                                spanner.Delete(s.config.MigrationsTable, spanner.AllKeys()),
32✔
211
                                spanner.Insert(s.config.MigrationsTable,
32✔
212
                                        []string{"Version", "Dirty"},
32✔
213
                                        []interface{}{version, dirty},
32✔
214
                                )}
32✔
215
                        return txn.BufferWrite(m)
32✔
216
                })
32✔
217
        if err != nil {
32✔
218
                return &database.Error{OrigErr: err}
×
219
        }
×
220

221
        return nil
32✔
222
}
223

224
// Version implements database.Driver
225
func (s *Spanner) Version() (version int, dirty bool, err error) {
16✔
226
        ctx := context.Background()
16✔
227

16✔
228
        stmt := spanner.Statement{
16✔
229
                SQL: `SELECT Version, Dirty FROM ` + s.config.MigrationsTable + ` LIMIT 1`,
16✔
230
        }
16✔
231
        iter := s.db.data.Single().Query(ctx, stmt)
16✔
232
        defer iter.Stop()
16✔
233

16✔
234
        row, err := iter.Next()
16✔
235
        switch err {
16✔
236
        case iterator.Done:
4✔
237
                return database.NilVersion, false, nil
4✔
238
        case nil:
12✔
239
                var v int64
12✔
240
                if err = row.Columns(&v, &dirty); err != nil {
12✔
241
                        return 0, false, &database.Error{OrigErr: err, Query: []byte(stmt.SQL)}
×
242
                }
×
243
                version = int(v)
12✔
244
        default:
×
245
                return 0, false, &database.Error{OrigErr: err, Query: []byte(stmt.SQL)}
×
246
        }
247

248
        return version, dirty, nil
12✔
249
}
250

251
var nameMatcher = regexp.MustCompile(`(CREATE TABLE\s(\S+)\s)|(CREATE.+INDEX\s(\S+)\s)`)
252

253
// Drop implements database.Driver. Retrieves the database schema first and
254
// creates statements to drop the indexes and tables accordingly.
255
// Note: The drop statements are created in reverse order to how they're
256
// provided in the schema. Assuming the schema describes how the database can
257
// be "build up", it seems logical to "unbuild" the database simply by going the
258
// opposite direction. More testing
259
func (s *Spanner) Drop() error {
4✔
260
        ctx := context.Background()
4✔
261
        res, err := s.db.admin.GetDatabaseDdl(ctx, &adminpb.GetDatabaseDdlRequest{
4✔
262
                Database: s.config.DatabaseName,
4✔
263
        })
4✔
264
        if err != nil {
4✔
265
                return &database.Error{OrigErr: err, Err: "drop failed"}
×
266
        }
×
267
        if len(res.Statements) == 0 {
4✔
268
                return nil
×
269
        }
×
270

271
        stmts := make([]string, 0)
4✔
272
        for i := len(res.Statements) - 1; i >= 0; i-- {
16✔
273
                s := res.Statements[i]
12✔
274
                m := nameMatcher.FindSubmatch([]byte(s))
12✔
275

12✔
276
                if len(m) == 0 {
12✔
277
                        continue
×
278
                } else if tbl := m[2]; len(tbl) > 0 {
24✔
279
                        stmts = append(stmts, fmt.Sprintf(`DROP TABLE %s`, tbl))
12✔
280
                } else if idx := m[4]; len(idx) > 0 {
12✔
281
                        stmts = append(stmts, fmt.Sprintf(`DROP INDEX %s`, idx))
×
282
                }
×
283
        }
284

285
        op, err := s.db.admin.UpdateDatabaseDdl(ctx, &adminpb.UpdateDatabaseDdlRequest{
4✔
286
                Database:   s.config.DatabaseName,
4✔
287
                Statements: stmts,
4✔
288
        })
4✔
289
        if err != nil {
4✔
290
                return &database.Error{OrigErr: err, Query: []byte(strings.Join(stmts, "; "))}
×
291
        }
×
292
        if err := op.Wait(ctx); err != nil {
4✔
293
                return &database.Error{OrigErr: err, Query: []byte(strings.Join(stmts, "; "))}
×
294
        }
×
295

296
        return nil
4✔
297
}
298

299
// Exec implements database.Driver. Executes a migration exectuor.
NEW
300
func (s *Spanner) Exec(e source.Executor) error {
×
NEW
301
        return e.Execute(s.db)
×
NEW
302
}
×
303

304
// ensureVersionTable checks if versions table exists and, if not, creates it.
305
// Note that this function locks the database, which deviates from the usual
306
// convention of "caller locks" in the Spanner type.
307
func (s *Spanner) ensureVersionTable() (err error) {
4✔
308
        if err = s.Lock(); err != nil {
4✔
309
                return err
×
310
        }
×
311

312
        defer func() {
8✔
313
                if e := s.Unlock(); e != nil {
4✔
314
                        if err == nil {
×
315
                                err = e
×
316
                        } else {
×
317
                                err = multierror.Append(err, e)
×
318
                        }
×
319
                }
320
        }()
321

322
        ctx := context.Background()
4✔
323
        tbl := s.config.MigrationsTable
4✔
324
        iter := s.db.data.Single().Read(ctx, tbl, spanner.AllKeys(), []string{"Version"})
4✔
325
        if err := iter.Do(func(r *spanner.Row) error { return nil }); err == nil {
4✔
326
                return nil
×
327
        }
×
328

329
        stmt := fmt.Sprintf(`CREATE TABLE %s (
4✔
330
    Version INT64 NOT NULL,
4✔
331
    Dirty    BOOL NOT NULL
4✔
332
        ) PRIMARY KEY(Version)`, tbl)
4✔
333

4✔
334
        op, err := s.db.admin.UpdateDatabaseDdl(ctx, &adminpb.UpdateDatabaseDdlRequest{
4✔
335
                Database:   s.config.DatabaseName,
4✔
336
                Statements: []string{stmt},
4✔
337
        })
4✔
338

4✔
339
        if err != nil {
4✔
340
                return &database.Error{OrigErr: err, Query: []byte(stmt)}
×
341
        }
×
342
        if err := op.Wait(ctx); err != nil {
4✔
343
                return &database.Error{OrigErr: err, Query: []byte(stmt)}
×
344
        }
×
345

346
        return nil
4✔
347
}
348

349
func cleanStatements(migration []byte) ([]string, error) {
22✔
350
        // The Spanner GCP backend does not yet support comments for the UpdateDatabaseDdl RPC
22✔
351
        // (see https://issuetracker.google.com/issues/159730604) we use
22✔
352
        // spansql to parse the DDL and output valid stamements without comments
22✔
353
        ddl, err := spansql.ParseDDL("", string(migration))
22✔
354
        if err != nil {
22✔
355
                return nil, err
×
356
        }
×
357
        stmts := make([]string, 0, len(ddl.List))
22✔
358
        for _, stmt := range ddl.List {
50✔
359
                stmts = append(stmts, stmt.SQL())
28✔
360
        }
28✔
361
        return stmts, nil
22✔
362
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc