• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

m-lab / go / 1425

19 Nov 2024 01:38PM UTC coverage: 94.535% (-1.4%) from 95.96%
1425

Pull #183

travis-pro

web-flow
Merge branch 'main' into dependabot/go_modules/golang.org/x/net-0.23.0
Pull Request #183: Bump golang.org/x/net from 0.0.0-20200421231249-e086a090c8fd to 0.23.0

2387 of 2525 relevant lines covered (94.53%)

134.15 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

92.82
/cloud/bqx/schema.go
1
// Package bqx provides utilities and extensions for working with bigquery.
2
package bqx
3

4
import (
5
        "bytes"
6
        "context"
7
        "encoding/json"
8
        "errors"
9
        "fmt"
10
        "log"
11
        "regexp"
12
        "strings"
13

14
        "cloud.google.com/go/bigquery"
15
        "github.com/m-lab/go/rtx"
16
        "gopkg.in/yaml.v2"
17
)
18

19
// PrettyPrint generates a formatted json representation of a Schema.
// It simplifies the schema by removing zero valued fields, and compacting
// each field record onto a single line.
// Intended for diagnostics and debugging.  Not suitable for production use.
//
// When simplify is true, zero-valued attributes (`"Required": false`,
// `"Repeated": false`, `"Schema": null`) are omitted entirely.
func PrettyPrint(schema bigquery.Schema, simplify bool) (string, error) {
	jsonBytes, err := json.MarshalIndent(schema, "", "  ")
	if err != nil {
		return "", err
	}

	// Re-flow the indented JSON: each field object is compacted onto a single
	// line, while array and object brackets keep their own lines.
	lines := strings.Split(string(jsonBytes), "\n")
	// before holds the separator to emit ahead of the next token: ", " while
	// continuing a compacted record, "" immediately after an opening bracket.
	before := ""
	output := &bytes.Buffer{}

	for _, line := range lines {
		// Strip indentation and the trailing comma so the content can be
		// matched literally in the switch below.
		trim := strings.Trim(strings.TrimSpace(line), ",")
		switch trim {
		case `"Schema": null`:
			fallthrough
		case `"Repeated": false`:
			fallthrough
		case `"Required": false`:
			// Zero-valued attributes are dropped when simplifying.
			if !simplify {
				fmt.Fprint(output, before, trim)
				before = ", "
			}
		case `"Required": true`:
			fmt.Fprint(output, before, trim)
			before = ", "
		case `"Schema": [`:
			fallthrough
		case `[`:
			// An opening bracket ends the current line; nested content follows.
			fmt.Fprintf(output, "%s%s\n", before, trim)
			before = ""
		case `{`:
			// Keep the original indentation for an opening brace; the record's
			// attributes are appended to this same line.
			fmt.Fprint(output, line)
			before = ""
		case `}`:
			// A closing brace terminates a compacted one-line record.
			fmt.Fprintln(output, strings.TrimSpace(line))
		case `]`:
			fmt.Fprint(output, line)
			before = ""
		default:
			// Ordinary `"key": value` attribute — append to the current record line.
			fmt.Fprint(output, before, trim)
			before = ", "
		}
	}
	fmt.Fprintln(output)
	return output.String(), nil
}
70

71
// CustomizeAppend recursively traverses a schema, appending the
72
// bigquery.FieldSchema to existing fields matching a name in the provided map.
73
func CustomizeAppend(schema bigquery.Schema, additions map[string]*bigquery.FieldSchema) bigquery.Schema {
4✔
74
        // We have to copy the schema, to avoid corrupting the bigquery fieldCache.
4✔
75
        custom := make(bigquery.Schema, len(schema))
4✔
76
        for i := range schema {
12✔
77
                custom[i] = &bigquery.FieldSchema{}
8✔
78
                *custom[i] = *schema[i]
8✔
79
                fs := custom[i]
8✔
80
                s, ok := additions[fs.Name]
8✔
81
                if ok {
10✔
82
                        fs.Schema = append(fs.Schema, s)
2✔
83

2✔
84
                } else {
8✔
85
                        if fs.Type == bigquery.RecordFieldType {
8✔
86
                                fs.Schema = CustomizeAppend(fs.Schema, additions)
2✔
87
                        }
2✔
88
                }
89

90
        }
91
        return custom
4✔
92
}
93

94
// Customize recursively traverses a schema, substituting any fields that have
95
// a matching name in the provided map.
96
func Customize(schema bigquery.Schema, subs map[string]bigquery.FieldSchema) bigquery.Schema {
4✔
97
        // We have to copy the schema, to avoid corrupting the bigquery fieldCache.
4✔
98
        out := make(bigquery.Schema, len(schema))
4✔
99
        for i := range schema {
22✔
100
                out[i] = &bigquery.FieldSchema{}
18✔
101
                *out[i] = *schema[i]
18✔
102
                fs := out[i]
18✔
103
                s, ok := subs[fs.Name]
18✔
104
                if ok {
22✔
105
                        *fs = s
4✔
106

4✔
107
                } else {
18✔
108
                        if fs.Type == bigquery.RecordFieldType {
16✔
109
                                fs.Schema = Customize(fs.Schema, subs)
2✔
110
                        }
2✔
111
                }
112

113
        }
114
        return out
4✔
115
}
116

117
// RemoveRequired recursively traverses a schema, setting Required to false in
118
// all fields that are not fundamentally required by BigQuery.
119
func RemoveRequired(schema bigquery.Schema) bigquery.Schema {
4✔
120
        // We have to copy the schema, to avoid corrupting the bigquery fieldCache.
4✔
121
        out := make(bigquery.Schema, len(schema))
4✔
122
        for i := range schema {
22✔
123
                out[i] = &bigquery.FieldSchema{}
18✔
124
                *out[i] = *schema[i]
18✔
125
                fs := out[i]
18✔
126
                switch fs.Type {
18✔
127
                case bigquery.RecordFieldType:
2✔
128
                        fs.Required = false
2✔
129
                        fs.Schema = RemoveRequired(fs.Schema)
2✔
130

131
                default:
16✔
132
                        // Mark all fields as nullable.
16✔
133
                        fs.Required = false
16✔
134
                }
135
        }
136

137
        return out
4✔
138
}
139

140
// Validation errors returned by ParsePDT when a fully qualified table name,
// or one of its components, does not satisfy the naming patterns below.
var (
	ErrInvalidProjectName = errors.New("Invalid project name")
	ErrInvalidDatasetName = errors.New("Invalid dataset name")
	ErrInvalidTableName   = errors.New("Invalid table name")
	ErrInvalidFQTable     = errors.New("Invalid fully qualified table name")
)
147

148
// Patterns used by ParsePDT to validate the components of a fully qualified
// table name.
// NOTE(review): these look stricter than BigQuery's documented rules in some
// legacy forms (e.g. domain-scoped project IDs containing ':') — confirm
// against the BigQuery naming documentation before relaxing or tightening.
var (
	projectRegex = regexp.MustCompile("^[a-z0-9-]+$")
	datasetRegex = regexp.MustCompile("^[a-zA-Z0-9_]+$")
	tableRegex   = regexp.MustCompile("^[a-zA-Z0-9_]+$")
)
153

154
// PDT contains a bigquery project, dataset, and table name.
type PDT struct {
	Project string `json:",omitempty"` // GCP project ID.
	Dataset string `json:",omitempty"` // BigQuery dataset name.
	Table   string `json:",omitempty"` // BigQuery table name.
}
160

161
// ParsePDT parses and validates a fully qualified bigquery table name of the
162
// form project.dataset.table.  None of the elements needs to exist, but all
163
// must conform to the corresponding naming restrictions.
164
func ParsePDT(fq string) (PDT, error) {
14✔
165
        parts := strings.Split(fq, ".")
14✔
166
        if len(parts) != 3 {
16✔
167
                return PDT{}, ErrInvalidFQTable
2✔
168
        }
2✔
169
        if !projectRegex.MatchString(parts[0]) {
14✔
170
                return PDT{}, ErrInvalidProjectName
2✔
171
        }
2✔
172
        if !datasetRegex.MatchString(parts[1]) {
12✔
173
                return PDT{}, ErrInvalidDatasetName
2✔
174
        }
2✔
175
        if !tableRegex.MatchString(parts[2]) {
10✔
176
                return PDT{}, ErrInvalidTableName
2✔
177
        }
2✔
178
        return PDT{parts[0], parts[1], parts[2]}, nil
6✔
179
}
180

181
// UpdateTable will update an existing table.  Returns error if the table
182
// doesn't already exist, or if the schema changes are incompatible.
183
func (pdt PDT) UpdateTable(ctx context.Context, client *bigquery.Client, schema bigquery.Schema,
184
        partitioning *bigquery.TimePartitioning) error {
4✔
185
        // See if dataset exists, or create it.
4✔
186
        ds := client.Dataset(pdt.Dataset)
4✔
187
        _, err := ds.Metadata(ctx)
4✔
188
        if err != nil {
4✔
189
                // TODO if we see errors showing up here.
×
190
                // TODO possibly retry if this is a transient error.
×
191
                // apiErr, ok := err.(*googleapi.Error)
×
192
                log.Println(err) // So we can discover these and add explicit handling.
×
193
                return err
×
194
        }
×
195
        t := ds.Table(pdt.Table)
4✔
196

4✔
197
        meta, err := t.Metadata(ctx)
4✔
198
        if err != nil {
6✔
199
                return err
2✔
200
        }
2✔
201

202
        // If a partition field is set, enforce partition filtering.
203
        // This prevents unintentional full data scans on large tables.
204
        requirePartition := partitioning != nil && partitioning.Field != ""
2✔
205

2✔
206
        // If table already exists, attempt to update the schema.
2✔
207
        changes := bigquery.TableMetadataToUpdate{
2✔
208
                Schema:                 schema,
2✔
209
                RequirePartitionFilter: requirePartition,
2✔
210
        }
2✔
211

2✔
212
        _, err = t.Update(ctx, changes, meta.ETag)
2✔
213
        return err
2✔
214
}
215

216
// CreateTable will create a new table, or fail if the table already exists.
217
// It will also set appropriate time-partitioning field and clustering fields
218
// if non-nil arguments are provided.  Returns error if the dataset does not
219
// already exist, or if other errors are encountered.
220
func (pdt PDT) CreateTable(ctx context.Context, client *bigquery.Client, schema bigquery.Schema, description string,
221
        partitioning *bigquery.TimePartitioning, clustering *bigquery.Clustering) error {
6✔
222
        ds := client.Dataset(pdt.Dataset)
6✔
223

6✔
224
        if _, err := ds.Metadata(ctx); err != nil {
8✔
225
                // TODO if we see errors showing up here.
2✔
226
                // TODO possibly retry if this is a transient error.
2✔
227
                // apiErr, ok := err.(*googleapi.Error)
2✔
228
                log.Println(err) // So we can discover these and add explicit handling.
2✔
229
                return err
2✔
230
        }
2✔
231

232
        t := ds.Table(pdt.Table)
4✔
233

4✔
234
        // If a partition field is set, enforce partition filtering.
4✔
235
        // This prevents unintentional full data scans on large tables.
4✔
236
        requirePartition := partitioning != nil && partitioning.Field != ""
4✔
237

4✔
238
        meta := &bigquery.TableMetadata{
4✔
239
                Schema:                 schema,
4✔
240
                TimePartitioning:       partitioning,
4✔
241
                RequirePartitionFilter: requirePartition,
4✔
242
                Clustering:             clustering,
4✔
243
                Description:            description,
4✔
244
        }
4✔
245

4✔
246
        err := t.Create(ctx, meta)
4✔
247

4✔
248
        if err != nil {
8✔
249
                // TODO if we see errors showing up here.
4✔
250
                // TODO possibly retry if this is a transient error.
4✔
251
                // apiErr, ok := err.(*googleapi.Error)
4✔
252
                log.Println(err) // So we can discover these and add explicit handling.
4✔
253
                return err
4✔
254
        }
4✔
255

256
        return nil
×
257
}
258

259
// SchemaDoc contains bigquery.Schema field Descriptions as read from an auxiliary source, such as YAML.
// The outer key is a field-path suffix (field name, optionally prefixed by
// ancestor record names joined with "."); the inner map holds documentation
// attributes — only "Description" is consumed by UpdateSchemaDescription.
type SchemaDoc map[string]map[string]string
261

262
// NewSchemaDoc parses the given YAML bytes as a SchemaDoc.
// Parse errors are fatal: rtx.Must terminates the process on failure.
func NewSchemaDoc(docs []byte) SchemaDoc {
	sd := SchemaDoc{}
	err := yaml.Unmarshal(docs, &sd)
	rtx.Must(err, "Failed to unmarshal schema doc")
	return sd
}
2✔
269

270
// UpdateSchemaDescription walks each field in the given schema and assigns the
271
// Description field in place using values found in the given SchemaDoc.
272
func UpdateSchemaDescription(schema bigquery.Schema, docs SchemaDoc) error {
4✔
273
        WalkSchema(
4✔
274
                schema, func(prefix []string, field *bigquery.FieldSchema) error {
24✔
275
                        var ok bool
20✔
276
                        var d map[string]string
20✔
277
                        // Starting with the longest prefix, stop looking for descriptions on first match.
20✔
278
                        for start := 0; start < len(prefix) && !ok; start++ {
44✔
279
                                path := strings.Join(prefix[start:], ".")
24✔
280
                                d, ok = docs[path]
24✔
281
                        }
24✔
282
                        if !ok {
24✔
283
                                // This is not an error, the field simply doesn't have extra description.
4✔
284
                                return nil
4✔
285
                        }
4✔
286
                        if field.Description != "" && field.Description != d["Description"] {
16✔
287
                                log.Printf("WARNING: Overwriting existing description for %q: %q vs %q",
×
288
                                        field.Name, field.Description, d["Description"])
×
289
                        }
×
290
                        field.Description = d["Description"]
16✔
291
                        return nil
16✔
292
                },
293
        )
294
        return nil
4✔
295
}
296

297
// WalkSchema visits every FieldSchema object in the given schema by calling the visit function.
298
// The prefix is a path of field names from the top level to the current Field.
299
func WalkSchema(schema bigquery.Schema, visit func(prefix []string, field *bigquery.FieldSchema) error) error {
6✔
300
        return walkSchema([]string{}, schema, visit)
6✔
301
}
6✔
302

303
func walkSchema(prefix []string, schema bigquery.Schema, visit func(prefix []string, field *bigquery.FieldSchema) error) error {
12✔
304
        for _, field := range schema {
40✔
305
                path := append(prefix, field.Name)
28✔
306
                err := visit(path, field)
28✔
307
                if err != nil {
30✔
308
                        return err
2✔
309
                }
2✔
310
                if field.Type == bigquery.RecordFieldType {
32✔
311
                        err = walkSchema(path, field.Schema, visit)
6✔
312
                        if err != nil {
8✔
313
                                return err
2✔
314
                        }
2✔
315
                }
316
        }
317
        return nil
8✔
318
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc