• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

golang-migrate / migrate / 7271458240

20 Dec 2023 05:35AM UTC coverage: 59.051% (+0.5%) from 58.526%
7271458240

push

github

web-flow
Merge pull request #1007 from swensone/master

Add support for rqlite

153 of 196 new or added lines in 1 file covered. (78.06%)

4306 of 7292 relevant lines covered (59.05%)

62.07 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

78.06
/database/rqlite/rqlite.go
1
package rqlite
2

3
import (
4
        "fmt"
5
        "io"
6
        nurl "net/url"
7
        "strconv"
8
        "strings"
9

10
        "go.uber.org/atomic"
11

12
        "github.com/golang-migrate/migrate/v4"
13
        "github.com/golang-migrate/migrate/v4/database"
14
        "github.com/hashicorp/go-multierror"
15
        "github.com/pkg/errors"
16
        "github.com/rqlite/gorqlite"
17
)
18

19
func init() {
2✔
20
        database.Register("rqlite", &Rqlite{})
2✔
21
}
2✔
22

23
const (
24
        // DefaultMigrationsTable defines the default rqlite migrations table
25
        DefaultMigrationsTable = "schema_migrations"
26

27
        // DefaultConnectInsecure defines the default setting for connect insecure
28
        DefaultConnectInsecure = false
29
)
30

31
// ErrNilConfig is returned if no configuration was passed to WithInstance
32
var ErrNilConfig = fmt.Errorf("no config")
33

34
// ErrBadConfig is returned if configuration was invalid
35
var ErrBadConfig = fmt.Errorf("bad parameter")
36

37
// Config defines the driver configuration
38
type Config struct {
39
        // ConnectInsecure sets whether the connection uses TLS. Ineffectual when using WithInstance
40
        ConnectInsecure bool
41
        // MigrationsTable configures the migrations table name
42
        MigrationsTable string
43
}
44

45
type Rqlite struct {
46
        db       *gorqlite.Connection
47
        isLocked atomic.Bool
48

49
        config *Config
50
}
51

52
// WithInstance creates a rqlite database driver with an existing gorqlite database connection
53
// and a Config struct
54
func WithInstance(instance *gorqlite.Connection, config *Config) (database.Driver, error) {
24✔
55
        if config == nil {
32✔
56
                return nil, ErrNilConfig
8✔
57
        }
8✔
58

59
        // we use the consistency level check as a database ping
60
        if _, err := instance.ConsistencyLevel(); err != nil {
16✔
NEW
61
                return nil, err
×
NEW
62
        }
×
63

64
        if len(config.MigrationsTable) == 0 {
24✔
65
                config.MigrationsTable = DefaultMigrationsTable
8✔
66
        }
8✔
67

68
        driver := &Rqlite{
16✔
69
                db:     instance,
16✔
70
                config: config,
16✔
71
        }
16✔
72

16✔
73
        if err := driver.ensureVersionTable(); err != nil {
16✔
NEW
74
                return nil, err
×
NEW
75
        }
×
76

77
        return driver, nil
16✔
78
}
79

80
// OpenURL creates a rqlite database driver from a connect URL
81
func OpenURL(url string) (database.Driver, error) {
24✔
82
        d := &Rqlite{}
24✔
83
        return d.Open(url)
24✔
84
}
24✔
85

86
func (r *Rqlite) ensureVersionTable() (err error) {
32✔
87
        if err = r.Lock(); err != nil {
32✔
NEW
88
                return err
×
NEW
89
        }
×
90

91
        defer func() {
64✔
92
                if e := r.Unlock(); e != nil {
32✔
NEW
93
                        if err == nil {
×
NEW
94
                                err = e
×
NEW
95
                        } else {
×
NEW
96
                                err = multierror.Append(err, e)
×
NEW
97
                        }
×
98
                }
99
        }()
100

101
        stmts := []string{
32✔
102
                fmt.Sprintf(`CREATE TABLE IF NOT EXISTS %s (version uint64, dirty bool)`, r.config.MigrationsTable),
32✔
103
                fmt.Sprintf(`CREATE UNIQUE INDEX IF NOT EXISTS version_unique ON %s (version)`, r.config.MigrationsTable),
32✔
104
        }
32✔
105

32✔
106
        if _, err := r.db.Write(stmts); err != nil {
32✔
NEW
107
                return err
×
NEW
108
        }
×
109

110
        return nil
32✔
111
}
112

113
// Open returns a new driver instance configured with parameters
114
// coming from the URL string. Migrate will call this function
115
// only once per instance.
116
func (r *Rqlite) Open(url string) (database.Driver, error) {
32✔
117
        dburl, config, err := parseUrl(url)
32✔
118
        if err != nil {
48✔
119
                return nil, err
16✔
120
        }
16✔
121
        r.config = config
16✔
122

16✔
123
        r.db, err = gorqlite.Open(dburl.String())
16✔
124
        if err != nil {
16✔
NEW
125
                return nil, err
×
NEW
126
        }
×
127

128
        if err := r.ensureVersionTable(); err != nil {
16✔
NEW
129
                return nil, err
×
NEW
130
        }
×
131

132
        return r, nil
16✔
133
}
134

135
// Close closes the underlying database instance managed by the driver.
136
// Migrate will call this function only once per instance.
137
func (r *Rqlite) Close() error {
24✔
138
        r.db.Close()
24✔
139
        return nil
24✔
140
}
24✔
141

142
// Lock should acquire a database lock so that only one migration process
143
// can run at a time. Migrate will call this function before Run is called.
144
// If the implementation can't provide this functionality, return nil.
145
// Return database.ErrLocked if database is already locked.
146
func (r *Rqlite) Lock() error {
104✔
147
        if !r.isLocked.CAS(false, true) {
112✔
148
                return database.ErrLocked
8✔
149
        }
8✔
150
        return nil
96✔
151
}
152

153
// Unlock should release the lock. Migrate will call this function after
154
// all migrations have been run.
155
func (r *Rqlite) Unlock() error {
96✔
156
        if !r.isLocked.CAS(true, false) {
96✔
NEW
157
                return database.ErrNotLocked
×
NEW
158
        }
×
159
        return nil
96✔
160
}
161

162
// Run applies a migration to the database. migration is guaranteed to be not nil.
163
func (r *Rqlite) Run(migration io.Reader) error {
88✔
164
        migr, err := io.ReadAll(migration)
88✔
165
        if err != nil {
88✔
NEW
166
                return err
×
NEW
167
        }
×
168

169
        query := string(migr[:])
88✔
170
        if _, err := r.db.WriteOne(query); err != nil {
88✔
NEW
171
                return &database.Error{OrigErr: err, Query: []byte(query)}
×
NEW
172
        }
×
173

174
        return nil
88✔
175
}
176

177
// SetVersion saves version and dirty state.
178
// Migrate will call this function before and after each call to Run.
179
// version must be >= -1. -1 means NilVersion.
180
func (r *Rqlite) SetVersion(version int, dirty bool) error {
208✔
181
        deleteQuery := fmt.Sprintf(`DELETE FROM %s`, r.config.MigrationsTable)
208✔
182
        statements := []gorqlite.ParameterizedStatement{
208✔
183
                {
208✔
184
                        Query: deleteQuery,
208✔
185
                },
208✔
186
        }
208✔
187

208✔
188
        // Also re-write the schema version for nil dirty versions to prevent
208✔
189
        // empty schema version for failed down migration on the first migration
208✔
190
        // See: https://github.com/golang-migrate/migrate/issues/330
208✔
191
        insertQuery := fmt.Sprintf(`INSERT INTO %s (version, dirty) VALUES (?, ?)`, r.config.MigrationsTable)
208✔
192
        if version >= 0 || (version == database.NilVersion && dirty) {
392✔
193
                statements = append(statements, gorqlite.ParameterizedStatement{
184✔
194
                        Query: insertQuery,
184✔
195
                        Arguments: []interface{}{
184✔
196
                                version,
184✔
197
                                dirty,
184✔
198
                        },
184✔
199
                })
184✔
200
        }
184✔
201

202
        wr, err := r.db.WriteParameterized(statements)
208✔
203
        if err != nil {
208✔
NEW
204
                for i, res := range wr {
×
NEW
205
                        if res.Err != nil {
×
NEW
206
                                return &database.Error{OrigErr: err, Query: []byte(statements[i].Query)}
×
NEW
207
                        }
×
208
                }
209

210
                // if somehow we're still here, return the original error with combined queries
NEW
211
                return &database.Error{OrigErr: err, Query: []byte(deleteQuery + "\n" + insertQuery)}
×
212
        }
213

214
        return nil
208✔
215
}
216

217
// Version returns the currently active version and if the database is dirty.
218
// When no migration has been applied, it must return version -1.
219
// Dirty means, a previous migration failed and user interaction is required.
220
func (r *Rqlite) Version() (version int, dirty bool, err error) {
96✔
221
        query := "SELECT version, dirty FROM " + r.config.MigrationsTable + " LIMIT 1"
96✔
222

96✔
223
        qr, err := r.db.QueryOne(query)
96✔
224
        if err != nil {
96✔
NEW
225
                return database.NilVersion, false, nil
×
NEW
226
        }
×
227

228
        if !qr.Next() {
136✔
229
                return database.NilVersion, false, nil
40✔
230
        }
40✔
231

232
        if err := qr.Scan(&version, &dirty); err != nil {
56✔
NEW
233
                return database.NilVersion, false, &database.Error{OrigErr: err, Query: []byte(query)}
×
NEW
234
        }
×
235

236
        return version, dirty, nil
56✔
237
}
238

239
// Drop deletes everything in the database.
240
// Note that this is a breaking action, a new call to Open() is necessary to
241
// ensure subsequent calls work as expected.
242
func (r *Rqlite) Drop() error {
16✔
243
        query := `SELECT name FROM sqlite_master WHERE type = 'table'`
16✔
244

16✔
245
        tables, err := r.db.QueryOne(query)
16✔
246
        if err != nil {
16✔
NEW
247
                return &database.Error{OrigErr: err, Query: []byte(query)}
×
NEW
248
        }
×
249

250
        statements := make([]string, 0)
16✔
251
        for tables.Next() {
48✔
252
                var tableName string
32✔
253
                if err := tables.Scan(&tableName); err != nil {
32✔
NEW
254
                        return err
×
NEW
255
                }
×
256

257
                if len(tableName) > 0 {
64✔
258
                        statement := fmt.Sprintf(`DROP TABLE %s`, tableName)
32✔
259
                        statements = append(statements, statement)
32✔
260
                }
32✔
261
        }
262

263
        // return if nothing to do
264
        if len(statements) <= 0 {
16✔
NEW
265
                return nil
×
NEW
266
        }
×
267

268
        wr, err := r.db.Write(statements)
16✔
269
        if err != nil {
16✔
NEW
270
                for i, res := range wr {
×
NEW
271
                        if res.Err != nil {
×
NEW
272
                                return &database.Error{OrigErr: err, Query: []byte(statements[i])}
×
NEW
273
                        }
×
274
                }
275

276
                // if somehow we're still here, return the original error with combined queries
NEW
277
                return &database.Error{OrigErr: err, Query: []byte(strings.Join(statements, "\n"))}
×
278
        }
279

280
        return nil
16✔
281
}
282

283
func parseUrl(url string) (*nurl.URL, *Config, error) {
44✔
284
        parsedUrl, err := nurl.Parse(url)
44✔
285
        if err != nil {
46✔
286
                return nil, nil, err
2✔
287
        }
2✔
288

289
        config, err := parseConfigFromQuery(parsedUrl.Query())
42✔
290
        if err != nil {
54✔
291
                return nil, nil, err
12✔
292
        }
12✔
293

294
        if parsedUrl.Scheme != "rqlite" {
38✔
295
                return nil, nil, errors.Wrap(ErrBadConfig, "bad scheme")
8✔
296
        }
8✔
297

298
        // adapt from rqlite to http/https schemes
299
        if config.ConnectInsecure {
40✔
300
                parsedUrl.Scheme = "http"
18✔
301
        } else {
22✔
302
                parsedUrl.Scheme = "https"
4✔
303
        }
4✔
304

305
        filteredUrl := migrate.FilterCustomQuery(parsedUrl)
22✔
306

22✔
307
        return filteredUrl, config, nil
22✔
308
}
309

310
func parseConfigFromQuery(queryVals nurl.Values) (*Config, error) {
42✔
311
        c := Config{
42✔
312
                ConnectInsecure: DefaultConnectInsecure,
42✔
313
                MigrationsTable: DefaultMigrationsTable,
42✔
314
        }
42✔
315

42✔
316
        migrationsTable := queryVals.Get("x-migrations-table")
42✔
317
        if migrationsTable != "" {
46✔
318
                if strings.HasPrefix(migrationsTable, "sqlite_") {
6✔
319
                        return nil, errors.Wrap(ErrBadConfig, "invalid value for x-migrations-table")
2✔
320
                }
2✔
321
                c.MigrationsTable = migrationsTable
2✔
322
        }
323

324
        connectInsecureStr := queryVals.Get("x-connect-insecure")
40✔
325
        if connectInsecureStr != "" {
68✔
326
                connectInsecure, err := strconv.ParseBool(connectInsecureStr)
28✔
327
                if err != nil {
38✔
328
                        return nil, errors.Wrap(ErrBadConfig, "invalid value for x-connect-insecure")
10✔
329
                }
10✔
330
                c.ConnectInsecure = connectInsecure
18✔
331
        }
332

333
        return &c, nil
30✔
334
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc