
uber / cadence / 01909e15-6794-4b38-abfa-ab8dde3a5a9b

28 Jun 2024 10:07PM UTC coverage: 71.517% (+0.08%) from 71.438%

push · buildkite · web-flow

Global ratelimiter: everything else (#6141)

After too many attempts to break this apart and build different portions in self-contained ways, and running into various inter-dependent roadblocks... I just gave up and did it all at once.

# Rollout plan for people who don't want or need this system

Do nothing :)

As of this PR, you'll use "disabled" and that should be as close to "no changes at all" as possible.
Soon, you'll get "local", and then you'll have some new metrics you can use (or ignore) but otherwise no behavior changes.

And that'll be it.  The "global" load-balanced stuff is likely to remain opt-in.

# Rollout plan for us

For deployment: any order is fine / should not behave (too) badly, even if "global" or either shadow mode is selected on the initial deploy.  Frontends will have background `RatelimitUpdate` request failures until History is deployed, but that just means they'll keep using the "local" internal fallback, which is in practice the same behavior as "local" or "disabled", just slightly noisier.
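
That "keeps working, just noisier" behavior is a plain fall-back-on-error pattern.  As a sketch (hypothetical names throughout, not the PR's actual code):

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// Hypothetical names throughout; this only illustrates the degrade-gracefully
// behavior described above, not the PR's actual implementation.

type historyClient interface {
	// Stand-in for the background RatelimitUpdate request frontends send.
	RatelimitUpdate(ctx context.Context, usage map[string]int) (map[string]float64, error)
}

type limiterCollection struct {
	history historyClient
	usage   map[string]int
}

func (c *limiterCollection) backgroundUpdate(ctx context.Context) {
	weights, err := c.history.RatelimitUpdate(ctx, c.usage)
	if err != nil {
		// History not deployed yet (or unreachable): keep using the local
		// fallback limiter, i.e. in practice the same behavior as "local"
		// or "disabled", just noisier.
		fmt.Println("RatelimitUpdate failed, staying on local fallback:", err)
		return
	}
	fmt.Println("applying globally balanced weights:", weights)
}

type downHistory struct{}

func (downHistory) RatelimitUpdate(context.Context, map[string]int) (map[string]float64, error) {
	return nil, errors.New("history not deployed yet")
}

func main() {
	c := &limiterCollection{history: downHistory{}, usage: map[string]int{"some-key": 1}}
	c.backgroundUpdate(context.Background()) // degrades gracefully
}
```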

The _smoothest_ deployment is: deploy everything on "disabled" or "local" (the default(s), so no requests are sent until deploy is done), then switch to "local-shadow-global" to warm global limiters / check that it's working, then "global" to use the global behavior.  

Rolling back is just the opposite.  Ideally disable things first to stop the requests, but even if you don't it should be fine.
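
For a dev or test setup, stepping through those stages could look like the sketch below.  It assumes Cadence's in-memory dynamic config client and a `FrontendGlobalRatelimiterMode` key constant (the constant name is an assumption; the `frontend.globalRatelimiterMode` key and the mode strings are the ones described here):

```go
package main

import (
	"fmt"

	"github.com/uber/cadence/common/dynamicconfig"
)

func main() {
	// In-memory client as used in cadence tests; real deployments would set
	// frontend.globalRatelimiterMode through their dynamic config source.
	client := dynamicconfig.NewInMemoryClient()

	// The stages above, in rollout order; rolling back is the reverse.
	for _, mode := range []string{"disabled", "local", "local-shadow-global", "global"} {
		if err := client.UpdateValue(dynamicconfig.FrontendGlobalRatelimiterMode, mode); err != nil {
			panic(err)
		}
		fmt.Println("frontend.globalRatelimiterMode =", mode)
		// ...let it soak, watch the new usage metrics, then advance...
	}
}
```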

In more detail:

1. At merge time, this will set the "key mode" (`frontend.globalRatelimiterMode`) to "disabled", which gets as close as is reasonably possible to acting _exactly_ like it did before this PR.
   - This is also effectively the panic button for the initial rollout.
2. Once that proves to not immediately explode, switch to "local" for all keys.  This will keep the current ratelimiter rates, but will start collecting and emitting ratelimiter-usage metrics, so we can make sure that doesn't explode either... (continued)

688 of 850 new or added lines in 29 files covered. (80.94%)

14 existing lines in 7 files now uncovered.

105310 of 147252 relevant lines covered (71.52%)

2625.62 hits per line

Source File: /common/task/parallel_task_processor.go (93.06% covered)

// Copyright (c) 2020 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package task

import (
        "context"
        "errors"
        "fmt"
        "sync"
        "sync/atomic"
        "time"

        "github.com/uber/cadence/common"
        "github.com/uber/cadence/common/backoff"
        "github.com/uber/cadence/common/dynamicconfig"
        "github.com/uber/cadence/common/log"
        "github.com/uber/cadence/common/log/tag"
        "github.com/uber/cadence/common/metrics"
)

type (
        // ParallelTaskProcessorOptions configures a ParallelTaskProcessor
        ParallelTaskProcessorOptions struct {
                QueueSize   int
                WorkerCount dynamicconfig.IntPropertyFn
                RetryPolicy backoff.RetryPolicy
        }

        parallelTaskProcessorImpl struct {
                status           int32
                tasksCh          chan Task
                shutdownCh       chan struct{}
                workerShutdownCh []chan struct{}
                shutdownWG       sync.WaitGroup
                logger           log.Logger
                metricsScope     metrics.Scope
                options          *ParallelTaskProcessorOptions
        }
)

const (
        defaultMonitorTickerDuration = 5 * time.Second
)

var (
        // ErrTaskProcessorClosed is the error returned when submitting a task to a stopped processor
        ErrTaskProcessorClosed = errors.New("task processor has already shutdown")
)

// NewParallelTaskProcessor creates a new ParallelTaskProcessor
func NewParallelTaskProcessor(
        logger log.Logger,
        metricsClient metrics.Client,
        options *ParallelTaskProcessorOptions,
) Processor {
        return &parallelTaskProcessorImpl{
                status:           common.DaemonStatusInitialized,
                tasksCh:          make(chan Task, options.QueueSize),
                shutdownCh:       make(chan struct{}),
                workerShutdownCh: make([]chan struct{}, 0, options.WorkerCount()),
                logger:           logger,
                metricsScope:     metricsClient.Scope(metrics.ParallelTaskProcessingScope),
                options:          options,
        }
}

func (p *parallelTaskProcessorImpl) Start() {
        if !atomic.CompareAndSwapInt32(&p.status, common.DaemonStatusInitialized, common.DaemonStatusStarted) {
                return
        }

        initialWorkerCount := p.options.WorkerCount()

        p.shutdownWG.Add(initialWorkerCount)
        for i := 0; i < initialWorkerCount; i++ {
                shutdownCh := make(chan struct{})
                p.workerShutdownCh = append(p.workerShutdownCh, shutdownCh)
                go p.taskWorker(shutdownCh)
        }

        p.shutdownWG.Add(1)
        go p.workerMonitor(defaultMonitorTickerDuration)

        p.logger.Info("Parallel task processor started.")
}

func (p *parallelTaskProcessorImpl) Stop() {
        if !atomic.CompareAndSwapInt32(&p.status, common.DaemonStatusStarted, common.DaemonStatusStopped) {
                return
        }

        close(p.shutdownCh)

        p.drainAndNackTasks()

        if success := common.AwaitWaitGroup(&p.shutdownWG, time.Minute); !success {
                p.logger.Warn("Parallel task processor timed out on shutdown.")
        }
        p.logger.Info("Parallel task processor shutdown.")
}

func (p *parallelTaskProcessorImpl) Submit(task Task) error {
        p.metricsScope.IncCounter(metrics.ParallelTaskSubmitRequest)
        sw := p.metricsScope.StartTimer(metrics.ParallelTaskSubmitLatency)
        defer sw.Stop()

        if p.isStopped() {
                return ErrTaskProcessorClosed
        }

        select {
        case p.tasksCh <- task:
                if p.isStopped() {
                        p.drainAndNackTasks()
                }
                return nil
        case <-p.shutdownCh:
                return ErrTaskProcessorClosed
        }
}

func (p *parallelTaskProcessorImpl) taskWorker(shutdownCh chan struct{}) {
        defer p.shutdownWG.Done()

        for {
                select {
                case <-shutdownCh:
                        return
                case task := <-p.tasksCh:
                        p.executeTask(task, shutdownCh)
                }
        }
}

func (p *parallelTaskProcessorImpl) executeTask(task Task, shutdownCh chan struct{}) {
        sw := p.metricsScope.StartTimer(metrics.ParallelTaskTaskProcessingLatency)
        defer sw.Stop()

        defer func() {
                if r := recover(); r != nil {
                        p.logger.Error("recovered panic in task execution", tag.Dynamic("recovered-panic", r))
                        task.HandleErr(fmt.Errorf("recovered panic: %v", r))
                        task.Nack()
                }
        }()

        op := func() error {
                if err := task.Execute(); err != nil {
                        return task.HandleErr(err)
                }
                return nil
        }

        isRetryable := func(err error) bool {
                select {
                case <-shutdownCh:
                        return false
                default:
                }

                return task.RetryErr(err)
        }
        throttleRetry := backoff.NewThrottleRetry(
                backoff.WithRetryPolicy(p.options.RetryPolicy),
                backoff.WithRetryableError(isRetryable),
        )

        if err := throttleRetry.Do(context.Background(), op); err != nil {
                // non-retryable error or exhausted all retries or worker shutdown
                task.Nack()
                return
        }

        // no error
        task.Ack()
}

func (p *parallelTaskProcessorImpl) workerMonitor(tickerDuration time.Duration) {
        defer p.shutdownWG.Done()

        ticker := time.NewTicker(tickerDuration)

        for {
                select {
                case <-p.shutdownCh:
                        ticker.Stop()
                        p.removeWorker(len(p.workerShutdownCh))
                        return
                case <-ticker.C:
                        targetWorkerCount := p.options.WorkerCount()
                        currentWorkerCount := len(p.workerShutdownCh)
                        p.addWorker(targetWorkerCount - currentWorkerCount)
                        p.removeWorker(currentWorkerCount - targetWorkerCount)
                }
        }
}

func (p *parallelTaskProcessorImpl) addWorker(count int) {
        for i := 0; i < count; i++ {
                shutdownCh := make(chan struct{})
                p.workerShutdownCh = append(p.workerShutdownCh, shutdownCh)

                p.shutdownWG.Add(1)
                go p.taskWorker(shutdownCh)
        }
}

func (p *parallelTaskProcessorImpl) removeWorker(count int) {
        if count <= 0 {
                return
        }

        currentWorkerCount := len(p.workerShutdownCh)
        if count > currentWorkerCount {
                count = currentWorkerCount
        }

        shutdownChToClose := p.workerShutdownCh[currentWorkerCount-count:]
        p.workerShutdownCh = p.workerShutdownCh[:currentWorkerCount-count]

        for _, shutdownCh := range shutdownChToClose {
                close(shutdownCh)
        }
}

func (p *parallelTaskProcessorImpl) drainAndNackTasks() {
        for {
                select {
                case task := <-p.tasksCh:
                        task.Nack()
                default:
                        return
                }
        }
}

func (p *parallelTaskProcessorImpl) isStopped() bool {
        return atomic.LoadInt32(&p.status) == common.DaemonStatusStopped
}
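
To make the processor's API concrete, here is a minimal usage sketch (not from the repo).  The no-op constructors (`log.NewNoop`, `metrics.NewNoopMetricsClient`), `dynamicconfig.GetIntPropertyFn`, and the exact `Task` method set are assumptions inferred from common cadence test usage and from the calls in the file above:

```go
package main

import (
	"time"

	"github.com/uber/cadence/common/backoff"
	"github.com/uber/cadence/common/dynamicconfig"
	"github.com/uber/cadence/common/log"
	"github.com/uber/cadence/common/metrics"
	"github.com/uber/cadence/common/task"
)

// noopTask is a toy Task that succeeds immediately. The method set mirrors the
// calls made by parallel_task_processor.go; the real Task interface may
// include more.
type noopTask struct{}

func (noopTask) Execute() error            { return nil }
func (noopTask) HandleErr(err error) error { return err }
func (noopTask) RetryErr(err error) bool   { return false }
func (noopTask) State() task.State         { return 0 } // assumed part of the interface
func (noopTask) Ack()                      {}
func (noopTask) Nack()                     {}

func main() {
	policy := backoff.NewExponentialRetryPolicy(10 * time.Millisecond)
	policy.SetMaximumAttempts(3)

	p := task.NewParallelTaskProcessor(
		log.NewNoop(),                  // assumed no-op logger constructor
		metrics.NewNoopMetricsClient(), // assumed no-op metrics client
		&task.ParallelTaskProcessorOptions{
			QueueSize:   100,
			WorkerCount: dynamicconfig.GetIntPropertyFn(4), // workerMonitor re-reads this every 5s
			RetryPolicy: policy,
		},
	)

	p.Start()
	defer p.Stop() // Stop drains the queue and Nacks any unexecuted tasks

	if err := p.Submit(noopTask{}); err != nil {
		// ErrTaskProcessorClosed once the processor has been stopped
		panic(err)
	}
}
```

Note that the CAS on `status` makes `Start` and `Stop` idempotent, and the monitor goroutine grows or shrinks the worker pool whenever the dynamic `WorkerCount` value changes.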