• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

uber / cadence / 0188cab7-c9fd-4797-9ec9-93f01b5397f4

17 Jun 2023 06:57PM UTC coverage: 57.263% (+0.006%) from 57.257%
0188cab7-c9fd-4797-9ec9-93f01b5397f4

push

buildkite

web-flow
[CDNC-3578] Workflow start metric (#5289)

* add workflow start metric

* fix lint

* remove logs and add workflow start scope

* fix lint second attempt

* emit metric in transferStandbyTaskExecutor

8 of 8 new or added lines in 2 files covered. (100.0%)

87099 of 152104 relevant lines covered (57.26%)

2483.33 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

89.12
/common/task/weightedRoundRobinTaskScheduler.go
1
// Copyright (c) 2020 Uber Technologies, Inc.
2
//
3
// Permission is hereby granted, free of charge, to any person obtaining a copy
4
// of this software and associated documentation files (the "Software"), to deal
5
// in the Software without restriction, including without limitation the rights
6
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
7
// copies of the Software, and to permit persons to whom the Software is
8
// furnished to do so, subject to the following conditions:
9
//
10
// The above copyright notice and this permission notice shall be included in
11
// all copies or substantial portions of the Software.
12
//
13
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
14
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
18
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
19
// THE SOFTWARE.
20

21
package task
22

23
import (
24
        "errors"
25
        "fmt"
26
        "sync"
27
        "sync/atomic"
28
        "time"
29

30
        "github.com/uber/cadence/common"
31
        "github.com/uber/cadence/common/backoff"
32
        "github.com/uber/cadence/common/dynamicconfig"
33
        "github.com/uber/cadence/common/log"
34
        "github.com/uber/cadence/common/log/tag"
35
        "github.com/uber/cadence/common/metrics"
36
)
37

38
type (
39
        // WeightedRoundRobinTaskSchedulerOptions configs WRR task scheduler
40
        WeightedRoundRobinTaskSchedulerOptions struct {
41
                Weights         dynamicconfig.MapPropertyFn
42
                QueueSize       int
43
                WorkerCount     dynamicconfig.IntPropertyFn
44
                DispatcherCount int
45
                RetryPolicy     backoff.RetryPolicy
46
        }
47

48
        weightedRoundRobinTaskSchedulerImpl struct {
49
                sync.RWMutex
50

51
                status       int32
52
                weights      atomic.Value // store the currently used weights
53
                taskChs      map[int]chan PriorityTask
54
                shutdownCh   chan struct{}
55
                notifyCh     chan struct{}
56
                dispatcherWG sync.WaitGroup
57
                logger       log.Logger
58
                metricsScope metrics.Scope
59
                options      *WeightedRoundRobinTaskSchedulerOptions
60

61
                processor Processor
62
        }
63
)
64

65
const (
	// wRRTaskProcessorQueueSize is the queue size handed to the parallel
	// task processor owned by the scheduler.
	wRRTaskProcessorQueueSize = 1
	// defaultUpdateWeightsInterval is how often the weights are reloaded
	// from the options while the scheduler is running.
	defaultUpdateWeightsInterval = 5 * time.Second
)
69

70
// ErrTaskSchedulerClosed is returned when a task is submitted to a scheduler
// that has already been stopped.
var ErrTaskSchedulerClosed = errors.New("task scheduler has already shutdown")
74

75
// NewWeightedRoundRobinTaskScheduler creates a new WRR task scheduler
76
func NewWeightedRoundRobinTaskScheduler(
77
        logger log.Logger,
78
        metricsClient metrics.Client,
79
        options *WeightedRoundRobinTaskSchedulerOptions,
80
) (Scheduler, error) {
24✔
81
        weights, err := common.ConvertDynamicConfigMapPropertyToIntMap(options.Weights())
24✔
82
        if err != nil {
24✔
83
                return nil, err
×
84
        }
×
85

86
        if len(weights) == 0 {
24✔
87
                return nil, errors.New("weight is not specified in the scheduler option")
×
88
        }
×
89

90
        scheduler := &weightedRoundRobinTaskSchedulerImpl{
24✔
91
                status:       common.DaemonStatusInitialized,
24✔
92
                taskChs:      make(map[int]chan PriorityTask),
24✔
93
                shutdownCh:   make(chan struct{}),
24✔
94
                notifyCh:     make(chan struct{}, 1),
24✔
95
                logger:       logger,
24✔
96
                metricsScope: metricsClient.Scope(metrics.TaskSchedulerScope),
24✔
97
                options:      options,
24✔
98
                processor: NewParallelTaskProcessor(
24✔
99
                        logger,
24✔
100
                        metricsClient,
24✔
101
                        &ParallelTaskProcessorOptions{
24✔
102
                                QueueSize:   wRRTaskProcessorQueueSize,
24✔
103
                                WorkerCount: options.WorkerCount,
24✔
104
                                RetryPolicy: options.RetryPolicy,
24✔
105
                        },
24✔
106
                ),
24✔
107
        }
24✔
108
        scheduler.weights.Store(weights)
24✔
109

24✔
110
        return scheduler, nil
24✔
111
}
112

113
func (w *weightedRoundRobinTaskSchedulerImpl) Start() {
18✔
114
        if !atomic.CompareAndSwapInt32(&w.status, common.DaemonStatusInitialized, common.DaemonStatusStarted) {
18✔
115
                return
×
116
        }
×
117

118
        w.processor.Start()
18✔
119

18✔
120
        w.dispatcherWG.Add(w.options.DispatcherCount)
18✔
121
        for i := 0; i != w.options.DispatcherCount; i++ {
42✔
122
                go w.dispatcher()
24✔
123
        }
24✔
124
        go w.updateWeights()
18✔
125

18✔
126
        w.logger.Info("Weighted round robin task scheduler started.")
18✔
127
}
128

129
func (w *weightedRoundRobinTaskSchedulerImpl) Stop() {
18✔
130
        if !atomic.CompareAndSwapInt32(&w.status, common.DaemonStatusStarted, common.DaemonStatusStopped) {
18✔
131
                return
×
132
        }
×
133

134
        close(w.shutdownCh)
18✔
135

18✔
136
        w.processor.Stop()
18✔
137

18✔
138
        w.RLock()
18✔
139
        for _, taskCh := range w.taskChs {
39✔
140
                drainAndNackPriorityTask(taskCh)
21✔
141
        }
21✔
142
        w.RUnlock()
18✔
143

18✔
144
        if success := common.AwaitWaitGroup(&w.dispatcherWG, time.Minute); !success {
18✔
145
                w.logger.Warn("Weighted round robin task scheduler timedout on shutdown.")
×
146
        }
×
147

148
        w.logger.Info("Weighted round robin task scheduler shutdown.")
18✔
149
}
150

151
func (w *weightedRoundRobinTaskSchedulerImpl) Submit(task PriorityTask) error {
5,539✔
152
        w.metricsScope.IncCounter(metrics.PriorityTaskSubmitRequest)
5,539✔
153
        sw := w.metricsScope.StartTimer(metrics.PriorityTaskSubmitLatency)
5,539✔
154
        defer sw.Stop()
5,539✔
155

5,539✔
156
        if w.isStopped() {
10,438✔
157
                return ErrTaskSchedulerClosed
4,899✔
158
        }
4,899✔
159

160
        taskCh, err := w.getOrCreateTaskChan(task.Priority())
640✔
161
        if err != nil {
641✔
162
                return err
1✔
163
        }
1✔
164

165
        select {
639✔
166
        case taskCh <- task:
639✔
167
                w.notifyDispatcher()
639✔
168
                if w.isStopped() {
639✔
169
                        drainAndNackPriorityTask(taskCh)
×
170
                }
×
171
                return nil
639✔
172
        case <-w.shutdownCh:
×
173
                return ErrTaskSchedulerClosed
×
174
        }
175
}
176

177
func (w *weightedRoundRobinTaskSchedulerImpl) TrySubmit(
178
        task PriorityTask,
179
) (bool, error) {
14,519✔
180
        if w.isStopped() {
19,367✔
181
                return false, ErrTaskSchedulerClosed
4,848✔
182
        }
4,848✔
183

184
        taskCh, err := w.getOrCreateTaskChan(task.Priority())
9,674✔
185
        if err != nil {
9,674✔
186
                return false, err
×
187
        }
×
188

189
        select {
9,674✔
190
        case taskCh <- task:
9,673✔
191
                w.metricsScope.IncCounter(metrics.PriorityTaskSubmitRequest)
9,673✔
192
                if w.isStopped() {
9,674✔
193
                        drainAndNackPriorityTask(taskCh)
1✔
194
                } else {
9,673✔
195
                        w.notifyDispatcher()
9,672✔
196
                }
9,672✔
197
                return true, nil
9,673✔
198
        case <-w.shutdownCh:
×
199
                return false, ErrTaskSchedulerClosed
×
200
        default:
1✔
201
                return false, nil
1✔
202
        }
203
}
204

205
func (w *weightedRoundRobinTaskSchedulerImpl) dispatcher() {
26✔
206
        defer w.dispatcherWG.Done()
26✔
207

26✔
208
        outstandingTasks := false
26✔
209
        taskChs := make(map[int]chan PriorityTask)
26✔
210

26✔
211
        for {
16,933✔
212
                if !outstandingTasks {
25,322✔
213
                        // if no task is dispatched in the last round,
8,415✔
214
                        // wait for a notification
8,415✔
215
                        select {
8,415✔
216
                        case <-w.notifyCh:
8,396✔
217
                                // block until there's a new task
218
                        case <-w.shutdownCh:
22✔
219
                                return
22✔
220
                        }
221
                }
222

223
                outstandingTasks = false
16,888✔
224
                w.updateTaskChs(taskChs)
16,888✔
225
                weights := w.getWeights()
16,888✔
226
                for priority, taskCh := range taskChs {
35,725✔
227
                        for i := 0; i < weights[priority]; i++ {
7,973,207✔
228
                                select {
7,954,370✔
229
                                case task := <-taskCh:
9,158✔
230
                                        // dispatched at least one task in this round
9,158✔
231
                                        outstandingTasks = true
9,158✔
232

9,158✔
233
                                        if err := w.processor.Submit(task); err != nil {
9,165✔
234
                                                w.logger.Error("fail to submit task to processor", tag.Error(err))
7✔
235
                                                task.Nack()
7✔
236
                                        }
7✔
237
                                case <-w.shutdownCh:
4✔
238
                                        return
4✔
239
                                default:
7,945,211✔
240
                                        // if no task, don't block. Skip to next priority
7,945,211✔
241
                                        break
7,945,211✔
242
                                }
243
                        }
244
                }
245
        }
246
}
247

248
func (w *weightedRoundRobinTaskSchedulerImpl) getOrCreateTaskChan(
249
        priority int,
250
) (chan PriorityTask, error) {
10,314✔
251
        if _, ok := w.getWeights()[priority]; !ok {
10,315✔
252
                return nil, fmt.Errorf("unknown task priority: %v", priority)
1✔
253
        }
1✔
254

255
        w.RLock()
10,313✔
256
        if taskCh, ok := w.taskChs[priority]; ok {
20,602✔
257
                w.RUnlock()
10,289✔
258
                return taskCh, nil
10,289✔
259
        }
10,289✔
260
        w.RUnlock()
27✔
261

27✔
262
        w.Lock()
27✔
263
        defer w.Unlock()
27✔
264
        if taskCh, ok := w.taskChs[priority]; ok {
27✔
265
                return taskCh, nil
×
266
        }
×
267
        taskCh := make(chan PriorityTask, w.options.QueueSize)
27✔
268
        w.taskChs[priority] = taskCh
27✔
269
        return taskCh, nil
27✔
270
}
271

272
func (w *weightedRoundRobinTaskSchedulerImpl) updateTaskChs(taskChs map[int]chan PriorityTask) {
16,888✔
273
        w.RLock()
16,888✔
274
        defer w.RUnlock()
16,888✔
275

16,888✔
276
        for priority, taskCh := range w.taskChs {
35,729✔
277
                if _, ok := taskChs[priority]; !ok {
18,878✔
278
                        taskChs[priority] = taskCh
37✔
279
                }
37✔
280
        }
281
}
282

283
func (w *weightedRoundRobinTaskSchedulerImpl) notifyDispatcher() {
10,311✔
284
        select {
10,311✔
285
        case w.notifyCh <- struct{}{}:
8,399✔
286
                // sent a notification to the dispatcher
287
        default:
1,915✔
288
                // do not block if there's already a notification
289
        }
290
}
291

292
func (w *weightedRoundRobinTaskSchedulerImpl) getWeights() map[int]int {
27,199✔
293
        return w.weights.Load().(map[int]int)
27,199✔
294
}
27,199✔
295

296
func (w *weightedRoundRobinTaskSchedulerImpl) updateWeights() {
18✔
297
        ticker := time.NewTicker(defaultUpdateWeightsInterval)
18✔
298
        for {
440✔
299
                select {
422✔
300
                case <-ticker.C:
407✔
301
                        weights, err := common.ConvertDynamicConfigMapPropertyToIntMap(w.options.Weights())
407✔
302
                        if err != nil {
407✔
303
                                w.logger.Error("failed to update weight for round robin task scheduler", tag.Error(err))
×
304
                        } else {
407✔
305
                                w.weights.Store(weights)
407✔
306
                        }
407✔
307
                case <-w.shutdownCh:
18✔
308
                        ticker.Stop()
18✔
309
                        return
18✔
310
                }
311
        }
312
}
313

314
func (w *weightedRoundRobinTaskSchedulerImpl) isStopped() bool {
30,367✔
315
        return atomic.LoadInt32(&w.status) == common.DaemonStatusStopped
30,367✔
316
}
30,367✔
317

318
func drainAndNackPriorityTask(taskCh <-chan PriorityTask) {
22✔
319
        for {
197✔
320
                select {
175✔
321
                case task := <-taskCh:
153✔
322
                        task.Nack()
153✔
323
                default:
22✔
324
                        return
22✔
325
                }
326
        }
327
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc