uber / cadence · build 019074f6-fb21-4003-bd1e-30edc2b1e424 (push, built on buildkite, committed via web-flow)

02 Jul 2024 07:40PM UTC · coverage: 71.509% (-0.01% from 71.52%)
Include a zero bucket, zeros are being reported as 1 (#6153)

With a min bucket of `1`, a `0` is reported as a `1`, which is quite misleading for this kind of use.

Sorta obvious in retrospect, but we have so few histograms anywhere that I apparently don't have any safe habits built up.
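To make the failure mode concrete: most histogram implementations assign an observation to the first bucket whose upper bound is greater than or equal to the value, so with a minimum bucket of `1` an observed `0` is indistinguishable from a real `1`. Below is a minimal, self-contained sketch of that bucketing rule; `bucketFor` and the bucket values are illustrative, not the actual cadence/tally code.

package main

import "fmt"

// bucketFor returns the upper bound of the first bucket that can hold v.
// Buckets are cumulative upper bounds, as in most histogram implementations.
func bucketFor(v float64, upperBounds []float64) float64 {
	for _, ub := range upperBounds {
		if v <= ub {
			return ub
		}
	}
	return upperBounds[len(upperBounds)-1] // overflow bucket
}

func main() {
	withoutZero := []float64{1, 2, 4, 8} // min bucket of 1
	withZero := []float64{0, 1, 2, 4, 8} // explicit zero bucket

	// Without a zero bucket, 0 lands in the "<=1" bucket and reads as 1.
	fmt.Println(bucketFor(0, withoutZero)) // 1
	// With a zero bucket, 0 is reported faithfully.
	fmt.Println(bucketFor(0, withZero)) // 0
}

With the explicit `0` bucket in front, zero observations land in their own bucket and stop inflating the `1` count.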

105299 of 147252 relevant lines covered (71.51%) · 2633.84 hits per line

Source file: /common/task/parallel_task_processor.go · 93.06% covered
// Copyright (c) 2020 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package task

import (
	"context"
	"errors"
	"fmt"
	"sync"
	"sync/atomic"
	"time"

	"github.com/uber/cadence/common"
	"github.com/uber/cadence/common/backoff"
	"github.com/uber/cadence/common/dynamicconfig"
	"github.com/uber/cadence/common/log"
	"github.com/uber/cadence/common/log/tag"
	"github.com/uber/cadence/common/metrics"
)

type (
	// ParallelTaskProcessorOptions configures the parallel task processor
	ParallelTaskProcessorOptions struct {
		QueueSize   int
		WorkerCount dynamicconfig.IntPropertyFn
		RetryPolicy backoff.RetryPolicy
	}

	parallelTaskProcessorImpl struct {
		status           int32
		tasksCh          chan Task
		shutdownCh       chan struct{}
		workerShutdownCh []chan struct{}
		shutdownWG       sync.WaitGroup
		logger           log.Logger
		metricsScope     metrics.Scope
		options          *ParallelTaskProcessorOptions
	}
)

const (
	defaultMonitorTickerDuration = 5 * time.Second
)

var (
	// ErrTaskProcessorClosed is the error returned when submitting a task to a stopped processor
	ErrTaskProcessorClosed = errors.New("task processor has already shutdown")
)

// NewParallelTaskProcessor creates a new parallel task Processor
func NewParallelTaskProcessor(
	logger log.Logger,
	metricsClient metrics.Client,
	options *ParallelTaskProcessorOptions,
) Processor {
	return &parallelTaskProcessorImpl{
		status:           common.DaemonStatusInitialized,
		tasksCh:          make(chan Task, options.QueueSize),
		shutdownCh:       make(chan struct{}),
		workerShutdownCh: make([]chan struct{}, 0, options.WorkerCount()),
		logger:           logger,
		metricsScope:     metricsClient.Scope(metrics.ParallelTaskProcessingScope),
		options:          options,
	}
}

func (p *parallelTaskProcessorImpl) Start() {
	if !atomic.CompareAndSwapInt32(&p.status, common.DaemonStatusInitialized, common.DaemonStatusStarted) {
		return
	}

	initialWorkerCount := p.options.WorkerCount()

	p.shutdownWG.Add(initialWorkerCount)
	for i := 0; i < initialWorkerCount; i++ {
		shutdownCh := make(chan struct{})
		p.workerShutdownCh = append(p.workerShutdownCh, shutdownCh)
		go p.taskWorker(shutdownCh)
	}

	p.shutdownWG.Add(1)
	go p.workerMonitor(defaultMonitorTickerDuration)

	p.logger.Info("Parallel task processor started.")
}

func (p *parallelTaskProcessorImpl) Stop() {
	if !atomic.CompareAndSwapInt32(&p.status, common.DaemonStatusStarted, common.DaemonStatusStopped) {
		return
	}

	close(p.shutdownCh)

	p.drainAndNackTasks()

	if success := common.AwaitWaitGroup(&p.shutdownWG, time.Minute); !success {
		p.logger.Warn("Parallel task processor timed out on shutdown.")
	}
	p.logger.Info("Parallel task processor shutdown.")
}

func (p *parallelTaskProcessorImpl) Submit(task Task) error {
	p.metricsScope.IncCounter(metrics.ParallelTaskSubmitRequest)
	sw := p.metricsScope.StartTimer(metrics.ParallelTaskSubmitLatency)
	defer sw.Stop()

	if p.isStopped() {
		return ErrTaskProcessorClosed
	}

	select {
	case p.tasksCh <- task:
		// Stop may have raced with the send above and already drained the
		// channel; drain again so the task is nacked rather than stranded.
		if p.isStopped() {
			p.drainAndNackTasks()
		}
		return nil
	case <-p.shutdownCh:
		return ErrTaskProcessorClosed
	}
}

func (p *parallelTaskProcessorImpl) taskWorker(shutdownCh chan struct{}) {
	defer p.shutdownWG.Done()

	for {
		select {
		case <-shutdownCh:
			return
		case task := <-p.tasksCh:
			p.executeTask(task, shutdownCh)
		}
	}
}

func (p *parallelTaskProcessorImpl) executeTask(task Task, shutdownCh chan struct{}) {
	sw := p.metricsScope.StartTimer(metrics.ParallelTaskTaskProcessingLatency)
	defer sw.Stop()

	defer func() {
		if r := recover(); r != nil {
			p.logger.Error("recovered panic in task execution", tag.Dynamic("recovered-panic", r))
			task.HandleErr(fmt.Errorf("recovered panic: %v", r))
			task.Nack()
		}
	}()

	op := func() error {
		if err := task.Execute(); err != nil {
			return task.HandleErr(err)
		}
		return nil
	}

	isRetryable := func(err error) bool {
		// Stop retrying as soon as this worker is told to shut down.
		select {
		case <-shutdownCh:
			return false
		default:
		}

		return task.RetryErr(err)
	}
	throttleRetry := backoff.NewThrottleRetry(
		backoff.WithRetryPolicy(p.options.RetryPolicy),
		backoff.WithRetryableError(isRetryable),
	)

	if err := throttleRetry.Do(context.Background(), op); err != nil {
		// non-retryable error, exhausted all retries, or worker shutdown
		task.Nack()
		return
	}

	// no error
	task.Ack()
}

// workerMonitor periodically reconciles the live worker count with the
// dynamic config value.
func (p *parallelTaskProcessorImpl) workerMonitor(tickerDuration time.Duration) {
	defer p.shutdownWG.Done()

	ticker := time.NewTicker(tickerDuration)

	for {
		select {
		case <-p.shutdownCh:
			ticker.Stop()
			p.removeWorker(len(p.workerShutdownCh))
			return
		case <-ticker.C:
			targetWorkerCount := p.options.WorkerCount()
			currentWorkerCount := len(p.workerShutdownCh)
			p.addWorker(targetWorkerCount - currentWorkerCount)
			p.removeWorker(currentWorkerCount - targetWorkerCount)
		}
	}
}

func (p *parallelTaskProcessorImpl) addWorker(count int) {
	for i := 0; i < count; i++ {
		shutdownCh := make(chan struct{})
		p.workerShutdownCh = append(p.workerShutdownCh, shutdownCh)

		p.shutdownWG.Add(1)
		go p.taskWorker(shutdownCh)
	}
}

func (p *parallelTaskProcessorImpl) removeWorker(count int) {
	if count <= 0 {
		return
	}

	currentWorkerCount := len(p.workerShutdownCh)
	if count > currentWorkerCount {
		count = currentWorkerCount
	}

	// Close the newest workers' private shutdown channels; each worker
	// observes its own channel and exits.
	shutdownChToClose := p.workerShutdownCh[currentWorkerCount-count:]
	p.workerShutdownCh = p.workerShutdownCh[:currentWorkerCount-count]

	for _, shutdownCh := range shutdownChToClose {
		close(shutdownCh)
	}
}

func (p *parallelTaskProcessorImpl) drainAndNackTasks() {
	for {
		select {
		case task := <-p.tasksCh:
			task.Nack()
		default:
			return
		}
	}
}

func (p *parallelTaskProcessorImpl) isStopped() bool {
	return atomic.LoadInt32(&p.status) == common.DaemonStatusStopped
}
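The worker-resizing mechanism above (workerMonitor, addWorker, removeWorker) generalizes beyond this file: give each worker a private shutdown channel, grow the pool by spawning, and shrink it by closing the newest channels. Here is a self-contained sketch of the same pattern under illustrative names; pool and resize are not cadence APIs.

package main

import (
	"fmt"
	"sync"
	"time"
)

// pool grows and shrinks a set of workers by giving each worker its own
// shutdown channel, mirroring parallelTaskProcessorImpl's approach.
type pool struct {
	wg          sync.WaitGroup
	shutdownChs []chan struct{}
}

func (p *pool) resize(target int, work func(id int, stop <-chan struct{})) {
	// Grow: spawn a worker per missing slot, each with a private channel.
	for len(p.shutdownChs) < target {
		stop := make(chan struct{})
		p.shutdownChs = append(p.shutdownChs, stop)
		id := len(p.shutdownChs)
		p.wg.Add(1)
		go func() {
			defer p.wg.Done()
			work(id, stop)
		}()
	}
	// Shrink: close the newest channels; those workers see it and return.
	for len(p.shutdownChs) > target {
		last := len(p.shutdownChs) - 1
		close(p.shutdownChs[last])
		p.shutdownChs = p.shutdownChs[:last]
	}
}

func main() {
	p := &pool{}
	worker := func(id int, stop <-chan struct{}) {
		for {
			select {
			case <-stop:
				return
			case <-time.After(10 * time.Millisecond):
				// a real worker would pull a task here
			}
		}
	}
	p.resize(4, worker) // scale up to 4 workers
	p.resize(1, worker) // scale down: the 3 newest workers are told to stop
	p.resize(0, worker)
	p.wg.Wait()
	fmt.Println("all workers stopped")
}

Closing a channel acts as a broadcast that the selected worker observes on its next loop iteration, which is why the monitor can shrink the pool without tracking goroutine handles.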