• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

uber / cadence / 0186ed8c-939f-4cc5-9459-fac2c80653f9

17 Mar 2023 03:30AM UTC coverage: 57.113% (-0.005%) from 57.118%
0186ed8c-939f-4cc5-9459-fac2c80653f9

push

buildkite

GitHub
[history] more cautious in deciding domain state to make decisions on dropping queued tasks (#5164)

1 of 1 new or added line in 1 file covered. (100.0%)

85268 of 149297 relevant lines covered (57.11%)

2300.93 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

89.64
/common/task/weightedRoundRobinTaskScheduler.go
1
// Copyright (c) 2020 Uber Technologies, Inc.
2
//
3
// Permission is hereby granted, free of charge, to any person obtaining a copy
4
// of this software and associated documentation files (the "Software"), to deal
5
// in the Software without restriction, including without limitation the rights
6
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
7
// copies of the Software, and to permit persons to whom the Software is
8
// furnished to do so, subject to the following conditions:
9
//
10
// The above copyright notice and this permission notice shall be included in
11
// all copies or substantial portions of the Software.
12
//
13
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
14
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
18
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
19
// THE SOFTWARE.
20

21
package task
22

23
import (
24
        "errors"
25
        "fmt"
26
        "sync"
27
        "sync/atomic"
28
        "time"
29

30
        "github.com/uber/cadence/common"
31
        "github.com/uber/cadence/common/backoff"
32
        "github.com/uber/cadence/common/dynamicconfig"
33
        "github.com/uber/cadence/common/log"
34
        "github.com/uber/cadence/common/log/tag"
35
        "github.com/uber/cadence/common/metrics"
36
)
37

38
type (
39
        // WeightedRoundRobinTaskSchedulerOptions configs WRR task scheduler
40
        WeightedRoundRobinTaskSchedulerOptions struct {
41
                Weights         dynamicconfig.MapPropertyFn
42
                QueueSize       int
43
                WorkerCount     dynamicconfig.IntPropertyFn
44
                DispatcherCount int
45
                RetryPolicy     backoff.RetryPolicy
46
        }
47

48
        weightedRoundRobinTaskSchedulerImpl struct {
49
                sync.RWMutex
50

51
                status       int32
52
                weights      atomic.Value // store the currently used weights
53
                taskChs      map[int]chan PriorityTask
54
                shutdownCh   chan struct{}
55
                notifyCh     chan struct{}
56
                dispatcherWG sync.WaitGroup
57
                logger       log.Logger
58
                metricsScope metrics.Scope
59
                options      *WeightedRoundRobinTaskSchedulerOptions
60

61
                processor Processor
62
        }
63
)
64

65
// wRRTaskProcessorQueueSize is the queue size configured on the parallel task
// processor created by the scheduler.
const wRRTaskProcessorQueueSize = 1

// defaultUpdateWeightsInterval is the period at which the scheduler reloads
// its weights from the options.
const defaultUpdateWeightsInterval = 5 * time.Second
69

70
// ErrTaskSchedulerClosed is returned when a task is submitted to a scheduler
// that has already been stopped.
var ErrTaskSchedulerClosed = errors.New("task scheduler has already shutdown")
74

75
// NewWeightedRoundRobinTaskScheduler creates a new WRR task scheduler
76
func NewWeightedRoundRobinTaskScheduler(
77
        logger log.Logger,
78
        metricsClient metrics.Client,
79
        options *WeightedRoundRobinTaskSchedulerOptions,
80
) (Scheduler, error) {
24✔
81
        weights, err := common.ConvertDynamicConfigMapPropertyToIntMap(options.Weights())
24✔
82
        if err != nil {
24✔
83
                return nil, err
×
84
        }
×
85

86
        if len(weights) == 0 {
24✔
87
                return nil, errors.New("weight is not specified in the scheduler option")
×
88
        }
×
89

90
        scheduler := &weightedRoundRobinTaskSchedulerImpl{
24✔
91
                status:       common.DaemonStatusInitialized,
24✔
92
                taskChs:      make(map[int]chan PriorityTask),
24✔
93
                shutdownCh:   make(chan struct{}),
24✔
94
                notifyCh:     make(chan struct{}, 1),
24✔
95
                logger:       logger,
24✔
96
                metricsScope: metricsClient.Scope(metrics.TaskSchedulerScope),
24✔
97
                options:      options,
24✔
98
                processor: NewParallelTaskProcessor(
24✔
99
                        logger,
24✔
100
                        metricsClient,
24✔
101
                        &ParallelTaskProcessorOptions{
24✔
102
                                QueueSize:   wRRTaskProcessorQueueSize,
24✔
103
                                WorkerCount: options.WorkerCount,
24✔
104
                                RetryPolicy: options.RetryPolicy,
24✔
105
                        },
24✔
106
                ),
24✔
107
        }
24✔
108
        scheduler.weights.Store(weights)
24✔
109

24✔
110
        return scheduler, nil
24✔
111
}
112

113
func (w *weightedRoundRobinTaskSchedulerImpl) Start() {
18✔
114
        if !atomic.CompareAndSwapInt32(&w.status, common.DaemonStatusInitialized, common.DaemonStatusStarted) {
18✔
115
                return
×
116
        }
×
117

118
        w.processor.Start()
18✔
119

18✔
120
        w.dispatcherWG.Add(w.options.DispatcherCount)
18✔
121
        for i := 0; i != w.options.DispatcherCount; i++ {
42✔
122
                go w.dispatcher()
24✔
123
        }
24✔
124
        go w.updateWeights()
18✔
125

18✔
126
        w.logger.Info("Weighted round robin task scheduler started.")
18✔
127
}
128

129
func (w *weightedRoundRobinTaskSchedulerImpl) Stop() {
18✔
130
        if !atomic.CompareAndSwapInt32(&w.status, common.DaemonStatusStarted, common.DaemonStatusStopped) {
18✔
131
                return
×
132
        }
×
133

134
        close(w.shutdownCh)
18✔
135

18✔
136
        w.processor.Stop()
18✔
137

18✔
138
        w.RLock()
18✔
139
        for _, taskCh := range w.taskChs {
39✔
140
                drainAndNackPriorityTask(taskCh)
21✔
141
        }
21✔
142
        w.RUnlock()
18✔
143

18✔
144
        if success := common.AwaitWaitGroup(&w.dispatcherWG, time.Minute); !success {
18✔
145
                w.logger.Warn("Weighted round robin task scheduler timedout on shutdown.")
×
146
        }
×
147

148
        w.logger.Info("Weighted round robin task scheduler shutdown.")
18✔
149
}
150

151
func (w *weightedRoundRobinTaskSchedulerImpl) Submit(task PriorityTask) error {
5,539✔
152
        w.metricsScope.IncCounter(metrics.PriorityTaskSubmitRequest)
5,539✔
153
        sw := w.metricsScope.StartTimer(metrics.PriorityTaskSubmitLatency)
5,539✔
154
        defer sw.Stop()
5,539✔
155

5,539✔
156
        if w.isStopped() {
10,439✔
157
                return ErrTaskSchedulerClosed
4,900✔
158
        }
4,900✔
159

160
        taskCh, err := w.getOrCreateTaskChan(task.Priority())
639✔
161
        if err != nil {
640✔
162
                return err
1✔
163
        }
1✔
164

165
        select {
638✔
166
        case taskCh <- task:
638✔
167
                w.notifyDispatcher()
638✔
168
                if w.isStopped() {
639✔
169
                        drainAndNackPriorityTask(taskCh)
1✔
170
                }
1✔
171
                return nil
638✔
172
        case <-w.shutdownCh:
×
173
                return ErrTaskSchedulerClosed
×
174
        }
175
}
176

177
func (w *weightedRoundRobinTaskSchedulerImpl) TrySubmit(
178
        task PriorityTask,
179
) (bool, error) {
14,497✔
180
        if w.isStopped() {
19,349✔
181
                return false, ErrTaskSchedulerClosed
4,852✔
182
        }
4,852✔
183

184
        taskCh, err := w.getOrCreateTaskChan(task.Priority())
9,648✔
185
        if err != nil {
9,648✔
186
                return false, err
×
187
        }
×
188

189
        select {
9,648✔
190
        case taskCh <- task:
9,647✔
191
                w.metricsScope.IncCounter(metrics.PriorityTaskSubmitRequest)
9,647✔
192
                if w.isStopped() {
9,647✔
193
                        drainAndNackPriorityTask(taskCh)
×
194
                } else {
9,647✔
195
                        w.notifyDispatcher()
9,647✔
196
                }
9,647✔
197
                return true, nil
9,647✔
198
        case <-w.shutdownCh:
×
199
                return false, ErrTaskSchedulerClosed
×
200
        default:
1✔
201
                return false, nil
1✔
202
        }
203
}
204

205
func (w *weightedRoundRobinTaskSchedulerImpl) dispatcher() {
26✔
206
        defer w.dispatcherWG.Done()
26✔
207

26✔
208
        outstandingTasks := false
26✔
209
        taskChs := make(map[int]chan PriorityTask)
26✔
210

26✔
211
        for {
17,349✔
212
                if !outstandingTasks {
26,039✔
213
                        // if no task is dispatched in the last round,
8,716✔
214
                        // wait for a notification
8,716✔
215
                        select {
8,716✔
216
                        case <-w.notifyCh:
8,697✔
217
                                // block until there's a new task
218
                        case <-w.shutdownCh:
22✔
219
                                return
22✔
220
                        }
221
                }
222

223
                outstandingTasks = false
17,304✔
224
                w.updateTaskChs(taskChs)
17,304✔
225
                weights := w.getWeights()
17,304✔
226
                for priority, taskCh := range taskChs {
37,493✔
227
                        for i := 0; i < weights[priority]; i++ {
7,951,371✔
228
                                select {
7,931,182✔
229
                                case task := <-taskCh:
9,137✔
230
                                        // dispatched at least one task in this round
9,137✔
231
                                        outstandingTasks = true
9,137✔
232

9,137✔
233
                                        if err := w.processor.Submit(task); err != nil {
9,145✔
234
                                                w.logger.Error("fail to submit task to processor", tag.Error(err))
8✔
235
                                                task.Nack()
8✔
236
                                        }
8✔
237
                                case <-w.shutdownCh:
4✔
238
                                        return
4✔
239
                                default:
7,922,044✔
240
                                        // if no task, don't block. Skip to next priority
7,922,044✔
241
                                        break
7,922,044✔
242
                                }
243
                        }
244
                }
245
        }
246
}
247

248
func (w *weightedRoundRobinTaskSchedulerImpl) getOrCreateTaskChan(
249
        priority int,
250
) (chan PriorityTask, error) {
10,287✔
251
        if _, ok := w.getWeights()[priority]; !ok {
10,288✔
252
                return nil, fmt.Errorf("unknown task priority: %v", priority)
1✔
253
        }
1✔
254

255
        w.RLock()
10,286✔
256
        if taskCh, ok := w.taskChs[priority]; ok {
20,548✔
257
                w.RUnlock()
10,262✔
258
                return taskCh, nil
10,262✔
259
        }
10,262✔
260
        w.RUnlock()
27✔
261

27✔
262
        w.Lock()
27✔
263
        defer w.Unlock()
27✔
264
        if taskCh, ok := w.taskChs[priority]; ok {
27✔
265
                return taskCh, nil
×
266
        }
×
267
        taskCh := make(chan PriorityTask, w.options.QueueSize)
27✔
268
        w.taskChs[priority] = taskCh
27✔
269
        return taskCh, nil
27✔
270
}
271

272
func (w *weightedRoundRobinTaskSchedulerImpl) updateTaskChs(taskChs map[int]chan PriorityTask) {
17,304✔
273
        w.RLock()
17,304✔
274
        defer w.RUnlock()
17,304✔
275

17,304✔
276
        for priority, taskCh := range w.taskChs {
37,498✔
277
                if _, ok := taskChs[priority]; !ok {
20,231✔
278
                        taskChs[priority] = taskCh
37✔
279
                }
37✔
280
        }
281
}
282

283
func (w *weightedRoundRobinTaskSchedulerImpl) notifyDispatcher() {
10,285✔
284
        select {
10,285✔
285
        case w.notifyCh <- struct{}{}:
8,700✔
286
                // sent a notification to the dispatcher
287
        default:
1,588✔
288
                // do not block if there's already a notification
289
        }
290
}
291

292
func (w *weightedRoundRobinTaskSchedulerImpl) getWeights() map[int]int {
27,588✔
293
        return w.weights.Load().(map[int]int)
27,588✔
294
}
27,588✔
295

296
func (w *weightedRoundRobinTaskSchedulerImpl) updateWeights() {
18✔
297
        ticker := time.NewTicker(defaultUpdateWeightsInterval)
18✔
298
        for {
438✔
299
                select {
420✔
300
                case <-ticker.C:
405✔
301
                        weights, err := common.ConvertDynamicConfigMapPropertyToIntMap(w.options.Weights())
405✔
302
                        if err != nil {
405✔
303
                                w.logger.Error("failed to update weight for round robin task scheduler", tag.Error(err))
×
304
                        } else {
405✔
305
                                w.weights.Store(weights)
405✔
306
                        }
405✔
307
                case <-w.shutdownCh:
18✔
308
                        ticker.Stop()
18✔
309
                        return
18✔
310
                }
311
        }
312
}
313

314
func (w *weightedRoundRobinTaskSchedulerImpl) isStopped() bool {
30,318✔
315
        return atomic.LoadInt32(&w.status) == common.DaemonStatusStopped
30,318✔
316
}
30,318✔
317

318
func drainAndNackPriorityTask(taskCh <-chan PriorityTask) {
22✔
319
        for {
191✔
320
                select {
169✔
321
                case task := <-taskCh:
147✔
322
                        task.Nack()
147✔
323
                default:
22✔
324
                        return
22✔
325
                }
326
        }
327
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc