• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

uber / cadence / 01907562-f5f0-40b6-8908-451d758b6138

02 Jul 2024 09:37PM UTC coverage: 71.512% (+0.003%) from 71.509%
01907562-f5f0-40b6-8908-451d758b6138

Pull #6155

buildkite

Groxx
Stop the ratelimiter collections when stopping the service
Pull Request #6155: Stop the ratelimiter collections when stopping the service

9 of 17 new or added lines in 1 file covered. (52.94%)

22 existing lines in 7 files now uncovered.

105315 of 147269 relevant lines covered (71.51%)

2600.5 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

89.05
/common/task/weighted_round_robin_task_scheduler.go
1
// Copyright (c) 2020 Uber Technologies, Inc.
2
//
3
// Permission is hereby granted, free of charge, to any person obtaining a copy
4
// of this software and associated documentation files (the "Software"), to deal
5
// in the Software without restriction, including without limitation the rights
6
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
7
// copies of the Software, and to permit persons to whom the Software is
8
// furnished to do so, subject to the following conditions:
9
//
10
// The above copyright notice and this permission notice shall be included in
11
// all copies or substantial portions of the Software.
12
//
13
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
14
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
18
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
19
// THE SOFTWARE.
20

21
package task
22

23
import (
24
        "errors"
25
        "fmt"
26
        "sync"
27
        "sync/atomic"
28
        "time"
29

30
        "github.com/uber/cadence/common"
31
        "github.com/uber/cadence/common/log"
32
        "github.com/uber/cadence/common/log/tag"
33
        "github.com/uber/cadence/common/metrics"
34
)
35

36
type weightedRoundRobinTaskSchedulerImpl struct {
37
        sync.RWMutex
38

39
        status       int32
40
        weights      atomic.Value // store the currently used weights
41
        taskChs      map[int]chan PriorityTask
42
        shutdownCh   chan struct{}
43
        notifyCh     chan struct{}
44
        dispatcherWG sync.WaitGroup
45
        logger       log.Logger
46
        metricsScope metrics.Scope
47
        options      *WeightedRoundRobinTaskSchedulerOptions
48

49
        processor Processor
50
}
51

52
const (
	// wRRTaskProcessorQueueSize is the QueueSize handed to the parallel task
	// processor created by NewWeightedRoundRobinTaskScheduler.
	wRRTaskProcessorQueueSize = 1

	// defaultUpdateWeightsInterval is the period of the ticker that drives
	// updateWeights.
	defaultUpdateWeightsInterval = 5 * time.Second
)
56

57
// ErrTaskSchedulerClosed is the error returned when submitting a task to a
// stopped scheduler.
var ErrTaskSchedulerClosed = errors.New("task scheduler has already shutdown")
61

62
// NewWeightedRoundRobinTaskScheduler creates a new WRR task scheduler
63
func NewWeightedRoundRobinTaskScheduler(
64
        logger log.Logger,
65
        metricsClient metrics.Client,
66
        options *WeightedRoundRobinTaskSchedulerOptions,
67
) (Scheduler, error) {
30✔
68
        weights, err := common.ConvertDynamicConfigMapPropertyToIntMap(options.Weights())
30✔
69
        if err != nil {
30✔
70
                return nil, err
×
71
        }
×
72

73
        if len(weights) == 0 {
30✔
74
                return nil, errors.New("weight is not specified in the scheduler option")
×
75
        }
×
76

77
        scheduler := &weightedRoundRobinTaskSchedulerImpl{
30✔
78
                status:       common.DaemonStatusInitialized,
30✔
79
                taskChs:      make(map[int]chan PriorityTask),
30✔
80
                shutdownCh:   make(chan struct{}),
30✔
81
                notifyCh:     make(chan struct{}, 1),
30✔
82
                logger:       logger,
30✔
83
                metricsScope: metricsClient.Scope(metrics.TaskSchedulerScope),
30✔
84
                options:      options,
30✔
85
                processor: NewParallelTaskProcessor(
30✔
86
                        logger,
30✔
87
                        metricsClient,
30✔
88
                        &ParallelTaskProcessorOptions{
30✔
89
                                QueueSize:   wRRTaskProcessorQueueSize,
30✔
90
                                WorkerCount: options.WorkerCount,
30✔
91
                                RetryPolicy: options.RetryPolicy,
30✔
92
                        },
30✔
93
                ),
30✔
94
        }
30✔
95
        scheduler.weights.Store(weights)
30✔
96

30✔
97
        return scheduler, nil
30✔
98
}
99

100
func (w *weightedRoundRobinTaskSchedulerImpl) Start() {
24✔
101
        if !atomic.CompareAndSwapInt32(&w.status, common.DaemonStatusInitialized, common.DaemonStatusStarted) {
24✔
102
                return
×
103
        }
×
104

105
        w.processor.Start()
24✔
106

24✔
107
        w.dispatcherWG.Add(w.options.DispatcherCount)
24✔
108
        for i := 0; i != w.options.DispatcherCount; i++ {
54✔
109
                go w.dispatcher()
30✔
110
        }
30✔
111
        go w.updateWeights()
24✔
112

24✔
113
        w.logger.Info("Weighted round robin task scheduler started.")
24✔
114
}
115

116
func (w *weightedRoundRobinTaskSchedulerImpl) Stop() {
24✔
117
        if !atomic.CompareAndSwapInt32(&w.status, common.DaemonStatusStarted, common.DaemonStatusStopped) {
24✔
118
                return
×
119
        }
×
120

121
        close(w.shutdownCh)
24✔
122

24✔
123
        w.processor.Stop()
24✔
124

24✔
125
        w.RLock()
24✔
126
        for _, taskCh := range w.taskChs {
51✔
127
                drainAndNackPriorityTask(taskCh)
27✔
128
        }
27✔
129
        w.RUnlock()
24✔
130

24✔
131
        if success := common.AwaitWaitGroup(&w.dispatcherWG, time.Minute); !success {
24✔
132
                w.logger.Warn("Weighted round robin task scheduler timedout on shutdown.")
×
133
        }
×
134

135
        w.logger.Info("Weighted round robin task scheduler shutdown.")
24✔
136
}
137

138
func (w *weightedRoundRobinTaskSchedulerImpl) Submit(task PriorityTask) error {
5,602✔
139
        w.metricsScope.IncCounter(metrics.PriorityTaskSubmitRequest)
5,602✔
140
        sw := w.metricsScope.StartTimer(metrics.PriorityTaskSubmitLatency)
5,602✔
141
        defer sw.Stop()
5,602✔
142

5,602✔
143
        if w.isStopped() {
10,150✔
144
                return ErrTaskSchedulerClosed
4,548✔
145
        }
4,548✔
146

147
        taskCh, err := w.getOrCreateTaskChan(task.Priority())
1,054✔
148
        if err != nil {
1,055✔
149
                return err
1✔
150
        }
1✔
151

152
        select {
1,053✔
153
        case taskCh <- task:
1,053✔
154
                w.notifyDispatcher()
1,053✔
155
                if w.isStopped() {
1,054✔
156
                        drainAndNackPriorityTask(taskCh)
1✔
157
                }
1✔
158
                return nil
1,053✔
159
        case <-w.shutdownCh:
×
160
                return ErrTaskSchedulerClosed
×
161
        }
162
}
163

164
func (w *weightedRoundRobinTaskSchedulerImpl) TrySubmit(
165
        task PriorityTask,
166
) (bool, error) {
14,365✔
167
        if w.isStopped() {
18,760✔
168
                return false, ErrTaskSchedulerClosed
4,395✔
169
        }
4,395✔
170

171
        taskCh, err := w.getOrCreateTaskChan(task.Priority())
9,970✔
172
        if err != nil {
9,970✔
173
                return false, err
×
174
        }
×
175

176
        select {
9,970✔
177
        case taskCh <- task:
9,969✔
178
                w.metricsScope.IncCounter(metrics.PriorityTaskSubmitRequest)
9,969✔
179
                if w.isStopped() {
9,969✔
UNCOV
180
                        drainAndNackPriorityTask(taskCh)
×
181
                } else {
9,969✔
182
                        w.notifyDispatcher()
9,969✔
183
                }
9,969✔
184
                return true, nil
9,969✔
185
        case <-w.shutdownCh:
×
186
                return false, ErrTaskSchedulerClosed
×
187
        default:
1✔
188
                return false, nil
1✔
189
        }
190
}
191

192
func (w *weightedRoundRobinTaskSchedulerImpl) dispatcher() {
32✔
193
        defer w.dispatcherWG.Done()
32✔
194

32✔
195
        outstandingTasks := false
32✔
196
        taskChs := make(map[int]chan PriorityTask)
32✔
197

32✔
198
        for {
15,168✔
199
                if !outstandingTasks {
23,010✔
200
                        // if no task is dispatched in the last round,
7,874✔
201
                        // wait for a notification
7,874✔
202
                        w.logger.Debug("Weighted round robin task scheduler is waiting for new task notification because there was no task dispatched in the last round.")
7,874✔
203
                        select {
7,874✔
204
                        case <-w.notifyCh:
7,850✔
205
                                // block until there's a new task
7,850✔
206
                                w.logger.Debug("Weighted round robin task scheduler got notification so will check for new tasks.")
7,850✔
207
                        case <-w.shutdownCh:
27✔
208
                                return
27✔
209
                        }
210
                }
211

212
                outstandingTasks = false
15,112✔
213
                w.updateTaskChs(taskChs)
15,112✔
214
                weights := w.getWeights()
15,112✔
215
                for priority, taskCh := range taskChs {
33,065✔
216
                        count, ok := weights[priority]
17,953✔
217
                        if !ok {
17,953✔
218
                                w.logger.Error("weights not found for task priority", tag.Dynamic("priority", priority), tag.Dynamic("weights", weights))
×
219
                                continue
×
220
                        }
221
                Submit_Loop:
17,953✔
222
                        for i := 0; i < count; i++ {
44,641✔
223
                                select {
26,688✔
224
                                case task := <-taskCh:
9,313✔
225
                                        // dispatched at least one task in this round
9,313✔
226
                                        outstandingTasks = true
9,313✔
227

9,313✔
228
                                        if err := w.processor.Submit(task); err != nil {
9,321✔
229
                                                w.logger.Error("fail to submit task to processor", tag.Error(err))
8✔
230
                                                task.Nack()
8✔
231
                                        }
8✔
232
                                case <-w.shutdownCh:
5✔
233
                                        return
5✔
234
                                default:
17,373✔
235
                                        // if no task, don't block. Skip to next priority
17,373✔
236
                                        break Submit_Loop
17,373✔
237
                                }
238
                        }
239
                }
240
        }
241
}
242

243
func (w *weightedRoundRobinTaskSchedulerImpl) getOrCreateTaskChan(priority int) (chan PriorityTask, error) {
11,024✔
244
        if _, ok := w.getWeights()[priority]; !ok {
11,025✔
245
                return nil, fmt.Errorf("unknown task priority: %v", priority)
1✔
246
        }
1✔
247

248
        w.RLock()
11,023✔
249
        if taskCh, ok := w.taskChs[priority]; ok {
22,016✔
250
                w.RUnlock()
10,993✔
251
                return taskCh, nil
10,993✔
252
        }
10,993✔
253
        w.RUnlock()
33✔
254

33✔
255
        w.Lock()
33✔
256
        defer w.Unlock()
33✔
257
        if taskCh, ok := w.taskChs[priority]; ok {
33✔
258
                return taskCh, nil
×
259
        }
×
260
        taskCh := make(chan PriorityTask, w.options.QueueSize)
33✔
261
        w.taskChs[priority] = taskCh
33✔
262
        return taskCh, nil
33✔
263
}
264

265
func (w *weightedRoundRobinTaskSchedulerImpl) updateTaskChs(taskChs map[int]chan PriorityTask) {
15,112✔
266
        w.RLock()
15,112✔
267
        defer w.RUnlock()
15,112✔
268

15,112✔
269
        for priority, taskCh := range w.taskChs {
33,066✔
270
                if _, ok := taskChs[priority]; !ok {
17,997✔
271
                        taskChs[priority] = taskCh
43✔
272
                }
43✔
273
        }
274
}
275

276
func (w *weightedRoundRobinTaskSchedulerImpl) notifyDispatcher() {
11,022✔
277
        select {
11,022✔
278
        case w.notifyCh <- struct{}{}:
7,853✔
279
                // sent a notification to the dispatcher
280
        default:
3,172✔
281
                // do not block if there's already a notification
282
        }
283
}
284

285
func (w *weightedRoundRobinTaskSchedulerImpl) getWeights() map[int]int {
26,133✔
286
        return w.weights.Load().(map[int]int)
26,133✔
287
}
26,133✔
288

289
func (w *weightedRoundRobinTaskSchedulerImpl) updateWeights() {
24✔
290
        ticker := time.NewTicker(defaultUpdateWeightsInterval)
24✔
291
        for {
419✔
292
                select {
395✔
293
                case <-ticker.C:
374✔
294
                        weights, err := common.ConvertDynamicConfigMapPropertyToIntMap(w.options.Weights())
374✔
295
                        if err != nil {
374✔
296
                                w.logger.Error("failed to update weight for round robin task scheduler", tag.Error(err))
×
297
                        } else {
374✔
298
                                w.weights.Store(weights)
374✔
299
                        }
374✔
300
                case <-w.shutdownCh:
24✔
301
                        ticker.Stop()
24✔
302
                        return
24✔
303
                }
304
        }
305
}
306

307
func (w *weightedRoundRobinTaskSchedulerImpl) isStopped() bool {
30,986✔
308
        return atomic.LoadInt32(&w.status) == common.DaemonStatusStopped
30,986✔
309
}
30,986✔
310

311
func drainAndNackPriorityTask(taskCh <-chan PriorityTask) {
28✔
312
        for {
764✔
313
                select {
736✔
314
                case task := <-taskCh:
708✔
315
                        task.Nack()
708✔
316
                default:
28✔
317
                        return
28✔
318
                }
319
        }
320
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc