• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

m-lab / etl-gardener / 3608

14 Dec 2022 10:22PM UTC coverage: 78.442% (+0.8%) from 77.646%
3608

Pull #390

travis-ci-com

web-flow
Merge 41c1db800 into 90b21e8a2
Pull Request #390: Add target Datasets to gardener configuration

28 of 28 new or added lines in 6 files covered. (100.0%)

1259 of 1605 relevant lines covered (78.44%)

0.85 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

54.9
/cloud/bq/ops.go
1
package bq
2

3
import (
4
        "bytes"
5
        "context"
6
        "errors"
7
        "html/template"
8
        "log"
9

10
        "cloud.google.com/go/bigquery"
11
        "github.com/googleapis/google-cloud-go-testing/bigquery/bqiface"
12

13
        "github.com/m-lab/go/dataset"
14

15
        "github.com/m-lab/etl-gardener/tracker"
16
)
17

18
var (
	// ErrDatatypeNotSupported is returned (for example by NewTableOpsWithClient)
	// when a job's datatype is not one this package can build table operations for.
	ErrDatatypeNotSupported = errors.New("datatype not supported")

	// ErrTableNotFound is returned (for example by LoadToTmp) when no destination
	// table handle can be obtained in the tmp dataset.
	ErrTableNotFound = errors.New("table not found")
)
24

25
// TableOps is used to construct and execute table partition operations.
26
type TableOps struct {
27
        client     bqiface.Client
28
        LoadSource string // The bucket/path to load from.
29
        Project    string
30
        Date       string // Name of the partition field
31
        Job        tracker.Job
32
        // map key is the single field name, value is fully qualified name
33
        PartitionKeys map[string]string
34
        OrderKeys     string
35
}
36

37
// NewTableOps creates a suitable QueryParams for a Job.
38
// The context is used to create a bigquery client, and should be kept alive while
39
// the querier is in use.
40
func NewTableOps(ctx context.Context, job tracker.Job, project string, loadSource string) (*TableOps, error) {
1✔
41
        c, err := bigquery.NewClient(ctx, project)
1✔
42
        if err != nil {
1✔
43
                return nil, err
×
44
        }
×
45
        bqClient := bqiface.AdaptClient(c)
1✔
46
        return NewTableOpsWithClient(bqClient, job, project, loadSource)
1✔
47
}
48

49
// NewTableOpsWithClient creates a suitable QueryParams for a Job.
50
func NewTableOpsWithClient(client bqiface.Client, job tracker.Job, project string, loadSource string) (*TableOps, error) {
1✔
51
        switch job.Datatype {
1✔
52
        case "switch":
×
53
                fallthrough
×
54
        case "annotation":
1✔
55
                fallthrough
1✔
56
        case "hopannotation1":
1✔
57
                fallthrough
1✔
58
        case "pcap":
1✔
59
                fallthrough
1✔
60
        case "scamper1":
1✔
61
                fallthrough
1✔
62
        case "tcpinfo":
1✔
63
                fallthrough
1✔
64
        case "ndt5":
1✔
65
                fallthrough
1✔
66
        case "ndt7":
1✔
67
                return &TableOps{
1✔
68
                        client:        client,
1✔
69
                        LoadSource:    loadSource,
1✔
70
                        Project:       project,
1✔
71
                        Date:          "date",
1✔
72
                        Job:           job,
1✔
73
                        PartitionKeys: map[string]string{"id": "id"},
1✔
74
                        OrderKeys:     "",
1✔
75
                }, nil
1✔
76

77
        default:
×
78
                return nil, ErrDatatypeNotSupported
×
79
        }
80
}
81

82
var queryTemplates = map[string]*template.Template{
83
        "dedup": dedupTemplate,
84
}
85

86
// makeQuery creates a query from a template.
87
func (to TableOps) makeQuery(t *template.Template) string {
1✔
88
        out := bytes.NewBuffer(nil)
1✔
89
        err := t.Execute(out, to)
1✔
90
        if err != nil {
1✔
91
                log.Println(err)
×
92
        }
×
93
        return out.String()
1✔
94
}
95

96
// dedupQuery returns the appropriate query in string form.
97
func dedupQuery(to TableOps) string {
1✔
98
        return to.makeQuery(dedupTemplate)
1✔
99
}
1✔
100

101
// Dedup initiates a deduplication query, and returns the bqiface.Job.
102
func (to TableOps) Dedup(ctx context.Context, dryRun bool) (bqiface.Job, error) {
1✔
103
        qs := dedupQuery(to)
1✔
104
        if len(qs) == 0 {
1✔
105
                return nil, dataset.ErrNilQuery
×
106
        }
×
107
        if to.client == nil {
1✔
108
                return nil, dataset.ErrNilBqClient
×
109
        }
×
110
        q := to.client.Query(qs)
1✔
111
        if q == nil {
1✔
112
                return nil, dataset.ErrNilQuery
×
113
        }
×
114
        qc := bqiface.QueryConfig{
1✔
115
                QueryConfig: bigquery.QueryConfig{
1✔
116
                        Q:      qs,
1✔
117
                        DryRun: dryRun,
1✔
118
                        // Schedule as batch job to avoid quota limits for interactive jobs.
1✔
119
                        Priority: bigquery.BatchPriority,
1✔
120
                },
1✔
121
        }
1✔
122
        q.SetQueryConfig(qc)
1✔
123
        return q.Run(ctx)
1✔
124
}
125

126
// LoadToTmp loads the "tmp" experiment table from files previously written to GCS by the parsers (or other source).
127
func (to TableOps) LoadToTmp(ctx context.Context, dryRun bool) (bqiface.Job, error) {
×
128
        if dryRun {
×
129
                return nil, errors.New("dryrun not implemented")
×
130
        }
×
131
        if to.client == nil {
×
132
                return nil, dataset.ErrNilBqClient
×
133
        }
×
134

135
        gcsRef := bigquery.NewGCSReference(to.LoadSource)
×
136
        gcsRef.SourceFormat = bigquery.JSON
×
137

×
138
        dest := to.client.
×
139
                Dataset(to.Job.Datasets.Tmp).
×
140
                Table(to.Job.Datatype)
×
141
        if dest == nil {
×
142
                return nil, ErrTableNotFound
×
143
        }
×
144
        loader := dest.LoaderFrom(gcsRef)
×
145
        loadConfig := bqiface.LoadConfig{}
×
146
        loadConfig.WriteDisposition = bigquery.WriteAppend
×
147
        loadConfig.Dst = dest
×
148
        loadConfig.Src = gcsRef
×
149
        loader.SetLoadConfig(loadConfig)
×
150

×
151
        return loader.Run(ctx)
×
152
}
153

154
// CopyToRaw copies the job "temp" table partition to the "raw" table partition.
155
func (to TableOps) CopyToRaw(ctx context.Context, dryRun bool) (bqiface.Job, error) {
×
156
        if dryRun {
×
157
                return nil, errors.New("dryrun not implemented")
×
158
        }
×
159
        if to.client == nil {
×
160
                return nil, dataset.ErrNilBqClient
×
161
        }
×
162
        tableName := to.Job.TablePartition()
×
163
        src := to.client.Dataset(to.Job.Datasets.Tmp).Table(tableName)
×
164
        dest := to.client.Dataset(to.Job.Datasets.Raw).Table(tableName)
×
165

×
166
        copier := dest.CopierFrom(src)
×
167
        config := bqiface.CopyConfig{}
×
168
        config.WriteDisposition = bigquery.WriteTruncate
×
169
        config.Dst = dest
×
170
        config.Srcs = append(config.Srcs, src)
×
171
        copier.SetCopyConfig(config)
×
172
        return copier.Run(ctx)
×
173
}
174

175
// Backtick-quoted, fully qualified table references spliced into the query
// templates below; they are expanded against TableOps template data.
const (
	tmpTable = "`{{.Project}}.{{.Job.Datasets.Tmp}}.{{.Job.Datatype}}`"
	rawTable = "`{{.Project}}.{{.Job.Datasets.Raw}}.{{.Job.Datatype}}`"

	// NOTE: experiment annotations must come from the same raw experiment dataset.
	annoTable = "`{{.Project}}.{{.Job.Datasets.Raw}}.annotation`"
)
180

181
// dedupTemplate renders a DELETE statement over the job's date partition of the
// tmp table. For each combination of PartitionKeys values and partition field it
// keeps one row: the first by OrderKeys, then by latest parser.Time. It deletes
// every other row in the partition.
//
// Template data is a TableOps value. PartitionKeys values (fully qualified
// names) are selected into the keep set and compared as target.<value>, while
// the map keys are used as keep.<key> column names. This presumably relies on
// BigQuery naming a selected "a.b" column "b" — confirm when adding nested keys.
//
// The partition column is always {{.Date}}, including in PARTITION BY, so all
// references stay consistent if TableOps.Date is ever set to something other
// than "date".
var dedupTemplate = template.Must(template.New("").Parse(`
#standardSQL
# Delete all duplicate rows based on key and prefered priority ordering.
# This is resource intensive for tcpinfo - 20 slot hours for 12M rows with 250M snapshots,
# roughly proportional to the memory footprint of the table partition.
# The query is very cheap if there are no duplicates.
DELETE
FROM ` + tmpTable + ` AS target
WHERE {{.Date}} = "{{.Job.Date.Format "2006-01-02"}}"
# This identifies all rows that don't match rows to preserve.
AND NOT EXISTS (
  # This creates list of rows to preserve, based on key and priority.
  WITH keep AS (
  SELECT * EXCEPT(row_number) FROM (
    SELECT
      {{range $k, $v := .PartitionKeys}}{{$v}}, {{end}}
          parser.Time,
      ROW_NUMBER() OVER (
        PARTITION BY {{range $k, $v := .PartitionKeys}}{{$v}}, {{end}}{{.Date}}
        ORDER BY {{.OrderKeys}} parser.Time DESC
      ) row_number
      FROM (
        SELECT * FROM ` + tmpTable + `
        WHERE {{.Date}} = "{{.Job.Date.Format "2006-01-02"}}"
      )
    )
    WHERE row_number = 1
  )
  SELECT * FROM keep
  # This matches against the keep table based on keys.  Sufficient select keys must be
  # used to distinguish the preferred row from the others.
  WHERE
    {{range $k, $v := .PartitionKeys}}target.{{$v}} = keep.{{$k}} AND {{end}}
    target.parser.Time = keep.Time
)`))
216

217
// DeleteTmp deletes the tmp table partition.
218
func (to TableOps) DeleteTmp(ctx context.Context) error {
×
219
        if to.client == nil {
×
220
                return dataset.ErrNilBqClient
×
221
        }
×
222
        tmp := to.client.Dataset(to.Job.Datasets.Tmp).Table(to.Job.TablePartition())
×
223
        log.Println("Deleting", tmp.FullyQualifiedName())
×
224
        return tmp.Delete(ctx)
×
225
}
226

227
// joinTemplate is used to create join queries, based on Job details.
//
// The rendered query selects the job's date partition from the raw datatype
// table, then LEFT JOINs it on id with rows from the raw dataset's annotation
// table. Those annotation rows are limited to dates from the day before the
// job date through the job date. Output columns are the datatype's id, date
// and parser, then the annotation columns, then the remaining datatype columns.
// Template data is a TableOps value.
//
// NOTE(review): the annotation CTE does not deduplicate, so an id with more
// than one annotation row in that two-day window would produce more than one
// joined row — confirm whether that can occur.
//
// TODO - explore whether using query parameters would improve readability
// instead of executing this template.  Query params cannot be used for
// table names, but would work for most other variables.
var joinTemplate = template.Must(template.New("").Parse(`
#standardSQL
# Join the ndt7 data with server and client annotation.
WITH {{.Job.Datatype}} AS (
SELECT *
FROM ` + rawTable + `
WHERE {{.Date}} = "{{.Job.Date.Format "2006-01-02"}}"
),

# Need to remove dups?
ann AS (
SELECT *
FROM ` + annoTable + `
WHERE {{.Date}} BETWEEN DATE_SUB("{{.Job.Date.Format "2006-01-02"}}", INTERVAL 1 DAY) AND "{{.Job.Date.Format "2006-01-02"}}"
)

SELECT {{.Job.Datatype}}.id, {{.Job.Datatype}}.date, {{.Job.Datatype}}.parser,
       ann.* EXCEPT(id, date, parser), {{.Job.Datatype}}.* EXCEPT(id, date, parser)
FROM {{.Job.Datatype}} LEFT JOIN ann USING (id)
`))
251

252
// Join joins the raw tables into annotated tables.
253
func (to TableOps) Join(ctx context.Context, dryRun bool) (bqiface.Job, error) {
1✔
254
        qs := to.makeQuery(joinTemplate)
1✔
255

1✔
256
        if len(qs) == 0 {
1✔
257
                return nil, dataset.ErrNilQuery
×
258
        }
×
259
        if to.client == nil {
1✔
260
                return nil, dataset.ErrNilBqClient
×
261
        }
×
262
        q := to.client.Query(qs)
1✔
263
        if q == nil {
1✔
264
                return nil, dataset.ErrNilQuery
×
265
        }
×
266
        // The destintation is a partition in a table based on the job
267
        // type and date.  Initially, this will only be ndt7.
268
        dest := to.client.Dataset(to.Job.Datasets.Join).Table(to.Job.TablePartition())
1✔
269
        qc := bqiface.QueryConfig{
1✔
270
                QueryConfig: bigquery.QueryConfig{
1✔
271
                        DryRun: dryRun,
1✔
272
                        Q:      qs,
1✔
273
                        // We want to replace the whole partition
1✔
274
                        WriteDisposition: bigquery.WriteTruncate,
1✔
275
                        // Create the table if it doesn't exist
1✔
276
                        CreateDisposition: bigquery.CreateIfNeeded,
1✔
277
                        // Allow additional fields introduced by the raw tables to be automatically
1✔
278
                        // added to the joined, materialized output table.
1✔
279
                        SchemaUpdateOptions: []string{"ALLOW_FIELD_ADDITION", "ALLOW_FIELD_RELAXATION"},
1✔
280
                        // Partitioning spec, in event we have to create the table.
1✔
281
                        TimePartitioning: &bigquery.TimePartitioning{
1✔
282
                                Field:                  "date",
1✔
283
                                RequirePartitionFilter: true,
1✔
284
                        },
1✔
285
                        // Schedule as batch job to avoid quota limits for interactive jobs.
1✔
286
                        Priority: bigquery.BatchPriority,
1✔
287
                },
1✔
288
                Dst: dest,
1✔
289
        }
1✔
290
        q.SetQueryConfig(qc)
1✔
291
        return q.Run(ctx)
1✔
292
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc