• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

uber / cadence / 01880c2e-e2ba-4ec5-8826-bcc19bd1c271

11 May 2023 07:21PM UTC coverage: 57.263% (-0.07%) from 57.331%
01880c2e-e2ba-4ec5-8826-bcc19bd1c271

push

buildkite

GitHub
Adds tooling (#5283)

369 of 369 new or added lines in 9 files covered. (100.0%)

86922 of 151794 relevant lines covered (57.26%)

2472.24 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

92.75
/common/task/parallelTaskProcessor.go
1
// Copyright (c) 2020 Uber Technologies, Inc.
2
//
3
// Permission is hereby granted, free of charge, to any person obtaining a copy
4
// of this software and associated documentation files (the "Software"), to deal
5
// in the Software without restriction, including without limitation the rights
6
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
7
// copies of the Software, and to permit persons to whom the Software is
8
// furnished to do so, subject to the following conditions:
9
//
10
// The above copyright notice and this permission notice shall be included in
11
// all copies or substantial portions of the Software.
12
//
13
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
14
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
18
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
19
// THE SOFTWARE.
20

21
package task
22

23
import (
24
        "context"
25
        "errors"
26
        "sync"
27
        "sync/atomic"
28
        "time"
29

30
        "github.com/uber/cadence/common"
31
        "github.com/uber/cadence/common/backoff"
32
        "github.com/uber/cadence/common/dynamicconfig"
33
        "github.com/uber/cadence/common/log"
34
        "github.com/uber/cadence/common/metrics"
35
)
36

37
type (
	// ParallelTaskProcessorOptions configs ParallelTaskProcessor
	ParallelTaskProcessorOptions struct {
		QueueSize   int                         // buffer size of the task channel
		WorkerCount dynamicconfig.IntPropertyFn // dynamic target number of workers; re-read by the monitor loop
		RetryPolicy backoff.RetryPolicy         // retry policy applied to each task execution
	}

	// parallelTaskProcessorImpl executes submitted Tasks on a pool of worker
	// goroutines whose size tracks the dynamic WorkerCount config.
	parallelTaskProcessorImpl struct {
		status           int32           // daemon lifecycle state; accessed atomically
		tasksCh          chan Task       // buffered queue of pending tasks
		shutdownCh       chan struct{}   // closed by Stop to signal global shutdown
		workerShutdownCh []chan struct{} // one per-worker stop channel; mutated only by Start and the monitor goroutine
		shutdownWG       sync.WaitGroup  // tracks workers plus the monitor goroutine
		logger           log.Logger
		metricsScope     metrics.Scope
		options          *ParallelTaskProcessorOptions
	}
)
56

57
const (
	// defaultMonitorTickerDuration is how often workerMonitor reconciles the
	// number of running workers against the dynamic WorkerCount config.
	defaultMonitorTickerDuration = 5 * time.Second
)
60

61
var (
	// ErrTaskProcessorClosed is the error returned when submitting task to a stopped processor
	ErrTaskProcessorClosed = errors.New("task processor has already shutdown")
)
65

66
// NewParallelTaskProcessor creates a new PriorityTaskProcessor
67
func NewParallelTaskProcessor(
68
        logger log.Logger,
69
        metricsClient metrics.Client,
70
        options *ParallelTaskProcessorOptions,
71
) Processor {
38✔
72
        return &parallelTaskProcessorImpl{
38✔
73
                status:           common.DaemonStatusInitialized,
38✔
74
                tasksCh:          make(chan Task, options.QueueSize),
38✔
75
                shutdownCh:       make(chan struct{}),
38✔
76
                workerShutdownCh: make([]chan struct{}, 0, options.WorkerCount()),
38✔
77
                logger:           logger,
38✔
78
                metricsScope:     metricsClient.Scope(metrics.ParallelTaskProcessingScope),
38✔
79
                options:          options,
38✔
80
        }
38✔
81
}
38✔
82

83
func (p *parallelTaskProcessorImpl) Start() {
21✔
84
        if !atomic.CompareAndSwapInt32(&p.status, common.DaemonStatusInitialized, common.DaemonStatusStarted) {
21✔
85
                return
×
86
        }
×
87

88
        initialWorkerCount := p.options.WorkerCount()
21✔
89

21✔
90
        p.shutdownWG.Add(initialWorkerCount)
21✔
91
        for i := 0; i < initialWorkerCount; i++ {
2,439✔
92
                shutdownCh := make(chan struct{})
2,418✔
93
                p.workerShutdownCh = append(p.workerShutdownCh, shutdownCh)
2,418✔
94
                go p.taskWorker(shutdownCh)
2,418✔
95
        }
2,418✔
96

97
        p.shutdownWG.Add(1)
21✔
98
        go p.workerMonitor(defaultMonitorTickerDuration)
21✔
99

21✔
100
        p.logger.Info("Parallel task processor started.")
21✔
101
}
102

103
func (p *parallelTaskProcessorImpl) Stop() {
21✔
104
        if !atomic.CompareAndSwapInt32(&p.status, common.DaemonStatusStarted, common.DaemonStatusStopped) {
21✔
105
                return
×
106
        }
×
107

108
        close(p.shutdownCh)
21✔
109

21✔
110
        p.drainAndNackTasks()
21✔
111

21✔
112
        if success := common.AwaitWaitGroup(&p.shutdownWG, time.Minute); !success {
21✔
113
                p.logger.Warn("Parallel task processor timedout on shutdown.")
×
114
        }
×
115
        p.logger.Info("Parallel task processor shutdown.")
21✔
116
}
117

118
// Submit enqueues task for asynchronous execution by a worker. It blocks
// while the queue is full and returns ErrTaskProcessorClosed if the
// processor has stopped (or stops while blocked).
func (p *parallelTaskProcessorImpl) Submit(task Task) error {
	p.metricsScope.IncCounter(metrics.ParallelTaskSubmitRequest)
	sw := p.metricsScope.StartTimer(metrics.ParallelTaskSubmitLatency)
	defer sw.Stop()

	if p.isStopped() {
		return ErrTaskProcessorClosed
	}

	select {
	case p.tasksCh <- task:
		// Stop may have run between the isStopped check above and the send:
		// its drain could have finished before our task landed in the channel,
		// stranding it. Re-check and drain again so the task is Nacked rather
		// than lost. Note Submit still returns nil in that case.
		if p.isStopped() {
			p.drainAndNackTasks()
		}
		return nil
	case <-p.shutdownCh:
		return ErrTaskProcessorClosed
	}
}
137

138
func (p *parallelTaskProcessorImpl) taskWorker(shutdownCh chan struct{}) {
2,444✔
139
        defer p.shutdownWG.Done()
2,444✔
140

2,444✔
141
        for {
14,561✔
142
                select {
12,117✔
143
                case <-shutdownCh:
2,444✔
144
                        return
2,444✔
145
                case task := <-p.tasksCh:
9,676✔
146
                        p.executeTask(task, shutdownCh)
9,676✔
147
                }
148
        }
149
}
150

151
func (p *parallelTaskProcessorImpl) executeTask(task Task, shutdownCh chan struct{}) {
9,679✔
152
        sw := p.metricsScope.StartTimer(metrics.ParallelTaskTaskProcessingLatency)
9,679✔
153
        defer sw.Stop()
9,679✔
154

9,679✔
155
        op := func() error {
19,458✔
156
                if err := task.Execute(); err != nil {
9,915✔
157
                        return task.HandleErr(err)
136✔
158
                }
136✔
159
                return nil
9,646✔
160
        }
161

162
        isRetryable := func(err error) bool {
9,815✔
163
                select {
136✔
164
                case <-shutdownCh:
2✔
165
                        return false
2✔
166
                default:
134✔
167
                }
168

169
                return task.RetryErr(err)
134✔
170
        }
171
        throttleRetry := backoff.NewThrottleRetry(
9,679✔
172
                backoff.WithRetryPolicy(p.options.RetryPolicy),
9,679✔
173
                backoff.WithRetryableError(isRetryable),
9,679✔
174
        )
9,679✔
175

9,679✔
176
        if err := throttleRetry.Do(context.Background(), op); err != nil {
9,715✔
177
                // non-retryable error or exhausted all retries or worker shutdown
36✔
178
                task.Nack()
36✔
179
                return
36✔
180
        }
36✔
181

182
        // no error
183
        task.Ack()
9,646✔
184
}
185

186
func (p *parallelTaskProcessorImpl) workerMonitor(tickerDuration time.Duration) {
22✔
187
        defer p.shutdownWG.Done()
22✔
188

22✔
189
        ticker := time.NewTicker(tickerDuration)
22✔
190

22✔
191
        for {
452✔
192
                select {
430✔
193
                case <-p.shutdownCh:
22✔
194
                        ticker.Stop()
22✔
195
                        p.removeWorker(len(p.workerShutdownCh))
22✔
196
                        return
22✔
197
                case <-ticker.C:
411✔
198
                        targetWorkerCount := p.options.WorkerCount()
411✔
199
                        currentWorkerCount := len(p.workerShutdownCh)
411✔
200
                        p.addWorker(targetWorkerCount - currentWorkerCount)
411✔
201
                        p.removeWorker(currentWorkerCount - targetWorkerCount)
411✔
202
                }
203
        }
204
}
205

206
func (p *parallelTaskProcessorImpl) addWorker(count int) {
414✔
207
        for i := 0; i < count; i++ {
439✔
208
                shutdownCh := make(chan struct{})
25✔
209
                p.workerShutdownCh = append(p.workerShutdownCh, shutdownCh)
25✔
210

25✔
211
                p.shutdownWG.Add(1)
25✔
212
                go p.taskWorker(shutdownCh)
25✔
213
        }
25✔
214
}
215

216
func (p *parallelTaskProcessorImpl) removeWorker(count int) {
432✔
217
        if count <= 0 {
843✔
218
                return
411✔
219
        }
411✔
220

221
        currentWorkerCount := len(p.workerShutdownCh)
24✔
222
        if count > currentWorkerCount {
24✔
223
                count = currentWorkerCount
×
224
        }
×
225

226
        shutdownChToClose := p.workerShutdownCh[currentWorkerCount-count:]
24✔
227
        p.workerShutdownCh = p.workerShutdownCh[:currentWorkerCount-count]
24✔
228

24✔
229
        for _, shutdownCh := range shutdownChToClose {
2,451✔
230
                close(shutdownCh)
2,427✔
231
        }
2,427✔
232
}
233

234
func (p *parallelTaskProcessorImpl) drainAndNackTasks() {
21✔
235
        for {
132✔
236
                select {
111✔
237
                case task := <-p.tasksCh:
90✔
238
                        task.Nack()
90✔
239
                default:
21✔
240
                        return
21✔
241
                }
242
        }
243
}
244

245
// isStopped reports whether Stop has already transitioned the processor
// into the stopped state.
func (p *parallelTaskProcessorImpl) isStopped() bool {
	return atomic.LoadInt32(&p.status) == common.DaemonStatusStopped
}
28,730✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc