
uber / cadence · 018f185f-6b8d-4346-91c0-8e814148aa4f (push · buildkite · web-flow)

26 Apr 2024 03:07AM UTC · coverage: 67.782% (+0.08%) from 67.702%

Global ratelimiter, part 2: Any-typed RPCs, mappers, and stub handler (#5817)

Docs are broadly the same as https://github.com/uber/cadence-idl/pull/168, but they are repeated here for visibility, with some additions at the end for things unique to this repo:

# Any type:
This API is purely for inter-Cadence-service use, and the Thrift equivalent is in the github.com/uber/cadence-idl repository: https://github.com/uber/cadence-idl/pull/168

Design-wise: this has gone through several rounds of adjustment and experimentation, and nearly all top-level keys I can come up with are either totally insufficient or totally unnecessary for one ratelimit design or another, so I've leaned more and more towards "define nothing, allow passing anything".

In the extreme, this means: just one `Any` field.  We can add more later if needed / if we discover some kind of truly universal data that is always worth including.
Obviously this is not super ergonomic, but as the ratelimiter is intended to be pluggable by outside implementations, it cannot *wholly* be defined in internal protocol definitions.  There needs to be an extensible type of some kind, and arbitrarily requiring e.g. a `list<?>` or `map<string,?>` doesn't actually lend any beneficial semantics to the system, nor does it reduce space on the wire.

Implementers will need to maintain in-process definitions on both ends of the protocol, and check type IDs before decoding.  Generally speaking, this is probably best done with either a shared protobuf-or-something definition (anywhere) or something schema-free like JSON, so cross-version communication can be done safely, e.g. during server upgrades.
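
As a concrete illustration of that check-the-type-ID-before-decoding step, here is a minimal Go sketch.  The `Any` shape, the `weightsV1` payload, and the type-ID string are hypothetical stand-ins for illustration only, not the actual IDL or quotas/global definitions:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Any is a hypothetical envelope: a type identifier plus opaque bytes.
// Both ends of the protocol agree on ValueType strings out of band.
type Any struct {
	ValueType string // identifies how Value is encoded
	Value     []byte // opaque payload, decoded only by code that recognizes ValueType
}

// weightsV1 is an example payload an implementer might define in-process on both sides.
type weightsV1 struct {
	Weights map[string]float64 `json:"weights"`
}

// hypothetical type ID; any stable, unique string works
const weightsV1Type = "example/ratelimiter-weights-v1"

func decodeWeights(a Any) (*weightsV1, error) {
	// Check the type ID before decoding, so unknown or newer payloads
	// (e.g. from a newer server during a rolling upgrade) are rejected
	// explicitly instead of being misinterpreted.
	if a.ValueType != weightsV1Type {
		return nil, fmt.Errorf("unrecognized value type: %q", a.ValueType)
	}
	var w weightsV1
	if err := json.Unmarshal(a.Value, &w); err != nil {
		return nil, fmt.Errorf("decoding %q: %w", a.ValueType, err)
	}
	return &w, nil
}

func main() {
	payload, _ := json.Marshal(weightsV1{Weights: map[string]float64{"domain-a": 0.75}})
	w, err := decodeWeights(Any{ValueType: weightsV1Type, Value: payload})
	fmt.Println(w, err)
}
```

Because the payload is just a type ID plus opaque bytes, either side can introduce a new payload version and the other side rejects it cleanly until it has been upgraded to understand the new ID.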

# Intended use
Eventually this will be part of a pluggable global-ratelimit-data-exchange system, for anything that can make use of the high level "periodically check in to sharded hosts, share data, update limits" pattern that github.com/uber/cadence/common/quotas/global is building up.
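
For a sense of what that pattern looks like in code, here is a small sketch of a periodic check-in loop.  The `Aggregator` interface and the usage/limit shapes are invented for illustration and are not the actual common/quotas/global API:

```go
package checkin

import (
	"context"
	"time"
)

// usageSnapshot and limitUpdate are hypothetical shapes for illustration only.
type usageSnapshot map[string]int   // local request counts per limit key
type limitUpdate map[string]float64 // updated per-host RPS per limit key

// Aggregator stands in for the sharded hosts that collect usage and hand back limits.
type Aggregator interface {
	Update(ctx context.Context, usage usageSnapshot) (limitUpdate, error)
}

// checkInLoop periodically reports local usage and applies whatever limits come back.
func checkInLoop(
	ctx context.Context,
	agg Aggregator,
	collect func() usageSnapshot,
	apply func(limitUpdate),
	every time.Duration,
) {
	ticker := time.NewTicker(every)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			if update, err := agg.Update(ctx, collect()); err == nil {
				apply(update)
			}
			// on error: keep the previous limits until the next tick
		}
	}
}
```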

The first version will end up using... (continued)

65 of 65 new or added lines in 5 files covered. (100.0%)

11 existing lines in 6 files now uncovered.

99468 of 146748 relevant lines covered (67.78%)

2457.18 hits per line


Source file: /common/task/parallel_task_processor.go (93.06% covered)

```go
// Copyright (c) 2020 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package task

import (
	"context"
	"errors"
	"fmt"
	"sync"
	"sync/atomic"
	"time"

	"github.com/uber/cadence/common"
	"github.com/uber/cadence/common/backoff"
	"github.com/uber/cadence/common/dynamicconfig"
	"github.com/uber/cadence/common/log"
	"github.com/uber/cadence/common/log/tag"
	"github.com/uber/cadence/common/metrics"
)

type (
	// ParallelTaskProcessorOptions configures ParallelTaskProcessor
	ParallelTaskProcessorOptions struct {
		QueueSize   int
		WorkerCount dynamicconfig.IntPropertyFn
		RetryPolicy backoff.RetryPolicy
	}

	parallelTaskProcessorImpl struct {
		status           int32
		tasksCh          chan Task
		shutdownCh       chan struct{}
		workerShutdownCh []chan struct{}
		shutdownWG       sync.WaitGroup
		logger           log.Logger
		metricsScope     metrics.Scope
		options          *ParallelTaskProcessorOptions
	}
)

const (
	defaultMonitorTickerDuration = 5 * time.Second
)

var (
	// ErrTaskProcessorClosed is the error returned when submitting a task to a stopped processor
	ErrTaskProcessorClosed = errors.New("task processor has already shutdown")
)

// NewParallelTaskProcessor creates a new ParallelTaskProcessor
func NewParallelTaskProcessor(
	logger log.Logger,
	metricsClient metrics.Client,
	options *ParallelTaskProcessorOptions,
) Processor {
	return &parallelTaskProcessorImpl{
		status:           common.DaemonStatusInitialized,
		tasksCh:          make(chan Task, options.QueueSize),
		shutdownCh:       make(chan struct{}),
		workerShutdownCh: make([]chan struct{}, 0, options.WorkerCount()),
		logger:           logger,
		metricsScope:     metricsClient.Scope(metrics.ParallelTaskProcessingScope),
		options:          options,
	}
}

func (p *parallelTaskProcessorImpl) Start() {
	if !atomic.CompareAndSwapInt32(&p.status, common.DaemonStatusInitialized, common.DaemonStatusStarted) {
		return
	}

	initialWorkerCount := p.options.WorkerCount()

	p.shutdownWG.Add(initialWorkerCount)
	for i := 0; i < initialWorkerCount; i++ {
		shutdownCh := make(chan struct{})
		p.workerShutdownCh = append(p.workerShutdownCh, shutdownCh)
		go p.taskWorker(shutdownCh)
	}

	p.shutdownWG.Add(1)
	go p.workerMonitor(defaultMonitorTickerDuration)

	p.logger.Info("Parallel task processor started.")
}

func (p *parallelTaskProcessorImpl) Stop() {
	if !atomic.CompareAndSwapInt32(&p.status, common.DaemonStatusStarted, common.DaemonStatusStopped) {
		return
	}

	close(p.shutdownCh)

	p.drainAndNackTasks()

	if success := common.AwaitWaitGroup(&p.shutdownWG, time.Minute); !success {
		p.logger.Warn("Parallel task processor timed out on shutdown.")
	}
	p.logger.Info("Parallel task processor shutdown.")
}

func (p *parallelTaskProcessorImpl) Submit(task Task) error {
	p.metricsScope.IncCounter(metrics.ParallelTaskSubmitRequest)
	sw := p.metricsScope.StartTimer(metrics.ParallelTaskSubmitLatency)
	defer sw.Stop()

	if p.isStopped() {
		return ErrTaskProcessorClosed
	}

	select {
	case p.tasksCh <- task:
		if p.isStopped() {
			p.drainAndNackTasks()
		}
		return nil
	case <-p.shutdownCh:
		return ErrTaskProcessorClosed
	}
}

func (p *parallelTaskProcessorImpl) taskWorker(shutdownCh chan struct{}) {
	defer p.shutdownWG.Done()

	for {
		select {
		case <-shutdownCh:
			return
		case task := <-p.tasksCh:
			p.executeTask(task, shutdownCh)
		}
	}
}

func (p *parallelTaskProcessorImpl) executeTask(task Task, shutdownCh chan struct{}) {
	sw := p.metricsScope.StartTimer(metrics.ParallelTaskTaskProcessingLatency)
	defer sw.Stop()

	defer func() {
		if r := recover(); r != nil {
			p.logger.Error("recovered panic in task execution", tag.Dynamic("recovered-panic", r))
			task.HandleErr(fmt.Errorf("recovered panic: %v", r))
			task.Nack()
		}
	}()

	op := func() error {
		if err := task.Execute(); err != nil {
			return task.HandleErr(err)
		}
		return nil
	}

	isRetryable := func(err error) bool {
		select {
		case <-shutdownCh:
			return false
		default:
		}

		return task.RetryErr(err)
	}
	throttleRetry := backoff.NewThrottleRetry(
		backoff.WithRetryPolicy(p.options.RetryPolicy),
		backoff.WithRetryableError(isRetryable),
	)

	if err := throttleRetry.Do(context.Background(), op); err != nil {
		// non-retryable error or exhausted all retries or worker shutdown
		task.Nack()
		return
	}

	// no error
	task.Ack()
}

func (p *parallelTaskProcessorImpl) workerMonitor(tickerDuration time.Duration) {
	defer p.shutdownWG.Done()

	ticker := time.NewTicker(tickerDuration)

	for {
		select {
		case <-p.shutdownCh:
			ticker.Stop()
			p.removeWorker(len(p.workerShutdownCh))
			return
		case <-ticker.C:
			targetWorkerCount := p.options.WorkerCount()
			currentWorkerCount := len(p.workerShutdownCh)
			p.addWorker(targetWorkerCount - currentWorkerCount)
			p.removeWorker(currentWorkerCount - targetWorkerCount)
		}
	}
}

func (p *parallelTaskProcessorImpl) addWorker(count int) {
	for i := 0; i < count; i++ {
		shutdownCh := make(chan struct{})
		p.workerShutdownCh = append(p.workerShutdownCh, shutdownCh)

		p.shutdownWG.Add(1)
		go p.taskWorker(shutdownCh)
	}
}

func (p *parallelTaskProcessorImpl) removeWorker(count int) {
	if count <= 0 {
		return
	}

	currentWorkerCount := len(p.workerShutdownCh)
	if count > currentWorkerCount {
		count = currentWorkerCount
	}

	shutdownChToClose := p.workerShutdownCh[currentWorkerCount-count:]
	p.workerShutdownCh = p.workerShutdownCh[:currentWorkerCount-count]

	for _, shutdownCh := range shutdownChToClose {
		close(shutdownCh)
	}
}

func (p *parallelTaskProcessorImpl) drainAndNackTasks() {
	for {
		select {
		case task := <-p.tasksCh:
			task.Nack()
		default:
			return
		}
	}
}

func (p *parallelTaskProcessorImpl) isStopped() bool {
	return atomic.LoadInt32(&p.status) == common.DaemonStatusStopped
}
```
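
One design point worth calling out in the file above: each worker gets its own shutdown channel, so shrinking the pool simply closes the trailing channels (removeWorker) while growing appends new ones (addWorker).  Below is a self-contained sketch of just that resize mechanism, with the cadence-specific Task, logging, and metrics types stripped out and illustrative names substituted:

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// pool keeps one shutdown channel per worker; resizing closes or adds
// channels at the tail, mirroring addWorker/removeWorker above.
type pool struct {
	wg         sync.WaitGroup
	shutdownCh []chan struct{}
	work       chan int
}

func (p *pool) resize(target int) {
	current := len(p.shutdownCh)
	for i := current; i < target; i++ { // grow: start additional workers
		ch := make(chan struct{})
		p.shutdownCh = append(p.shutdownCh, ch)
		p.wg.Add(1)
		go p.worker(i, ch)
	}
	if target >= 0 && target < current { // shrink: stop the most recently started workers
		for _, ch := range p.shutdownCh[target:] {
			close(ch)
		}
		p.shutdownCh = p.shutdownCh[:target]
	}
}

func (p *pool) worker(id int, shutdownCh chan struct{}) {
	defer p.wg.Done()
	for {
		select {
		case <-shutdownCh:
			return
		case item := <-p.work:
			fmt.Printf("worker %d processed item %d\n", id, item)
		}
	}
}

func main() {
	p := &pool{work: make(chan int, 16)}
	p.resize(3) // scale up to three workers
	for i := 0; i < 5; i++ {
		p.work <- i
	}
	time.Sleep(100 * time.Millisecond) // give workers time to drain the queue
	p.resize(0)                        // scale down: close every worker's shutdown channel
	p.wg.Wait()
}
```

Closing the last N channels means the newest workers exit first and the slice of channels always lines up with the set of live workers, which keeps the monitor's target-versus-current comparison trivial.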