• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

snowplow / sql-runner / 15063404311

16 May 2025 07:46AM UTC coverage: 27.73% (+1.0%) from 26.721%
15063404311

push

github

jbeemster
Prepared for release

353 of 1273 relevant lines covered (27.73%)

2.04 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

25.12
/sql_runner/run.go
1
// Copyright (c) 2015-2025 Snowplow Analytics Ltd. All rights reserved.
2
//
3
// This program is licensed to you under the Apache License Version 2.0,
4
// and you may not use this file except in compliance with the Apache License Version 2.0.
5
// You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0.
6
//
7
// Unless required by applicable law or agreed to in writing,
8
// software distributed under the Apache License Version 2.0 is distributed on an
9
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10
// See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.
11
package main
12

13
import (
14
        "bytes"
15
        "fmt"
16
        "log"
17
        "strings"
18
)
19

20
const (
21
        redshiftType   = "redshift"
22
        postgresType   = "postgres"
23
        postgresqlType = "postgresql"
24
        snowflakeType  = "snowflake"
25
        bigqueryType   = "bigquery"
26

27
        errorUnsupportedDbType = "Database type is unsupported"
28
        errorFromStepNotFound  = "The fromStep argument did not match any available steps"
29
        errorQueryFailedInit   = "An error occurred loading the SQL file"
30
        errorRunQueryNotFound  = "The runQuery argument did not match any available queries"
31
        errorRunQueryArgument  = "Argument for -runQuery should be in format 'step::query'"
32
        errorNewTargetFailure  = "Failed to create target"
33
)
34

35
// TargetStatus reports on any errors from running the
36
// playbook against a singular target.
37
type TargetStatus struct {
38
        Name   string
39
        Errors []error // For any errors not related to a specific step
40
        Steps  []StepStatus
41
}
42

43
// StepStatus reports on any errors from running a step.
44
type StepStatus struct {
45
        Name    string
46
        Index   int
47
        Queries []QueryStatus
48
}
49

50
// QueryStatus reports ony any error from a query.
51
type QueryStatus struct {
52
        Query    ReadyQuery
53
        Path     string
54
        Affected int
55
        Error    error
56
}
57

58
// ReadyStep contains a step that is ready for execution.
59
type ReadyStep struct {
60
        Name    string
61
        Queries []ReadyQuery
62
}
63

64
// ReadyQuery contains a query that is ready for execution.
65
type ReadyQuery struct {
66
        Script string
67
        Name   string
68
        Path   string
69
}
70

71
// Run runs a playbook of SQL scripts.
72
//
73
// Handles dispatch to the appropriate
74
// database engine
75
func Run(pb Playbook, sp SQLProvider, fromStep string, runQuery string, dryRun bool, fillTemplates bool, showQueryOutput bool) []TargetStatus {
×
76

×
77
        var steps []Step
×
78
        var trimErr []TargetStatus
×
79

×
80
        if runQuery != "" {
×
81
                steps, trimErr = trimToQuery(pb.Steps, runQuery, pb.Targets)
×
82
        } else {
×
83
                steps, trimErr = trimSteps(pb.Steps, fromStep, pb.Targets)
×
84
        }
×
85
        if trimErr != nil {
×
86
                return trimErr
×
87
        }
×
88

89
        // Prepare all SQL queries
90
        readySteps, readyErr := loadSteps(steps, sp, pb.Variables, pb.Targets)
×
91
        if readyErr != nil {
×
92
                return readyErr
×
93
        }
×
94

95
        if fillTemplates {
×
96
                for _, steps := range readySteps {
×
97
                        for _, query := range steps.Queries {
×
98
                                var message bytes.Buffer
×
99
                                message.WriteString(fmt.Sprintf("Step name: %s\n", steps.Name))
×
100
                                message.WriteString(fmt.Sprintf("Query name: %s\n", query.Name))
×
101
                                message.WriteString(fmt.Sprintf("Query path: %s\n", query.Path))
×
102
                                message.WriteString(query.Script)
×
103
                                log.Print(message.String())
×
104
                        }
×
105
                }
106
                allStatuses := make([]TargetStatus, 0)
×
107
                return allStatuses
×
108
        }
109

110
        targetChan := make(chan TargetStatus, len(pb.Targets))
×
111

×
112
        // Route each target to the right db client and run
×
113
        for _, tgt := range pb.Targets {
×
114
                routeAndRun(tgt, readySteps, targetChan, dryRun, showQueryOutput)
×
115
        }
×
116

117
        // Compose statuses from each target run
118
        // Duplicated in runSteps, because NOGENERICS
119
        allStatuses := make([]TargetStatus, 0)
×
120
        for i := 0; i < len(pb.Targets); i++ {
×
121
                select {
×
122
                case status := <-targetChan:
×
123
                        allStatuses = append(allStatuses, status)
×
124
                }
125
        }
126

127
        return allStatuses
×
128
}
129

130
// --- Pre-run processors
131

132
// Trims down to an indivdual query
133
func trimToQuery(steps []Step, runQuery string, targets []Target) ([]Step, []TargetStatus) {
10✔
134
        runQueryParts := strings.Split(runQuery, "::")
10✔
135
        if len(runQueryParts) != 2 {
13✔
136
                err := fmt.Errorf(errorRunQueryArgument)
3✔
137
                return nil, makeTargetStatuses(err, targets)
3✔
138
        }
3✔
139

140
        var stepName, queryName string = runQueryParts[0], runQueryParts[1]
7✔
141
        if stepName == "" || queryName == "" {
9✔
142
                err := fmt.Errorf(errorRunQueryArgument)
2✔
143
                return nil, makeTargetStatuses(err, targets)
2✔
144
        }
2✔
145

146
        steps, trimErr := trimSteps(steps, stepName, targets)
5✔
147
        if trimErr != nil {
6✔
148
                return nil, trimErr
1✔
149
        }
1✔
150

151
        step := steps[0] // safe
4✔
152
        queries := []Query{}
4✔
153
        for _, query := range step.Queries {
8✔
154
                if query.Name == queryName {
7✔
155
                        queries = append(queries, query)
3✔
156
                        break
3✔
157
                }
158
        }
159

160
        if len(queries) == 0 {
5✔
161
                err := fmt.Errorf("%s: '%s'", errorRunQueryNotFound, queryName)
1✔
162
                return nil, makeTargetStatuses(err, targets)
1✔
163
        }
1✔
164
        step.Queries = queries
3✔
165

3✔
166
        return []Step{step}, nil
3✔
167
}
168

169
// Trims skippable steps
170
func trimSteps(steps []Step, fromStep string, targets []Target) ([]Step, []TargetStatus) {
5✔
171
        stepIndex := 0
5✔
172
        if fromStep != "" {
10✔
173
                exists := false
5✔
174
                for i := 0; i < len(steps); i++ {
13✔
175
                        if steps[i].Name == fromStep {
12✔
176
                                exists = true
4✔
177
                                stepIndex = i
4✔
178
                                break
4✔
179
                        }
180
                }
181
                if !exists {
6✔
182
                        err := fmt.Errorf("%s: %s", errorFromStepNotFound, fromStep)
1✔
183
                        return nil, makeTargetStatuses(err, targets)
1✔
184
                }
1✔
185
        }
186
        return steps[stepIndex:], nil
4✔
187
}
188

189
// Helper to create the corresponding []TargetStatus given an error.
190
func makeTargetStatuses(err error, targets []Target) []TargetStatus {
7✔
191
        allStatuses := make([]TargetStatus, 0, len(targets))
7✔
192
        for _, tgt := range targets {
21✔
193
                errs := []error{err}
14✔
194
                status := TargetStatus{
14✔
195
                        Name:   tgt.Name,
14✔
196
                        Errors: errs,
14✔
197
                        Steps:  nil,
14✔
198
                }
14✔
199
                allStatuses = append(allStatuses, status)
14✔
200
        }
14✔
201

202
        return allStatuses
7✔
203
}
204

205
// Loads all SQL files for all Steps in the playbook ahead of time
206
// Fails as soon as a bad query is found
207
func loadSteps(steps []Step, sp SQLProvider, variables map[string]interface{}, targets []Target) ([]ReadyStep, []TargetStatus) {
×
208
        sCount := len(steps)
×
209
        readySteps := make([]ReadyStep, sCount)
×
210

×
211
        for i := 0; i < sCount; i++ {
×
212
                step := steps[i]
×
213
                qCount := len(step.Queries)
×
214
                readyQueries := make([]ReadyQuery, qCount)
×
215

×
216
                for j := 0; j < qCount; j++ {
×
217
                        query := step.Queries[j]
×
218
                        queryText, err := prepareQuery(query.File, sp, query.Template, variables)
×
219
                        queryPath := sp.ResolveKey(query.File)
×
220

×
221
                        if err != nil {
×
222
                                allStatuses := make([]TargetStatus, 0)
×
223
                                for _, tgt := range targets {
×
224
                                        status := loadQueryFailed(tgt.Name, queryPath, err)
×
225
                                        allStatuses = append(allStatuses, status)
×
226
                                }
×
227
                                return nil, allStatuses
×
228
                        }
229
                        readyQueries[j] = ReadyQuery{Script: queryText, Name: query.Name, Path: queryPath}
×
230
                }
231
                readySteps[i] = ReadyStep{Name: step.Name, Queries: readyQueries}
×
232
        }
233
        return readySteps, nil
×
234
}
235

236
// Helper for a load query failed error
237
func loadQueryFailed(targetName string, queryPath string, err error) TargetStatus {
×
238
        errs := []error{fmt.Errorf("%s: %s: %s", errorQueryFailedInit, queryPath, err)}
×
239
        return TargetStatus{
×
240
                Name:   targetName,
×
241
                Errors: errs,
×
242
                Steps:  nil,
×
243
        }
×
244
}
×
245

246
// --- Running
247

248
// Route to correct database client and run
249
func routeAndRun(target Target, readySteps []ReadyStep, targetChan chan TargetStatus, dryRun bool, showQueryOutput bool) {
×
250
        switch strings.ToLower(target.Type) {
×
251
        case redshiftType, postgresType, postgresqlType:
×
252
                go func(tgt Target) {
×
253
                        pg, err := NewPostgresTarget(tgt)
×
254
                        if err != nil {
×
255
                                targetChan <- newTargetFailure(tgt, err)
×
256
                                return
×
257
                        }
×
258
                        targetChan <- runSteps(pg, readySteps, dryRun, showQueryOutput)
×
259
                }(target)
260
        case snowflakeType:
×
261
                go func(tgt Target) {
×
262
                        snfl, err := NewSnowflakeTarget(tgt)
×
263
                        if err != nil {
×
264
                                targetChan <- newTargetFailure(tgt, err)
×
265
                                return
×
266
                        }
×
267
                        targetChan <- runSteps(snfl, readySteps, dryRun, showQueryOutput)
×
268
                }(target)
269
        case bigqueryType:
×
270
                go func(tgt Target) {
×
271
                        bq, err := NewBigQueryTarget(tgt)
×
272
                        if err != nil {
×
273
                                targetChan <- newTargetFailure(tgt, err)
×
274
                                return
×
275
                        }
×
276
                        targetChan <- runSteps(bq, readySteps, dryRun, showQueryOutput)
×
277
                }(target)
278
        default:
×
279
                targetChan <- unsupportedDbType(target.Name, target.Type)
×
280
        }
281
}
282

283
// Helper for an unrecognized database type
284
func unsupportedDbType(targetName string, targetType string) TargetStatus {
×
285
        errs := []error{fmt.Errorf("%s: %s", errorUnsupportedDbType, targetType)}
×
286
        return TargetStatus{
×
287
                Name:   targetName,
×
288
                Errors: errs,
×
289
                Steps:  nil,
×
290
        }
×
291
}
×
292

293
// Helper to create TargetStatus after an error on New*Target
294
func newTargetFailure(target Target, err error) TargetStatus {
×
295
        errs := []error{fmt.Errorf("%s: %s: %s", errorNewTargetFailure, target.Type, err.Error())}
×
296
        return TargetStatus{
×
297
                Name:   target.Name,
×
298
                Errors: errs,
×
299
                Steps:  nil,
×
300
        }
×
301
}
×
302

303
// Handles the sequential flow of steps (some of
304
// which may involve multiple queries in parallel).
305
//
306
// runSteps fails fast - we stop executing SQL on
307
// this target when a step fails.
308
func runSteps(database Db, steps []ReadyStep, dryRun bool, showQueryOutput bool) TargetStatus {
×
309

×
310
        allStatuses := make([]StepStatus, len(steps))
×
311

×
312
FailFast:
×
313
        for i, stp := range steps {
×
314
                stpIndex := i + 1
×
315
                status := runQueries(database, stpIndex, stp.Name, stp.Queries, dryRun, showQueryOutput)
×
316
                allStatuses = append(allStatuses, status)
×
317

×
318
                for _, qry := range status.Queries {
×
319
                        if qry.Error != nil {
×
320
                                break FailFast
×
321
                        }
322
                }
323
        }
324
        return TargetStatus{
×
325
                Name:   database.GetTarget().Name,
×
326
                Errors: nil,
×
327
                Steps:  allStatuses,
×
328
        }
×
329
}
330

331
// Handles running N queries in parallel.
332
//
333
// runQueries composes failures across the queries
334
// for a given step: if one query fails, the others
335
// will still complete.
336
func runQueries(database Db, stepIndex int, stepName string, queries []ReadyQuery, dryRun bool, showQueryOutput bool) StepStatus {
×
337

×
338
        queryChan := make(chan QueryStatus, len(queries))
×
339
        dbName := database.GetTarget().Name
×
340

×
341
        // Route each target to the right db client and run
×
342
        for _, query := range queries {
×
343
                go func(qry ReadyQuery) {
×
344
                        log.Printf("EXECUTING %s (in step %s @ %s): %s", qry.Name, stepName, dbName, qry.Path)
×
345
                        queryChan <- database.RunQuery(qry, dryRun, showQueryOutput)
×
346
                }(query)
×
347
        }
348

349
        // Collect statuses from each target run
350
        allStatuses := make([]QueryStatus, 0)
×
351
        for i := 0; i < len(queries); i++ {
×
352
                select {
×
353
                case status := <-queryChan:
×
354
                        if status.Error != nil {
×
355
                                log.Printf("FAILURE: %s (step %s @ target %s), ERROR: %s\n", status.Query.Name, stepName, dbName, status.Error.Error())
×
356
                        } else {
×
357
                                log.Printf("SUCCESS: %s (step %s @ target %s), ROWS AFFECTED: %d\n", status.Query.Name, stepName, dbName, status.Affected)
×
358
                        }
×
359
                        allStatuses = append(allStatuses, status)
×
360
                }
361
        }
362

363
        return StepStatus{
×
364
                Name:    stepName,
×
365
                Index:   stepIndex,
×
366
                Queries: allStatuses,
×
367
        }
×
368
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc