• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

m-lab / etl / 7433

13 Dec 2022 08:37PM UTC coverage: 67.384% (+0.2%) from 67.226%
7433

Pull #1108

travis-ci-com

web-flow
Merge bda76a95a into 163dadfe6
Pull Request #1108: Add capacity to filter select client_name values

35 of 35 new or added lines in 1 file covered. (100.0%)

3351 of 4973 relevant lines covered (67.38%)

0.73 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

90.63
/active/active.go
1
// Package active provides code for managing processing of an entire
2
// directory of task files.
3
package active
4

5
// GCSSource iterates over a gs:// prefix to produce Runnables.
6
//
7
// TODO: Throttle combines a source of Runnables with a mechanism for blocking the Next() function
8

9
// TODO:
10
// A. Add metrics
11
// B. Add retry
12
// C. Recovery and monitoring using datastore.
13
import (
14
        "context"
15
        "log"
16
        "regexp"
17
        "sync"
18
        "time"
19

20
        "google.golang.org/api/iterator"
21

22
        "cloud.google.com/go/storage"
23
        "github.com/m-lab/etl-gardener/tracker"
24
        "github.com/m-lab/etl/etl"
25
        "github.com/m-lab/etl/metrics"
26
        "github.com/m-lab/go/cloud/gcs"
27
        "github.com/m-lab/go/logx"
28
)
29

30
// debug aliases logx.Debug; this package uses it for diagnostic output
// (see Fail, Next and streamToPending).
var debug = logx.Debug
31

32
/* Discussion:
33
 What should happen if:
34
        storage context expires, or is canceled?
35
                Next() will return context error.
36
        storage errors?
37
                Next() will return any objects iterated prior to error.
38
                Next() will then return the error.
39

40
   streamToPending should run to completion unless the context expires,
41
   or there is a persistent gs error.
42
*/
43

44
// FileLister defines a function that returns a list of storage.ObjectAttrs.
45
type FileLister func(ctx context.Context) ([]*storage.ObjectAttrs, int64, error)
46

47
// FileListerFunc creates a function that returns a slice of *storage.ObjectAttrs.
48
// On certain GCS errors, it may return partial result and an error.
49
// TODO - consider moving this to GardenerAPI.
50
func FileListerFunc(bh *gcs.BucketHandle, prefix string, filter *regexp.Regexp) FileLister {
1✔
51
        return func(ctx context.Context) ([]*storage.ObjectAttrs, int64, error) {
2✔
52
                return bh.GetFilesSince(ctx, prefix, filter, time.Time{})
1✔
53
        }
1✔
54
}
55

56
// Context wraps a context.Context and lets the owner substitute the error
// that Err() reports once the context is done.
type Context struct {
	context.Context
	// cancel cancels the embedded Context.
	cancel func()
	// lock guards otherErr.
	lock sync.Mutex
	// otherErr, when non-nil, is reported by Err() in place of the embedded
	// Context's own error.
	otherErr error
}

// Err reports the state of the context:
//   - nil while the context is not yet done,
//   - otherErr if the context is done and an alternate error has been set,
//   - otherwise the embedded Context's Err().
func (c *Context) Err() error {
	c.lock.Lock()
	defer c.lock.Unlock()

	// Non-blocking probe of Done(): until it is closed there is no error to
	// report, even if otherErr has already been assigned.
	select {
	case <-c.Done():
	default:
		return nil
	}

	if c.otherErr != nil {
		return c.otherErr
	}
	return c.Context.Err()
}
78

79
// Fail cancels the context, and sets the result of context.Err()
80
func (c *Context) Fail(err error) {
1✔
81
        debug.Output(2, "stopping")
1✔
82
        c.lock.Lock()
1✔
83
        defer c.lock.Unlock()
1✔
84
        c.otherErr = err
1✔
85
        c.cancel()
1✔
86
}
1✔
87

88
// WithFail wraps a context to allow specifying custom error with Fail()
89
func WithFail(ctx context.Context) *Context {
1✔
90
        ctx, cancel := context.WithCancel(ctx)
1✔
91
        return &Context{Context: ctx, cancel: cancel}
1✔
92
}
1✔
93

94
// GCSSource implements RunnableSource for a GCS bucket/prefix.
95
type GCSSource struct {
96
        ctx *Context
97
        // The fileLister produces the list of source files.
98
        fileLister FileLister
99
        // toRunnable creates a Runnable from ObjectAttrs.
100
        toRunnable func(*storage.ObjectAttrs) Runnable
101

102
        pendingChan chan Runnable
103

104
        label string
105
}
106

107
// NewGCSSource creates a new source for active processing.
108
func NewGCSSource(ctx context.Context, job tracker.Job, fl FileLister, toRunnable func(*storage.ObjectAttrs) Runnable) (*GCSSource, error) {
1✔
109
        src := GCSSource{
1✔
110
                ctx:        WithFail(ctx),
1✔
111
                fileLister: fl,
1✔
112
                toRunnable: toRunnable,
1✔
113

1✔
114
                pendingChan: make(chan Runnable, 0),
1✔
115
                label:       job.Path(),
1✔
116
        }
1✔
117

1✔
118
        go src.streamToPending(ctx, job)
1✔
119

1✔
120
        return &src, nil
1✔
121
}
1✔
122

123
// Label implements Source.Label
124
func (src *GCSSource) Label() string {
1✔
125
        return src.label
1✔
126
}
1✔
127

128
// CancelStreaming terminates the streaming goroutine context.
129
func (src *GCSSource) CancelStreaming() {
1✔
130
        src.ctx.cancel()
1✔
131
}
1✔
132

133
// Next implements RunnableSource.  It returns
134
//    the next pending job to run, OR
135
//    iterator.Done OR
136
//    ctx.Err() OR
137
//    gcs error
138
func (src *GCSSource) Next(ctx context.Context) (Runnable, error) {
1✔
139
        select {
1✔
140
        // This allows exit if pendingChan is blocking.
141
        case <-ctx.Done():
×
142
                return nil, ctx.Err()
×
143
        case next, ok := <-src.pendingChan:
1✔
144
                // This ensures that context expirations and errors are respected.
1✔
145
                select {
1✔
146
                case <-ctx.Done():
1✔
147
                        log.Println("Context expired in Next")
1✔
148
                        metrics.ActiveErrors.WithLabelValues(src.Label(), "context expired in Next").Inc()
1✔
149
                        return nil, ctx.Err()
1✔
150
                case <-src.ctx.Done():
1✔
151
                        log.Println(src.ctx.Err())
1✔
152
                        return nil, src.ctx.Err()
1✔
153
                default:
1✔
154
                        if ok {
2✔
155
                                return next, nil
1✔
156
                        }
1✔
157
                        debug.Println("iterator.Done")
1✔
158
                        return nil, iterator.Done
1✔
159
                }
160
        }
161
}
162

163
// streamToPending feeds tasks to the pending queue until all files have been submitted.
164
// It fetches the list of files once, then converts files to Runnables until all files are
165
// handled, or the context is canceled or expires.
166
// The Runnables are pulled from the queue by Next().
167
func (src *GCSSource) streamToPending(ctx context.Context, job tracker.Job) {
1✔
168
        // No matter what else happens, we eventually want to close the pendingChan.
1✔
169
        defer close(src.pendingChan)
1✔
170

1✔
171
        files, _, err := src.fileLister(ctx)
1✔
172
        if err != nil {
2✔
173
                log.Println("Error streaming", err)
1✔
174
                metrics.ActiveErrors.WithLabelValues(src.Label(), "filelister").Inc()
1✔
175
                src.ctx.Fail(err)
1✔
176
                return
1✔
177
        }
1✔
178

179
        index := 0
1✔
180
        dataType := etl.DataType(job.Datatype)
1✔
181
        skipCount := dataType.SkipCount()
1✔
182

1✔
183
        for _, f := range files {
2✔
184
                debug.Println(f)
1✔
185
                if f == nil {
1✔
186
                        log.Println("Nil file!!")
×
187
                        metrics.ActiveErrors.WithLabelValues(src.Label(), "nil file").Inc()
×
188
                        continue
×
189
                }
190
                // We abandon remaining items if context expires.
191
                if ctx.Err() != nil {
1✔
192
                        metrics.ActiveErrors.WithLabelValues(src.Label(), "streamToPending").Inc()
×
193
                        break
×
194
                }
195

196
                if index%(skipCount+1) == 0 {
2✔
197
                        debug.Printf("Adding gs://%s/%s", f.Bucket, f.Name)
1✔
198
                        // Blocks until consumer reads channel.
1✔
199
                        src.pendingChan <- src.toRunnable(f)
1✔
200
                }
1✔
201
                index++
1✔
202
        }
203
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc