• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

uber / cadence / 0188b6ba-6aaf-4739-8b30-52eaead5800c

13 Jun 2023 09:47PM UTC coverage: 57.204% (-0.01%) from 57.217%
0188b6ba-6aaf-4739-8b30-52eaead5800c

push

buildkite

web-flow
Bugfix/isolation groups domain drains (#5315)

What changed?
Fixes several bugs in the domain isolation-group handling, which wasn't properly tested with replication. Notably, this change:

Fixes the problem of upserting configuration from the inactive region, which previously would error.
Fixes the replication of configuration, which was not working at all.
Refactors the domain controller by splitting out this functionality into its own, much simpler function rather than continuing to overload the already incomprehensible domain controller.
Why?

How did you test it?

cadence --env docstore-prod11 --proxy_region dca admin isolation-groups get-domain --domain cadence-canary-global
Isolation Groups State
asdf5            Drained
asfd             Drained
-------------------------------------------------------------------------------------------------------------------------------------------------------------
~ » cadence --env an-env --proxy_region dca admin isolation-groups update-domain --domain cadence-canary-global  --remove-all-drains
-------------------------------------------------------------------------------------------------------------------------------------------------------------
~ » cadence --env an-env --proxy_region dca admin isolation-groups get-domain --domain cadence-canary-global
-- No groups found --
-------------------------------------------------------------------------------------------------------------------------------------------------------------
~ » cadence --env an-env --proxy_region phx admin isolation-groups get-domain --domain cadence-canary-global
-- No groups found --
Potential risks

141 of 141 new or added lines in 5 files covered. (100.0%)

86988 of 152065 relevant lines covered (57.2%)

2482.57 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

89.64
/common/task/weightedRoundRobinTaskScheduler.go
1
// Copyright (c) 2020 Uber Technologies, Inc.
2
//
3
// Permission is hereby granted, free of charge, to any person obtaining a copy
4
// of this software and associated documentation files (the "Software"), to deal
5
// in the Software without restriction, including without limitation the rights
6
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
7
// copies of the Software, and to permit persons to whom the Software is
8
// furnished to do so, subject to the following conditions:
9
//
10
// The above copyright notice and this permission notice shall be included in
11
// all copies or substantial portions of the Software.
12
//
13
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
14
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
18
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
19
// THE SOFTWARE.
20

21
package task
22

23
import (
24
        "errors"
25
        "fmt"
26
        "sync"
27
        "sync/atomic"
28
        "time"
29

30
        "github.com/uber/cadence/common"
31
        "github.com/uber/cadence/common/backoff"
32
        "github.com/uber/cadence/common/dynamicconfig"
33
        "github.com/uber/cadence/common/log"
34
        "github.com/uber/cadence/common/log/tag"
35
        "github.com/uber/cadence/common/metrics"
36
)
37

38
type (
39
        // WeightedRoundRobinTaskSchedulerOptions configs WRR task scheduler
40
        WeightedRoundRobinTaskSchedulerOptions struct {
41
                Weights         dynamicconfig.MapPropertyFn
42
                QueueSize       int
43
                WorkerCount     dynamicconfig.IntPropertyFn
44
                DispatcherCount int
45
                RetryPolicy     backoff.RetryPolicy
46
        }
47

48
        weightedRoundRobinTaskSchedulerImpl struct {
49
                sync.RWMutex
50

51
                status       int32
52
                weights      atomic.Value // store the currently used weights
53
                taskChs      map[int]chan PriorityTask
54
                shutdownCh   chan struct{}
55
                notifyCh     chan struct{}
56
                dispatcherWG sync.WaitGroup
57
                logger       log.Logger
58
                metricsScope metrics.Scope
59
                options      *WeightedRoundRobinTaskSchedulerOptions
60

61
                processor Processor
62
        }
63
)
64

65
const (
66
        wRRTaskProcessorQueueSize    = 1
67
        defaultUpdateWeightsInterval = 5 * time.Second
68
)
69

70
var (
71
        // ErrTaskSchedulerClosed is the error returned when submitting task to a stopped scheduler
72
        ErrTaskSchedulerClosed = errors.New("task scheduler has already shutdown")
73
)
74

75
// NewWeightedRoundRobinTaskScheduler creates a new WRR task scheduler
76
func NewWeightedRoundRobinTaskScheduler(
77
        logger log.Logger,
78
        metricsClient metrics.Client,
79
        options *WeightedRoundRobinTaskSchedulerOptions,
80
) (Scheduler, error) {
24✔
81
        weights, err := common.ConvertDynamicConfigMapPropertyToIntMap(options.Weights())
24✔
82
        if err != nil {
24✔
83
                return nil, err
×
84
        }
×
85

86
        if len(weights) == 0 {
24✔
87
                return nil, errors.New("weight is not specified in the scheduler option")
×
88
        }
×
89

90
        scheduler := &weightedRoundRobinTaskSchedulerImpl{
24✔
91
                status:       common.DaemonStatusInitialized,
24✔
92
                taskChs:      make(map[int]chan PriorityTask),
24✔
93
                shutdownCh:   make(chan struct{}),
24✔
94
                notifyCh:     make(chan struct{}, 1),
24✔
95
                logger:       logger,
24✔
96
                metricsScope: metricsClient.Scope(metrics.TaskSchedulerScope),
24✔
97
                options:      options,
24✔
98
                processor: NewParallelTaskProcessor(
24✔
99
                        logger,
24✔
100
                        metricsClient,
24✔
101
                        &ParallelTaskProcessorOptions{
24✔
102
                                QueueSize:   wRRTaskProcessorQueueSize,
24✔
103
                                WorkerCount: options.WorkerCount,
24✔
104
                                RetryPolicy: options.RetryPolicy,
24✔
105
                        },
24✔
106
                ),
24✔
107
        }
24✔
108
        scheduler.weights.Store(weights)
24✔
109

24✔
110
        return scheduler, nil
24✔
111
}
112

113
func (w *weightedRoundRobinTaskSchedulerImpl) Start() {
18✔
114
        if !atomic.CompareAndSwapInt32(&w.status, common.DaemonStatusInitialized, common.DaemonStatusStarted) {
18✔
115
                return
×
116
        }
×
117

118
        w.processor.Start()
18✔
119

18✔
120
        w.dispatcherWG.Add(w.options.DispatcherCount)
18✔
121
        for i := 0; i != w.options.DispatcherCount; i++ {
42✔
122
                go w.dispatcher()
24✔
123
        }
24✔
124
        go w.updateWeights()
18✔
125

18✔
126
        w.logger.Info("Weighted round robin task scheduler started.")
18✔
127
}
128

129
func (w *weightedRoundRobinTaskSchedulerImpl) Stop() {
18✔
130
        if !atomic.CompareAndSwapInt32(&w.status, common.DaemonStatusStarted, common.DaemonStatusStopped) {
18✔
131
                return
×
132
        }
×
133

134
        close(w.shutdownCh)
18✔
135

18✔
136
        w.processor.Stop()
18✔
137

18✔
138
        w.RLock()
18✔
139
        for _, taskCh := range w.taskChs {
39✔
140
                drainAndNackPriorityTask(taskCh)
21✔
141
        }
21✔
142
        w.RUnlock()
18✔
143

18✔
144
        if success := common.AwaitWaitGroup(&w.dispatcherWG, time.Minute); !success {
18✔
145
                w.logger.Warn("Weighted round robin task scheduler timedout on shutdown.")
×
146
        }
×
147

148
        w.logger.Info("Weighted round robin task scheduler shutdown.")
18✔
149
}
150

151
func (w *weightedRoundRobinTaskSchedulerImpl) Submit(task PriorityTask) error {
5,539✔
152
        w.metricsScope.IncCounter(metrics.PriorityTaskSubmitRequest)
5,539✔
153
        sw := w.metricsScope.StartTimer(metrics.PriorityTaskSubmitLatency)
5,539✔
154
        defer sw.Stop()
5,539✔
155

5,539✔
156
        if w.isStopped() {
10,430✔
157
                return ErrTaskSchedulerClosed
4,891✔
158
        }
4,891✔
159

160
        taskCh, err := w.getOrCreateTaskChan(task.Priority())
648✔
161
        if err != nil {
649✔
162
                return err
1✔
163
        }
1✔
164

165
        select {
647✔
166
        case taskCh <- task:
647✔
167
                w.notifyDispatcher()
647✔
168
                if w.isStopped() {
647✔
169
                        drainAndNackPriorityTask(taskCh)
×
170
                }
×
171
                return nil
647✔
172
        case <-w.shutdownCh:
×
173
                return ErrTaskSchedulerClosed
×
174
        }
175
}
176

177
func (w *weightedRoundRobinTaskSchedulerImpl) TrySubmit(
178
        task PriorityTask,
179
) (bool, error) {
14,563✔
180
        if w.isStopped() {
19,404✔
181
                return false, ErrTaskSchedulerClosed
4,841✔
182
        }
4,841✔
183

184
        taskCh, err := w.getOrCreateTaskChan(task.Priority())
9,725✔
185
        if err != nil {
9,725✔
186
                return false, err
×
187
        }
×
188

189
        select {
9,725✔
190
        case taskCh <- task:
9,723✔
191
                w.metricsScope.IncCounter(metrics.PriorityTaskSubmitRequest)
9,723✔
192
                if w.isStopped() {
9,723✔
193
                        drainAndNackPriorityTask(taskCh)
×
194
                } else {
9,723✔
195
                        w.notifyDispatcher()
9,723✔
196
                }
9,723✔
197
                return true, nil
9,723✔
198
        case <-w.shutdownCh:
1✔
199
                return false, ErrTaskSchedulerClosed
1✔
200
        default:
1✔
201
                return false, nil
1✔
202
        }
203
}
204

205
func (w *weightedRoundRobinTaskSchedulerImpl) dispatcher() {
26✔
206
        defer w.dispatcherWG.Done()
26✔
207

26✔
208
        outstandingTasks := false
26✔
209
        taskChs := make(map[int]chan PriorityTask)
26✔
210

26✔
211
        for {
17,272✔
212
                if !outstandingTasks {
25,903✔
213
                        // if no task is dispatched in the last round,
8,657✔
214
                        // wait for a notification
8,657✔
215
                        select {
8,657✔
216
                        case <-w.notifyCh:
8,638✔
217
                                // block until there's a new task
218
                        case <-w.shutdownCh:
22✔
219
                                return
22✔
220
                        }
221
                }
222

223
                outstandingTasks = false
17,227✔
224
                w.updateTaskChs(taskChs)
17,227✔
225
                weights := w.getWeights()
17,227✔
226
                for priority, taskCh := range taskChs {
36,972✔
227
                        for i := 0; i < weights[priority]; i++ {
8,002,828✔
228
                                select {
7,983,083✔
229
                                case task := <-taskCh:
9,185✔
230
                                        // dispatched at least one task in this round
9,185✔
231
                                        outstandingTasks = true
9,185✔
232

9,185✔
233
                                        if err := w.processor.Submit(task); err != nil {
9,189✔
234
                                                w.logger.Error("fail to submit task to processor", tag.Error(err))
4✔
235
                                                task.Nack()
4✔
236
                                        }
4✔
237
                                case <-w.shutdownCh:
4✔
238
                                        return
4✔
239
                                default:
7,973,897✔
240
                                        // if no task, don't block. Skip to next priority
7,973,897✔
241
                                        break
7,973,897✔
242
                                }
243
                        }
244
                }
245
        }
246
}
247

248
func (w *weightedRoundRobinTaskSchedulerImpl) getOrCreateTaskChan(
249
        priority int,
250
) (chan PriorityTask, error) {
10,373✔
251
        if _, ok := w.getWeights()[priority]; !ok {
10,374✔
252
                return nil, fmt.Errorf("unknown task priority: %v", priority)
1✔
253
        }
1✔
254

255
        w.RLock()
10,372✔
256
        if taskCh, ok := w.taskChs[priority]; ok {
20,720✔
257
                w.RUnlock()
10,348✔
258
                return taskCh, nil
10,348✔
259
        }
10,348✔
260
        w.RUnlock()
27✔
261

27✔
262
        w.Lock()
27✔
263
        defer w.Unlock()
27✔
264
        if taskCh, ok := w.taskChs[priority]; ok {
27✔
265
                return taskCh, nil
×
266
        }
×
267
        taskCh := make(chan PriorityTask, w.options.QueueSize)
27✔
268
        w.taskChs[priority] = taskCh
27✔
269
        return taskCh, nil
27✔
270
}
271

272
func (w *weightedRoundRobinTaskSchedulerImpl) updateTaskChs(taskChs map[int]chan PriorityTask) {
17,227✔
273
        w.RLock()
17,227✔
274
        defer w.RUnlock()
17,227✔
275

17,227✔
276
        for priority, taskCh := range w.taskChs {
36,976✔
277
                if _, ok := taskChs[priority]; !ok {
19,786✔
278
                        taskChs[priority] = taskCh
37✔
279
                }
37✔
280
        }
281
}
282

283
func (w *weightedRoundRobinTaskSchedulerImpl) notifyDispatcher() {
10,370✔
284
        select {
10,370✔
285
        case w.notifyCh <- struct{}{}:
8,641✔
286
                // sent a notification to the dispatcher
287
        default:
1,732✔
288
                // do not block if there's already a notification
289
        }
290
}
291

292
func (w *weightedRoundRobinTaskSchedulerImpl) getWeights() map[int]int {
27,597✔
293
        return w.weights.Load().(map[int]int)
27,597✔
294
}
27,597✔
295

296
func (w *weightedRoundRobinTaskSchedulerImpl) updateWeights() {
18✔
297
        ticker := time.NewTicker(defaultUpdateWeightsInterval)
18✔
298
        for {
441✔
299
                select {
423✔
300
                case <-ticker.C:
408✔
301
                        weights, err := common.ConvertDynamicConfigMapPropertyToIntMap(w.options.Weights())
408✔
302
                        if err != nil {
408✔
303
                                w.logger.Error("failed to update weight for round robin task scheduler", tag.Error(err))
×
304
                        } else {
408✔
305
                                w.weights.Store(weights)
408✔
306
                        }
408✔
307
                case <-w.shutdownCh:
18✔
308
                        ticker.Stop()
18✔
309
                        return
18✔
310
                }
311
        }
312
}
313

314
func (w *weightedRoundRobinTaskSchedulerImpl) isStopped() bool {
30,469✔
315
        return atomic.LoadInt32(&w.status) == common.DaemonStatusStopped
30,469✔
316
}
30,469✔
317

318
func drainAndNackPriorityTask(taskCh <-chan PriorityTask) {
21✔
319
        for {
226✔
320
                select {
205✔
321
                case task := <-taskCh:
184✔
322
                        task.Nack()
184✔
323
                default:
21✔
324
                        return
21✔
325
                }
326
        }
327
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc