• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

uber / cadence / 01875e2f-959c-4c4d-87af-1d7805759bcc

08 Apr 2023 12:26AM UTC coverage: 57.178% (+0.1%) from 57.072%
01875e2f-959c-4c4d-87af-1d7805759bcc

Pull #5197

buildkite

Steven L
bad cleanup -> good cleanup
Pull Request #5197: Demonstrate a way to get rid of the cadence-idl repo

85396 of 149351 relevant lines covered (57.18%)

2283.28 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

89.64
/common/task/weightedRoundRobinTaskScheduler.go
1
// Copyright (c) 2020 Uber Technologies, Inc.
2
//
3
// Permission is hereby granted, free of charge, to any person obtaining a copy
4
// of this software and associated documentation files (the "Software"), to deal
5
// in the Software without restriction, including without limitation the rights
6
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
7
// copies of the Software, and to permit persons to whom the Software is
8
// furnished to do so, subject to the following conditions:
9
//
10
// The above copyright notice and this permission notice shall be included in
11
// all copies or substantial portions of the Software.
12
//
13
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
14
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
18
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
19
// THE SOFTWARE.
20

21
package task
22

23
import (
24
        "errors"
25
        "fmt"
26
        "sync"
27
        "sync/atomic"
28
        "time"
29

30
        "github.com/uber/cadence/common"
31
        "github.com/uber/cadence/common/backoff"
32
        "github.com/uber/cadence/common/dynamicconfig"
33
        "github.com/uber/cadence/common/log"
34
        "github.com/uber/cadence/common/log/tag"
35
        "github.com/uber/cadence/common/metrics"
36
)
37

38
type (
39
        // WeightedRoundRobinTaskSchedulerOptions configs WRR task scheduler
40
        WeightedRoundRobinTaskSchedulerOptions struct {
41
                Weights         dynamicconfig.MapPropertyFn
42
                QueueSize       int
43
                WorkerCount     dynamicconfig.IntPropertyFn
44
                DispatcherCount int
45
                RetryPolicy     backoff.RetryPolicy
46
        }
47

48
        weightedRoundRobinTaskSchedulerImpl struct {
49
                sync.RWMutex
50

51
                status       int32
52
                weights      atomic.Value // store the currently used weights
53
                taskChs      map[int]chan PriorityTask
54
                shutdownCh   chan struct{}
55
                notifyCh     chan struct{}
56
                dispatcherWG sync.WaitGroup
57
                logger       log.Logger
58
                metricsScope metrics.Scope
59
                options      *WeightedRoundRobinTaskSchedulerOptions
60

61
                processor Processor
62
        }
63
)
64

65
// wRRTaskProcessorQueueSize is the queue size handed to the parallel task
// processor that backs the scheduler.
const wRRTaskProcessorQueueSize = 1

// defaultUpdateWeightsInterval is how often the scheduler reloads its
// weights from the configured Weights property.
const defaultUpdateWeightsInterval = 5 * time.Second
69

70
// ErrTaskSchedulerClosed is the error returned when submitting task to a stopped scheduler
var ErrTaskSchedulerClosed = errors.New("task scheduler has already shutdown")
74

75
// NewWeightedRoundRobinTaskScheduler creates a new WRR task scheduler
76
func NewWeightedRoundRobinTaskScheduler(
77
        logger log.Logger,
78
        metricsClient metrics.Client,
79
        options *WeightedRoundRobinTaskSchedulerOptions,
80
) (Scheduler, error) {
24✔
81
        weights, err := common.ConvertDynamicConfigMapPropertyToIntMap(options.Weights())
24✔
82
        if err != nil {
24✔
83
                return nil, err
×
84
        }
×
85

86
        if len(weights) == 0 {
24✔
87
                return nil, errors.New("weight is not specified in the scheduler option")
×
88
        }
×
89

90
        scheduler := &weightedRoundRobinTaskSchedulerImpl{
24✔
91
                status:       common.DaemonStatusInitialized,
24✔
92
                taskChs:      make(map[int]chan PriorityTask),
24✔
93
                shutdownCh:   make(chan struct{}),
24✔
94
                notifyCh:     make(chan struct{}, 1),
24✔
95
                logger:       logger,
24✔
96
                metricsScope: metricsClient.Scope(metrics.TaskSchedulerScope),
24✔
97
                options:      options,
24✔
98
                processor: NewParallelTaskProcessor(
24✔
99
                        logger,
24✔
100
                        metricsClient,
24✔
101
                        &ParallelTaskProcessorOptions{
24✔
102
                                QueueSize:   wRRTaskProcessorQueueSize,
24✔
103
                                WorkerCount: options.WorkerCount,
24✔
104
                                RetryPolicy: options.RetryPolicy,
24✔
105
                        },
24✔
106
                ),
24✔
107
        }
24✔
108
        scheduler.weights.Store(weights)
24✔
109

24✔
110
        return scheduler, nil
24✔
111
}
112

113
func (w *weightedRoundRobinTaskSchedulerImpl) Start() {
18✔
114
        if !atomic.CompareAndSwapInt32(&w.status, common.DaemonStatusInitialized, common.DaemonStatusStarted) {
18✔
115
                return
×
116
        }
×
117

118
        w.processor.Start()
18✔
119

18✔
120
        w.dispatcherWG.Add(w.options.DispatcherCount)
18✔
121
        for i := 0; i != w.options.DispatcherCount; i++ {
42✔
122
                go w.dispatcher()
24✔
123
        }
24✔
124
        go w.updateWeights()
18✔
125

18✔
126
        w.logger.Info("Weighted round robin task scheduler started.")
18✔
127
}
128

129
func (w *weightedRoundRobinTaskSchedulerImpl) Stop() {
18✔
130
        if !atomic.CompareAndSwapInt32(&w.status, common.DaemonStatusStarted, common.DaemonStatusStopped) {
18✔
131
                return
×
132
        }
×
133

134
        close(w.shutdownCh)
18✔
135

18✔
136
        w.processor.Stop()
18✔
137

18✔
138
        w.RLock()
18✔
139
        for _, taskCh := range w.taskChs {
39✔
140
                drainAndNackPriorityTask(taskCh)
21✔
141
        }
21✔
142
        w.RUnlock()
18✔
143

18✔
144
        if success := common.AwaitWaitGroup(&w.dispatcherWG, time.Minute); !success {
18✔
145
                w.logger.Warn("Weighted round robin task scheduler timedout on shutdown.")
×
146
        }
×
147

148
        w.logger.Info("Weighted round robin task scheduler shutdown.")
18✔
149
}
150

151
func (w *weightedRoundRobinTaskSchedulerImpl) Submit(task PriorityTask) error {
5,539✔
152
        w.metricsScope.IncCounter(metrics.PriorityTaskSubmitRequest)
5,539✔
153
        sw := w.metricsScope.StartTimer(metrics.PriorityTaskSubmitLatency)
5,539✔
154
        defer sw.Stop()
5,539✔
155

5,539✔
156
        if w.isStopped() {
10,441✔
157
                return ErrTaskSchedulerClosed
4,902✔
158
        }
4,902✔
159

160
        taskCh, err := w.getOrCreateTaskChan(task.Priority())
637✔
161
        if err != nil {
638✔
162
                return err
1✔
163
        }
1✔
164

165
        select {
636✔
166
        case taskCh <- task:
635✔
167
                w.notifyDispatcher()
635✔
168
                if w.isStopped() {
635✔
169
                        drainAndNackPriorityTask(taskCh)
×
170
                }
×
171
                return nil
635✔
172
        case <-w.shutdownCh:
1✔
173
                return ErrTaskSchedulerClosed
1✔
174
        }
175
}
176

177
func (w *weightedRoundRobinTaskSchedulerImpl) TrySubmit(
178
        task PriorityTask,
179
) (bool, error) {
14,303✔
180
        if w.isStopped() {
19,157✔
181
                return false, ErrTaskSchedulerClosed
4,854✔
182
        }
4,854✔
183

184
        taskCh, err := w.getOrCreateTaskChan(task.Priority())
9,452✔
185
        if err != nil {
9,452✔
186
                return false, err
×
187
        }
×
188

189
        select {
9,452✔
190
        case taskCh <- task:
9,451✔
191
                w.metricsScope.IncCounter(metrics.PriorityTaskSubmitRequest)
9,451✔
192
                if w.isStopped() {
9,451✔
193
                        drainAndNackPriorityTask(taskCh)
×
194
                } else {
9,451✔
195
                        w.notifyDispatcher()
9,451✔
196
                }
9,451✔
197
                return true, nil
9,451✔
198
        case <-w.shutdownCh:
×
199
                return false, ErrTaskSchedulerClosed
×
200
        default:
1✔
201
                return false, nil
1✔
202
        }
203
}
204

205
func (w *weightedRoundRobinTaskSchedulerImpl) dispatcher() {
26✔
206
        defer w.dispatcherWG.Done()
26✔
207

26✔
208
        outstandingTasks := false
26✔
209
        taskChs := make(map[int]chan PriorityTask)
26✔
210

26✔
211
        for {
17,163✔
212
                if !outstandingTasks {
25,807✔
213
                        // if no task is dispatched in the last round,
8,670✔
214
                        // wait for a notification
8,670✔
215
                        select {
8,670✔
216
                        case <-w.notifyCh:
8,651✔
217
                                // block until there's a new task
218
                        case <-w.shutdownCh:
22✔
219
                                return
22✔
220
                        }
221
                }
222

223
                outstandingTasks = false
17,118✔
224
                w.updateTaskChs(taskChs)
17,118✔
225
                weights := w.getWeights()
17,118✔
226
                for priority, taskCh := range taskChs {
37,526✔
227
                        for i := 0; i < weights[priority]; i++ {
7,760,298✔
228
                                select {
7,739,890✔
229
                                case task := <-taskCh:
8,949✔
230
                                        // dispatched at least one task in this round
8,949✔
231
                                        outstandingTasks = true
8,949✔
232

8,949✔
233
                                        if err := w.processor.Submit(task); err != nil {
8,953✔
234
                                                w.logger.Error("fail to submit task to processor", tag.Error(err))
4✔
235
                                                task.Nack()
4✔
236
                                        }
4✔
237
                                case <-w.shutdownCh:
4✔
238
                                        return
4✔
239
                                default:
7,730,940✔
240
                                        // if no task, don't block. Skip to next priority
7,730,940✔
241
                                        break
7,730,940✔
242
                                }
243
                        }
244
                }
245
        }
246
}
247

248
func (w *weightedRoundRobinTaskSchedulerImpl) getOrCreateTaskChan(
249
        priority int,
250
) (chan PriorityTask, error) {
10,089✔
251
        if _, ok := w.getWeights()[priority]; !ok {
10,090✔
252
                return nil, fmt.Errorf("unknown task priority: %v", priority)
1✔
253
        }
1✔
254

255
        w.RLock()
10,088✔
256
        if taskCh, ok := w.taskChs[priority]; ok {
20,152✔
257
                w.RUnlock()
10,064✔
258
                return taskCh, nil
10,064✔
259
        }
10,064✔
260
        w.RUnlock()
27✔
261

27✔
262
        w.Lock()
27✔
263
        defer w.Unlock()
27✔
264
        if taskCh, ok := w.taskChs[priority]; ok {
27✔
265
                return taskCh, nil
×
266
        }
×
267
        taskCh := make(chan PriorityTask, w.options.QueueSize)
27✔
268
        w.taskChs[priority] = taskCh
27✔
269
        return taskCh, nil
27✔
270
}
271

272
func (w *weightedRoundRobinTaskSchedulerImpl) updateTaskChs(taskChs map[int]chan PriorityTask) {
17,118✔
273
        w.RLock()
17,118✔
274
        defer w.RUnlock()
17,118✔
275

17,118✔
276
        for priority, taskCh := range w.taskChs {
37,530✔
277
                if _, ok := taskChs[priority]; !ok {
20,449✔
278
                        taskChs[priority] = taskCh
37✔
279
                }
37✔
280
        }
281
}
282

283
func (w *weightedRoundRobinTaskSchedulerImpl) notifyDispatcher() {
10,086✔
284
        select {
10,086✔
285
        case w.notifyCh <- struct{}{}:
8,654✔
286
                // sent a notification to the dispatcher
287
        default:
1,435✔
288
                // do not block if there's already a notification
289
        }
290
}
291

292
func (w *weightedRoundRobinTaskSchedulerImpl) getWeights() map[int]int {
27,204✔
293
        return w.weights.Load().(map[int]int)
27,204✔
294
}
27,204✔
295

296
func (w *weightedRoundRobinTaskSchedulerImpl) updateWeights() {
18✔
297
        ticker := time.NewTicker(defaultUpdateWeightsInterval)
18✔
298
        for {
428✔
299
                select {
410✔
300
                case <-ticker.C:
395✔
301
                        weights, err := common.ConvertDynamicConfigMapPropertyToIntMap(w.options.Weights())
395✔
302
                        if err != nil {
395✔
303
                                w.logger.Error("failed to update weight for round robin task scheduler", tag.Error(err))
×
304
                        } else {
395✔
305
                                w.weights.Store(weights)
395✔
306
                        }
395✔
307
                case <-w.shutdownCh:
18✔
308
                        ticker.Stop()
18✔
309
                        return
18✔
310
                }
311
        }
312
}
313

314
func (w *weightedRoundRobinTaskSchedulerImpl) isStopped() bool {
29,925✔
315
        return atomic.LoadInt32(&w.status) == common.DaemonStatusStopped
29,925✔
316
}
29,925✔
317

318
func drainAndNackPriorityTask(taskCh <-chan PriorityTask) {
21✔
319
        for {
178✔
320
                select {
157✔
321
                case task := <-taskCh:
136✔
322
                        task.Nack()
136✔
323
                default:
21✔
324
                        return
21✔
325
                }
326
        }
327
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc