• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

m-lab / autoloader / #613096281

10 Nov 2023 08:47PM UTC coverage: 94.62%. First build
#613096281

Pull #29

travis-ci

Pull Request #29: Add v2 gcs package

117 of 138 new or added lines in 3 files covered. (84.78%)

510 of 539 relevant lines covered (94.62%)

1.03 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

76.67
/gcs/v2/gcs.go
1
package v2
2

3
import (
4
        "context"
5
        "fmt"
6
        "log"
7
        "path"
8
        "regexp"
9
        "strings"
10

11
        "cloud.google.com/go/storage"
12
        "github.com/m-lab/autoloader/api"
13
        apiv2 "github.com/m-lab/autoloader/api/v2"
14
        "github.com/m-lab/autoloader/gcs"
15
        "github.com/m-lab/go/storagex"
16
        "google.golang.org/api/iterator"
17
)
18

19
// project matches the project-name portion of a bucket name: the text from
// the first "mlab-" or "measurement-" through the end of the name.
var project = regexp.MustCompile("(mlab|measurement)-.*")

const (
	// prefix is the root object path under which all v2 autoload data lives.
	prefix = "autoload/v2/"
	// schemaFileSuffix is, presumably, the file-name suffix of table schema
	// files — TODO confirm against its users.
	schemaFileSuffix = ".table.json"
)
27

28
// ClientV2 is the V2 client used to interact with Google Cloud Storage.
29
type ClientV2 struct {
30
        Buckets []*storagex.Bucket
31
}
32

33
// BucketV2 represents a V2 GCS bucket.
34
type BucketV2 struct {
35
        *storagex.Bucket          // Bucket performs storage.BucketHandle operations.
36
        Organizations    []string // Organizations uploading data to the bucket.
37
}
38

39
// NewClient returns a new Client for the specified bucket names.
1✔
40
func NewClient(c *storage.Client, names []string) *ClientV2 {
1✔
41
        buckets := make([]*storagex.Bucket, 0)
2✔
42
        for _, name := range names {
1✔
43
                bh := c.Bucket(name)
1✔
44
                buckets = append(buckets, storagex.NewBucket(bh))
1✔
45
        }
46

1✔
47
        return &ClientV2{
1✔
48
                Buckets: buckets,
1✔
49
        }
50
}
51

52
// GetDatatypes gets a list of datatypes for the ClientV2's buckets.
1✔
53
func (c *ClientV2) GetDatatypes(ctx context.Context) []*api.Datatype {
1✔
54
        datatypes := make([]*api.Datatype, 0)
1✔
55

2✔
56
        for _, bucket := range c.Buckets {
1✔
57
                orgs, err := getBucketOrgs(ctx, bucket)
1✔
NEW
58
                if err != nil {
×
NEW
59
                        log.Printf("failed to get organizations for bucket: %v", err)
×
60
                        continue
61
                }
1✔
62
                b := &BucketV2{Bucket: bucket, Organizations: orgs}
1✔
63

2✔
64
                b.Walk(ctx, path.Join(prefix, "tables"), func(schema *storagex.Object) error {
1✔
65
                        dts, err := getDatatypes(ctx, b, schema)
1✔
NEW
66
                        if err != nil {
×
NEW
67
                                log.Printf("failed to get datatypes for schema: %v", err)
×
68
                                return err
69
                        }
1✔
70

1✔
71
                        datatypes = append(datatypes, dts...)
72
                        return nil
73
                })
74
        }
1✔
75

76
        return datatypes
77
}
1✔
78

1✔
79
// getBucketOrgs gets the list of organizations uploading data to a bucket.
1✔
80
func getBucketOrgs(ctx context.Context, b *storagex.Bucket) ([]string, error) {
1✔
81
        orgs := make([]string, 0)
1✔
82

1✔
83
        it := b.Objects(ctx, &storage.Query{
1✔
84
                Prefix:    prefix,
1✔
85
                Delimiter: "/",
2✔
86
        })
1✔
87

2✔
88
        for {
1✔
89
                attr, err := it.Next()
1✔
90
                if err == iterator.Done {
91
                        return orgs, nil
1✔
NEW
92
                }
×
NEW
93

×
94
                if err != nil {
95
                        return nil, err
1✔
NEW
96
                }
×
97

98
                parts := strings.Split(attr.Prefix, "/")
99
                if len(parts) != 4 || parts[2] == "tables" {
1✔
100
                        continue
2✔
101
                }
1✔
102
                orgs = append(orgs, parts[2])
103
        }
1✔
104
}
105

106
// getDatatypes gets the list of datatypes for a schema.
107
func getDatatypes(ctx context.Context, b *BucketV2, schema *storagex.Object) ([]*api.Datatype, error) {
1✔
108
        file, err := gcs.ReadFile(ctx, schema.ObjectHandle)
1✔
109
        if err != nil || len(file) == 0 {
1✔
NEW
110
                return nil, fmt.Errorf("invalid schema file under %s", schema.Name)
×
NEW
111
        }
×
112

113
        attrs, err := b.Attrs(ctx)
1✔
114
        if err != nil {
1✔
NEW
115
                return nil, err
×
NEW
116
        }
×
117

118
        path, err := NewSchemaPath(ctx, b, schema.Name)
1✔
119
        if err != nil {
1✔
NEW
120
                return nil, err
×
NEW
121
        }
×
122

123
        dts := make([]*api.Datatype, 0)
1✔
124
        for _, org := range path.Organizations {
2✔
125
                opts := api.DatatypeOpts{
1✔
126
                        Name:         path.Datatype,
1✔
127
                        Experiment:   path.Experiment,
1✔
128
                        Organization: org,
1✔
129
                        Version:      "v2",
1✔
130
                        Location:     attrs.Location,
1✔
131
                        Schema:       file,
1✔
132
                        UpdatedTime:  schema.ObjectAttrs.Updated,
1✔
133
                        Bucket:       b.Bucket,
1✔
134
                }
1✔
135
                dts = append(dts, getDatatype(schema.Bucket, opts))
1✔
136
        }
1✔
137

138
        return dts, nil
1✔
139
}
140

141
// getDatatype creates a single datatype based on its project.
1✔
142
func getDatatype(bucketName string, opts api.DatatypeOpts) *api.Datatype {
1✔
143
        proj := project.FindString(bucketName)
1✔
NEW
144
        switch proj {
×
NEW
145
        case "mlab-autojoin":
×
NEW
146
                fallthrough
×
NEW
147
        case "mlab-thirdparty":
×
148
                return apiv2.NewBYODatatype(opts, proj)
1✔
149
        default:
1✔
150
                return apiv2.NewMlabDatatype(opts)
151
        }
152
}
153

154
// GetDirs returns all the directory paths for a datatype within a start (inclusive) and
NEW
155
// end (exclusive) date.
×
NEW
156
func (c *ClientV2) GetDirs(ctx context.Context, dt *api.Datatype, start, end string) ([]gcs.Dir, error) {
×
NEW
157
        p := path.Join(prefix, dt.Organization, dt.Experiment, dt.Name)
×
NEW
158
        return gcs.GetDirs(ctx, dt, p, start, end)
×
159
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc