• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

snowplow / sql-runner / 15063404311

16 May 2025 07:46AM UTC coverage: 27.73% (+1.0%) from 26.721%
15063404311

push

github

jbeemster
Prepared for release

353 of 1273 relevant lines covered (27.73%)

2.04 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/sql_runner/bigquery_target.go
1
// Copyright (c) 2015-2025 Snowplow Analytics Ltd. All rights reserved.
2
//
3
// This program is licensed to you under the Apache License Version 2.0,
4
// and you may not use this file except in compliance with the Apache License Version 2.0.
5
// You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0.
6
//
7
// Unless required by applicable law or agreed to in writing,
8
// software distributed under the Apache License Version 2.0 is distributed on an
9
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10
// See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.
11
package main
12

13
import (
14
        "fmt"
15
        "log"
16
        "os"
17
        "strings"
18

19
        bq "cloud.google.com/go/bigquery"
20
        "github.com/olekukonko/tablewriter"
21
        "golang.org/x/net/context"
22
        "google.golang.org/api/iterator"
23
)
24

25
// BigQueryTarget represents BigQuery as a target.
26
type BigQueryTarget struct {
27
        Target
28
        Client *bq.Client
29
}
30

31
// IsConnectable tests connection to determine whether the BigQuery target is
32
// connectable.
33
func (bqt BigQueryTarget) IsConnectable() bool {
×
34
        var err error = nil
×
35
        ctx := context.Background()
×
36

×
37
        client := bqt.Client
×
38
        query := client.Query("SELECT 1") // empty query to test connection
×
39

×
40
        it, err := query.Read(ctx)
×
41
        if err != nil {
×
42
                log.Printf("ERROR: Failed to perform test query: %v", err)
×
43
                return false
×
44
        }
×
45

46
        var row []bq.Value
×
47
        err = it.Next(&row)
×
48
        if err != nil {
×
49
                log.Printf("ERROR: Failed to read test query results: %v", err)
×
50
                return false
×
51
        }
×
52

53
        return fmt.Sprint(row) == "[1]"
×
54
}
55

56
// NewBigQueryTarget returns a ptr to a BigQueryTarget.
57
func NewBigQueryTarget(target Target) (*BigQueryTarget, error) {
×
58
        projectID := target.Project
×
59
        ctx := context.Background()
×
60

×
61
        client, err := bq.NewClient(ctx, projectID)
×
62
        if err != nil {
×
63
                return nil, err
×
64
        }
×
65

66
        client.Location = target.Region
×
67

×
68
        return &BigQueryTarget{target, client}, nil
×
69
}
70

71
// GetTarget returns the Target field of BigQueryTarget.
72
func (bqt BigQueryTarget) GetTarget() Target {
×
73
        return bqt.Target
×
74
}
×
75

76
// RunQuery runs a query against the target.
77
func (bqt BigQueryTarget) RunQuery(query ReadyQuery, dryRun bool, showQueryOutput bool) QueryStatus {
×
78
        var affected int64 = 0
×
79
        var err error = nil
×
80
        var schema bq.Schema = nil
×
81
        ctx := context.Background()
×
82

×
83
        if dryRun {
×
84
                if bqt.IsConnectable() {
×
85
                        log.Printf("SUCCESS: Able to connect to target database, %s.", bqt.Project)
×
86
                } else {
×
87
                        log.Printf("ERROR: Cannot connect to target database, %s.", bqt.Project)
×
88
                }
×
89
                return QueryStatus{query, query.Path, 0, nil}
×
90
        }
91

92
        script := query.Script
×
93

×
94
        if len(strings.TrimSpace(script)) > 0 {
×
95
                // If showing query output, perform a dry run to get column metadata
×
96
                if showQueryOutput {
×
97
                        dq := bqt.Client.Query(script)
×
98
                        dq.DryRun = true
×
99
                        dqJob, err := dq.Run(ctx)
×
100
                        if err != nil {
×
101
                                log.Printf("ERROR: Failed to dry run job: %s.", err)
×
102
                                return QueryStatus{query, query.Path, int(affected), err}
×
103
                        }
×
104

105
                        schema = dqJob.LastStatus().Statistics.Details.(*bq.QueryStatistics).Schema
×
106
                }
107

108
                q := bqt.Client.Query(script)
×
109

×
110
                job, err := q.Run(ctx)
×
111
                if err != nil {
×
112
                        log.Printf("ERROR: Failed to run job: %s.", err)
×
113
                        return QueryStatus{query, query.Path, int(affected), err}
×
114
                }
×
115

116
                it, err := job.Read(ctx)
×
117
                if err != nil {
×
118
                        log.Printf("ERROR: Failed to read job results: %s.", err)
×
119
                        return QueryStatus{query, query.Path, int(affected), err}
×
120
                }
×
121

122
                status, err := job.Status(ctx)
×
123
                if err != nil {
×
124
                        log.Printf("ERROR: Failed to read job results: %s.", err)
×
125
                        return QueryStatus{query, query.Path, int(affected), err}
×
126
                }
×
127
                if err := status.Err(); err != nil {
×
128
                        log.Printf("ERROR: Error running job: %s.", err)
×
129
                        return QueryStatus{query, query.Path, int(affected), err}
×
130
                }
×
131

132
                if showQueryOutput {
×
133
                        err = printBqTable(it, schema)
×
134
                        if err != nil {
×
135
                                log.Printf("ERROR: Failed to print output: %s.", err)
×
136
                                return QueryStatus{query, query.Path, int(affected), err}
×
137
                        }
×
138
                } else {
×
139
                        queryStats := job.LastStatus().Statistics.Details.(*bq.QueryStatistics)
×
140
                        aff := queryStats.NumDMLAffectedRows
×
141
                        affected += aff
×
142
                }
×
143
        }
144

145
        return QueryStatus{query, query.Path, int(affected), err}
×
146
}
147

148
func printBqTable(rows *bq.RowIterator, schema bq.Schema) error {
×
149
        outputBuffer := make([][]string, 0, 10)
×
150

×
151
        for {
×
152
                var row []bq.Value
×
153
                err := rows.Next(&row)
×
154
                if err == iterator.Done {
×
155
                        break
×
156
                }
157
                if err != nil {
×
158
                        return err
×
159
                }
×
160
                outputBuffer = append(outputBuffer, bqStringify(row))
×
161
        }
162

163
        if len(outputBuffer) > 0 {
×
164
                log.Printf("QUERY OUTPUT:\n")
×
165
                table := tablewriter.NewWriter(os.Stdout)
×
166

×
167
                // Get columns from table schema
×
168
                columns := make([]string, len(schema))
×
169
                for i, field := range schema {
×
170
                        columns[i] = field.Name
×
171
                }
×
172
                table.SetHeader(columns)
×
173

×
174
                table.SetBorders(tablewriter.Border{Left: true, Top: false, Right: true, Bottom: false})
×
175
                table.SetCenterSeparator("|")
×
176

×
177
                for _, row := range outputBuffer {
×
178
                        table.Append(row)
×
179
                }
×
180

181
                table.Render() // Send output
×
182
        }
183
        return nil
×
184
}
185

186
func bqStringify(row []bq.Value) []string {
×
187
        var line []string
×
188
        for _, element := range row {
×
189
                line = append(line, fmt.Sprint(element))
×
190
        }
×
191
        return line
×
192
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc