uber / cadence / 0188269e-b660-4822-9726-317945cb1d84

16 May 2023 10:31PM UTC coverage: 57.238% (-0.08%) from 57.319%

Pull Request #5289: [CDNC-3578] Workflow start metric
Commit: "fix lint second attempt" (Ketsia), built on buildkite

4 of 4 new or added lines in 1 file covered. (100.0%)

86922 of 151860 relevant lines covered (57.24%)

2479.09 hits per line


Source File: /common/task/parallelTaskProcessor.go (92.75% covered)
// Copyright (c) 2020 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package task

import (
        "context"
        "errors"
        "sync"
        "sync/atomic"
        "time"

        "github.com/uber/cadence/common"
        "github.com/uber/cadence/common/backoff"
        "github.com/uber/cadence/common/dynamicconfig"
        "github.com/uber/cadence/common/log"
        "github.com/uber/cadence/common/metrics"
)

type (
        // ParallelTaskProcessorOptions configures the ParallelTaskProcessor
        ParallelTaskProcessorOptions struct {
                QueueSize   int
                WorkerCount dynamicconfig.IntPropertyFn
                RetryPolicy backoff.RetryPolicy
        }

        parallelTaskProcessorImpl struct {
                status           int32
                tasksCh          chan Task
                shutdownCh       chan struct{}
                workerShutdownCh []chan struct{}
                shutdownWG       sync.WaitGroup
                logger           log.Logger
                metricsScope     metrics.Scope
                options          *ParallelTaskProcessorOptions
        }
)

const (
        defaultMonitorTickerDuration = 5 * time.Second
)

var (
        // ErrTaskProcessorClosed is the error returned when submitting a task to a stopped processor
        ErrTaskProcessorClosed = errors.New("task processor has already shut down")
)

// NewParallelTaskProcessor creates a new ParallelTaskProcessor
func NewParallelTaskProcessor(
        logger log.Logger,
        metricsClient metrics.Client,
        options *ParallelTaskProcessorOptions,
) Processor {
        return &parallelTaskProcessorImpl{
                status:           common.DaemonStatusInitialized,
                tasksCh:          make(chan Task, options.QueueSize),
                shutdownCh:       make(chan struct{}),
                workerShutdownCh: make([]chan struct{}, 0, options.WorkerCount()),
                logger:           logger,
                metricsScope:     metricsClient.Scope(metrics.ParallelTaskProcessingScope),
                options:          options,
        }
}

// Start launches the configured number of worker goroutines plus a monitor
// goroutine that keeps the pool sized to the dynamic WorkerCount.
func (p *parallelTaskProcessorImpl) Start() {
        if !atomic.CompareAndSwapInt32(&p.status, common.DaemonStatusInitialized, common.DaemonStatusStarted) {
                return
        }

        initialWorkerCount := p.options.WorkerCount()

        p.shutdownWG.Add(initialWorkerCount)
        for i := 0; i < initialWorkerCount; i++ {
                shutdownCh := make(chan struct{})
                p.workerShutdownCh = append(p.workerShutdownCh, shutdownCh)
                go p.taskWorker(shutdownCh)
        }

        p.shutdownWG.Add(1)
        go p.workerMonitor(defaultMonitorTickerDuration)

        p.logger.Info("Parallel task processor started.")
}

// Stop signals shutdown, Nacks any tasks still buffered, and waits up to one
// minute for all goroutines to exit.
func (p *parallelTaskProcessorImpl) Stop() {
        if !atomic.CompareAndSwapInt32(&p.status, common.DaemonStatusStarted, common.DaemonStatusStopped) {
                return
        }

        close(p.shutdownCh)

        p.drainAndNackTasks()

        if success := common.AwaitWaitGroup(&p.shutdownWG, time.Minute); !success {
                p.logger.Warn("Parallel task processor timed out on shutdown.")
        }
        p.logger.Info("Parallel task processor shutdown.")
}

// Submit enqueues a task for processing; it returns ErrTaskProcessorClosed if
// the processor has been stopped.
func (p *parallelTaskProcessorImpl) Submit(task Task) error {
        p.metricsScope.IncCounter(metrics.ParallelTaskSubmitRequest)
        sw := p.metricsScope.StartTimer(metrics.ParallelTaskSubmitLatency)
        defer sw.Stop()

        if p.isStopped() {
                return ErrTaskProcessorClosed
        }

        select {
        case p.tasksCh <- task:
                // the processor may have stopped between the check above and the
                // send; drain so the task is not silently dropped
                if p.isStopped() {
                        p.drainAndNackTasks()
                }
                return nil
        case <-p.shutdownCh:
                return ErrTaskProcessorClosed
        }
}

// taskWorker processes tasks from the queue until its shutdown channel is closed.
func (p *parallelTaskProcessorImpl) taskWorker(shutdownCh chan struct{}) {
        defer p.shutdownWG.Done()

        for {
                select {
                case <-shutdownCh:
                        return
                case task := <-p.tasksCh:
                        p.executeTask(task, shutdownCh)
                }
        }
}

// executeTask runs a task with retries, Acking it on success and Nacking it otherwise.
func (p *parallelTaskProcessorImpl) executeTask(task Task, shutdownCh chan struct{}) {
        sw := p.metricsScope.StartTimer(metrics.ParallelTaskTaskProcessingLatency)
        defer sw.Stop()

        op := func() error {
                if err := task.Execute(); err != nil {
                        return task.HandleErr(err)
                }
                return nil
        }

        isRetryable := func(err error) bool {
                // stop retrying once the worker is shutting down
                select {
                case <-shutdownCh:
                        return false
                default:
                }

                return task.RetryErr(err)
        }
        throttleRetry := backoff.NewThrottleRetry(
                backoff.WithRetryPolicy(p.options.RetryPolicy),
                backoff.WithRetryableError(isRetryable),
        )

        if err := throttleRetry.Do(context.Background(), op); err != nil {
                // non-retryable error or exhausted all retries or worker shutdown
                task.Nack()
                return
        }

        // no error
        task.Ack()
}

// workerMonitor periodically reconciles the number of running workers with the
// dynamically configured target.
func (p *parallelTaskProcessorImpl) workerMonitor(tickerDuration time.Duration) {
        defer p.shutdownWG.Done()

        ticker := time.NewTicker(tickerDuration)

        for {
                select {
                case <-p.shutdownCh:
                        ticker.Stop()
                        p.removeWorker(len(p.workerShutdownCh))
                        return
                case <-ticker.C:
                        targetWorkerCount := p.options.WorkerCount()
                        currentWorkerCount := len(p.workerShutdownCh)
                        p.addWorker(targetWorkerCount - currentWorkerCount)
                        p.removeWorker(currentWorkerCount - targetWorkerCount)
                }
        }
}

// addWorker starts count additional workers; it is a no-op when count <= 0.
func (p *parallelTaskProcessorImpl) addWorker(count int) {
        for i := 0; i < count; i++ {
                shutdownCh := make(chan struct{})
                p.workerShutdownCh = append(p.workerShutdownCh, shutdownCh)

                p.shutdownWG.Add(1)
                go p.taskWorker(shutdownCh)
        }
}

// removeWorker signals the most recently started count workers to exit.
func (p *parallelTaskProcessorImpl) removeWorker(count int) {
        if count <= 0 {
                return
        }

        currentWorkerCount := len(p.workerShutdownCh)
        if count > currentWorkerCount {
                count = currentWorkerCount
        }

        shutdownChToClose := p.workerShutdownCh[currentWorkerCount-count:]
        p.workerShutdownCh = p.workerShutdownCh[:currentWorkerCount-count]

        for _, shutdownCh := range shutdownChToClose {
                close(shutdownCh)
        }
}

// drainAndNackTasks empties the task channel, Nacking every buffered task.
func (p *parallelTaskProcessorImpl) drainAndNackTasks() {
        for {
                select {
                case task := <-p.tasksCh:
                        task.Nack()
                default:
                        return
                }
        }
}

func (p *parallelTaskProcessorImpl) isStopped() bool {
        return atomic.LoadInt32(&p.status) == common.DaemonStatusStopped
}
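
For orientation, the sketch below shows how this processor might be driven end to end. It is illustrative only: printTask and runExample are hypothetical names, printTask implements only the Task methods this file actually calls (Execute, HandleErr, RetryErr, Ack, Nack; the package's full Task interface may require more), and backoff.NewExponentialRetryPolicy and dynamicconfig.GetIntPropertyFn are assumed to be available from the packages already imported above.

package task

import (
        "fmt"
        "time"

        "github.com/uber/cadence/common/backoff"
        "github.com/uber/cadence/common/dynamicconfig"
        "github.com/uber/cadence/common/log"
        "github.com/uber/cadence/common/metrics"
)

// printTask is a hypothetical Task used only for illustration.
type printTask struct{ msg string }

func (t *printTask) Execute() error            { fmt.Println(t.msg); return nil }
func (t *printTask) HandleErr(err error) error { return err }  // hand errors back to the retry loop
func (t *printTask) RetryErr(err error) bool   { return true } // treat every error as retryable
func (t *printTask) Ack()                      {}              // called once the task succeeds
func (t *printTask) Nack()                     {}              // called when the task is dropped

// runExample is a hypothetical driver; logger and metricsClient are assumed to
// come from the service's existing bootstrap code.
func runExample(logger log.Logger, metricsClient metrics.Client) error {
        processor := NewParallelTaskProcessor(logger, metricsClient, &ParallelTaskProcessorOptions{
                QueueSize:   100,
                WorkerCount: dynamicconfig.GetIntPropertyFn(10), // fixed target of 10 workers
                RetryPolicy: backoff.NewExponentialRetryPolicy(50 * time.Millisecond),
        })

        processor.Start()
        defer processor.Stop() // Nacks anything still buffered at shutdown

        // Submit blocks until the task is buffered, or fails with
        // ErrTaskProcessorClosed once the processor has stopped.
        return processor.Submit(&printTask{msg: "hello"})
}

Because WorkerCount is a dynamicconfig.IntPropertyFn rather than a plain int, workerMonitor re-reads it on every tick (every 5 seconds by default) and grows or shrinks the pool to match, so the worker count can be changed at runtime without restarting the processor.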