• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

uber / cadence / 01907562-f5f0-40b6-8908-451d758b6138

02 Jul 2024 09:37PM UTC coverage: 71.512% (+0.003%) from 71.509%
01907562-f5f0-40b6-8908-451d758b6138

Pull #6155

buildkite

Groxx
Stop the ratelimiter collections when stopping the service
Pull Request #6155: Stop the ratelimiter collections when stopping the service

9 of 17 new or added lines in 1 file covered. (52.94%)

22 existing lines in 7 files now uncovered.

105315 of 147269 relevant lines covered (71.51%)

2600.5 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

90.91
/service/matching/tasklist/matcher.go
1
// Copyright (c) 2019 Uber Technologies, Inc.
2
//
3
// Permission is hereby granted, free of charge, to any person obtaining a copy
4
// of this software and associated documentation files (the "Software"), to deal
5
// in the Software without restriction, including without limitation the rights
6
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
7
// copies of the Software, and to permit persons to whom the Software is
8
// furnished to do so, subject to the following conditions:
9
//
10
// The above copyright notice and this permission notice shall be included in
11
// all copies or substantial portions of the Software.
12
//
13
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
14
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
18
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
19
// THE SOFTWARE.
20

21
package tasklist
22

23
import (
24
        "context"
25
        "errors"
26
        "fmt"
27
        "time"
28

29
        "github.com/uber/cadence/common/clock"
30
        "github.com/uber/cadence/common/log"
31
        "github.com/uber/cadence/common/log/tag"
32
        "github.com/uber/cadence/common/metrics"
33
        "github.com/uber/cadence/common/quotas"
34
        "github.com/uber/cadence/common/types"
35
        "github.com/uber/cadence/service/matching/config"
36
)
37

38
// TaskMatcher matches a task producer with a task consumer
39
// Producers are usually rpc calls from history or taskReader
40
// that drains backlog from db. Consumers are the task list pollers
41
type TaskMatcher struct {
42
        log log.Logger
43
        // synchronous task channel to match producer/consumer for any isolation group
44
        // tasks having no isolation requirement are added to this channel
45
        // and pollers from all isolation groups read from this channel
46
        taskC chan *InternalTask
47
        // synchronos task channels to match producer/consumer for a certain isolation group
48
        // the key is the name of the isolation group
49
        isolatedTaskC map[string]chan *InternalTask
50
        // synchronous task channel to match query task - the reason to have
51
        // separate channel for this is because there are cases when consumers
52
        // are interested in queryTasks but not others. Example is when domain is
53
        // not active in a cluster
54
        queryTaskC chan *InternalTask
55
        // ratelimiter that limits the rate at which tasks can be dispatched to consumers
56
        limiter *quotas.RateLimiter
57

58
        fwdr          *Forwarder
59
        scope         metrics.Scope // domain metric scope
60
        numPartitions func() int    // number of task list partitions
61
}
62

63
// ErrTasklistThrottled is returned when a dispatch is rejected because the
// task list's rate limit does not allow it.
var ErrTasklistThrottled = errors.New("tasklist limit exceeded")
65

66
// newTaskMatcher returns an task matcher instance. The returned instance can be
67
// used by task producers and consumers to find a match. Both sync matches and non-sync
68
// matches should use this implementation
69
func newTaskMatcher(config *config.TaskListConfig, fwdr *Forwarder, scope metrics.Scope, isolationGroups []string, log log.Logger) *TaskMatcher {
1,403✔
70
        dPtr := config.TaskDispatchRPS
1,403✔
71
        limiter := quotas.NewRateLimiter(&dPtr, config.TaskDispatchRPSTTL, config.MinTaskThrottlingBurstSize())
1,403✔
72
        isolatedTaskC := make(map[string]chan *InternalTask)
1,403✔
73
        for _, g := range isolationGroups {
1,481✔
74
                isolatedTaskC[g] = make(chan *InternalTask)
78✔
75
        }
78✔
76
        return &TaskMatcher{
1,403✔
77
                log:           log,
1,403✔
78
                limiter:       limiter,
1,403✔
79
                scope:         scope,
1,403✔
80
                fwdr:          fwdr,
1,403✔
81
                taskC:         make(chan *InternalTask),
1,403✔
82
                isolatedTaskC: isolatedTaskC,
1,403✔
83
                queryTaskC:    make(chan *InternalTask),
1,403✔
84
                numPartitions: config.NumReadPartitions,
1,403✔
85
        }
1,403✔
86
}
87

88
// Offer offers a task to a potential consumer (poller)
89
// If the task is successfully matched with a consumer, this
90
// method will return true and no error. If the task is matched
91
// but consumer returned error, then this method will return
92
// true and error message. This method should not be used for query
93
// task. This method should ONLY be used for sync match.
94
//
95
// When a local poller is not available and forwarding to a parent
96
// task list partition is possible, this method will attempt forwarding
97
// to the parent partition.
98
//
99
// Cases when this method will block:
100
//
101
// Ratelimit:
102
// When a ratelimit token is not available, this method might block
103
// waiting for a token until the provided context timeout. Rate limits are
104
// not enforced for forwarded tasks from child partition.
105
//
106
// Forwarded tasks that originated from db backlog:
107
// When this method is called with a task that is forwarded from a
108
// remote partition and if (1) this task list is root (2) task
109
// was from db backlog - this method will block until context timeout
110
// trying to match with a poller. The caller is expected to set the
111
// correct context timeout.
112
//
113
// returns error when:
114
//   - ratelimit is exceeded (does not apply to query task)
115
//   - context deadline is exceeded
116
//   - task is matched and consumer returns error in response channel
117
func (tm *TaskMatcher) Offer(ctx context.Context, task *InternalTask) (bool, error) {
17,338✔
118
        if !task.IsForwarded() {
20,352✔
119
                err := tm.ratelimit(ctx)
3,014✔
120
                if err != nil {
3,016✔
121
                        tm.scope.IncCounter(metrics.SyncThrottlePerTaskListCounter)
2✔
122
                        return false, err
2✔
123
                }
2✔
124
        }
125

126
        select {
17,336✔
127
        case tm.getTaskC(task) <- task: // poller picked up the task
1,131✔
128
                if task.ResponseC != nil {
2,262✔
129
                        // if there is a response channel, block until resp is received
1,131✔
130
                        // and return error if the response contains error
1,131✔
131
                        err := <-task.ResponseC
1,131✔
132
                        return true, err
1,131✔
133
                }
1,131✔
134
                return false, nil
×
135
        default:
16,205✔
136
                // no poller waiting for tasks, try forwarding this task to the
16,205✔
137
                // root partition if possible
16,205✔
138
                select {
16,205✔
139
                case token := <-tm.fwdrAddReqTokenC():
1,089✔
140
                        if err := tm.fwdr.ForwardTask(ctx, task); err == nil {
1,770✔
141
                                // task was remotely sync matched on the parent partition
681✔
142
                                token.release("")
681✔
143
                                return true, nil
681✔
144
                        }
681✔
145
                        token.release("")
408✔
146
                default:
15,116✔
147
                        if !tm.isForwardingAllowed() && // we are the root partition and forwarding is not possible
15,116✔
148
                                task.source == types.TaskSourceDbBacklog && // task was from backlog (stored in db)
15,116✔
149
                                task.IsForwarded() { // task came from a child partition
28,181✔
150
                                // a forwarded backlog task from a child partition, block trying
13,065✔
151
                                // to match with a poller until ctx timeout
13,065✔
152
                                return tm.offerOrTimeout(ctx, task)
13,065✔
153
                        }
13,065✔
154
                }
155

156
                return false, nil
2,459✔
157
        }
158
}
159

160
func (tm *TaskMatcher) offerOrTimeout(ctx context.Context, task *InternalTask) (bool, error) {
13,065✔
161
        select {
13,065✔
162
        case tm.getTaskC(task) <- task: // poller picked up the task
90✔
163
                if task.ResponseC != nil {
180✔
164
                        select {
90✔
165
                        case err := <-task.ResponseC:
90✔
166
                                return true, err
90✔
UNCOV
167
                        case <-ctx.Done():
×
UNCOV
168
                                return false, nil
×
169
                        }
170
                }
171
                return task.ActivityTaskDispatchInfo != nil, nil
×
172
        case <-ctx.Done():
12,975✔
173
                return false, nil
12,975✔
174
        }
175
}
176

177
// OfferQuery will either match task to local poller or will forward query task.
178
// Local match is always attempted before forwarding is attempted. If local match occurs
179
// response and error are both nil, if forwarding occurs then response or error is returned.
180
func (tm *TaskMatcher) OfferQuery(ctx context.Context, task *InternalTask) (*types.QueryWorkflowResponse, error) {
49✔
181
        select {
49✔
182
        case tm.queryTaskC <- task:
23✔
183
                <-task.ResponseC
23✔
184
                return nil, nil
23✔
185
        default:
26✔
186
        }
187

188
        fwdrTokenC := tm.fwdrAddReqTokenC()
26✔
189

26✔
190
        for {
53✔
191
                select {
27✔
192
                case tm.queryTaskC <- task:
10✔
193
                        <-task.ResponseC
10✔
194
                        return nil, nil
10✔
195
                case token := <-fwdrTokenC:
14✔
196
                        resp, err := tm.fwdr.ForwardQueryTask(ctx, task)
14✔
197
                        token.release("")
14✔
198
                        if err == nil {
26✔
199
                                return resp, nil
12✔
200
                        }
12✔
201
                        if err == errForwarderSlowDown {
3✔
202
                                // if we are rate limited, try only local match for the
1✔
203
                                // remainder of the context timeout left
1✔
204
                                fwdrTokenC = noopForwarderTokenC
1✔
205
                                continue
1✔
206
                        }
207
                        return nil, err
1✔
208
                case <-ctx.Done():
3✔
209
                        return nil, ctx.Err()
3✔
210
                }
211
        }
212
}
213

214
// MustOffer blocks until a consumer is found to handle this task
215
// Returns error only when context is canceled, expired or the ratelimit is set to zero (allow nothing)
216
func (tm *TaskMatcher) MustOffer(ctx context.Context, task *InternalTask) error {
11,940✔
217
        if err := tm.ratelimit(ctx); err != nil {
11,941✔
218
                return fmt.Errorf("rate limit error dispatching: %w", err)
1✔
219
        }
1✔
220

221
        // attempt a match with local poller first. When that
222
        // doesn't succeed, try both local match and remote match
223
        taskC := tm.getTaskC(task)
11,939✔
224
        select {
11,939✔
225
        case taskC <- task: // poller picked up the task
174✔
226
                return nil
174✔
227
        case <-ctx.Done():
×
228
                return fmt.Errorf("context done when trying to forward local task: %w", ctx.Err())
×
229
        default:
11,765✔
230
        }
231

232
forLoop:
11,765✔
233
        for {
34,355✔
234
                select {
22,590✔
235
                case taskC <- task: // poller picked up the task
108✔
236
                        return nil
108✔
237
                case token := <-tm.fwdrAddReqTokenC():
15,439✔
238
                        childCtx, cancel := context.WithDeadline(ctx, time.Now().Add(time.Second*2))
15,439✔
239
                        err := tm.fwdr.ForwardTask(childCtx, task)
15,439✔
240
                        token.release("")
15,439✔
241
                        if err != nil {
30,618✔
242

15,179✔
243
                                tm.log.Debug("failed to forward task",
15,179✔
244
                                        tag.Error(err),
15,179✔
245
                                        tag.TaskID(task.Event.TaskID),
15,179✔
246
                                )
15,179✔
247
                                // forwarder returns error only when the call is rate limited. To
15,179✔
248
                                // avoid a busy loop on such rate limiting events, we only attempt to make
15,179✔
249
                                // the next forwarded call after this childCtx expires. Till then, we block
15,179✔
250
                                // hoping for a local poller match
15,179✔
251
                                select {
15,179✔
252
                                case taskC <- task: // poller picked up the task
×
253
                                        cancel()
×
254
                                        return nil
×
255
                                case <-childCtx.Done():
10,825✔
256
                                case <-ctx.Done():
4,354✔
257
                                        cancel()
4,354✔
258
                                        return fmt.Errorf("failed to dispatch after failing to forward task: %w", ctx.Err())
4,354✔
259
                                }
260
                                cancel()
10,825✔
261
                                continue forLoop
10,825✔
262
                        }
263
                        cancel()
260✔
264
                        // at this point, we forwarded the task to a parent partition which
260✔
265
                        // in turn dispatched the task to a poller. Make sure we delete the
260✔
266
                        // task from the database
260✔
267
                        task.Finish(nil)
260✔
268
                        return nil
260✔
269
                case <-ctx.Done():
7,043✔
270
                        return fmt.Errorf("failed to offer task: %w", ctx.Err())
7,043✔
271
                }
272
        }
273
}
274

275
// Poll blocks until a task is found or context deadline is exceeded
276
// On success, the returned task could be a query task or a regular task
277
// Returns ErrNoTasks when context deadline is exceeded
278
func (tm *TaskMatcher) Poll(ctx context.Context, isolationGroup string) (*InternalTask, error) {
2,845✔
279
        isolatedTaskC, ok := tm.isolatedTaskC[isolationGroup]
2,845✔
280
        if !ok && isolationGroup != "" {
2,846✔
281
                // fallback to default isolation group instead of making poller crash if the isolation group is invalid
1✔
282
                isolatedTaskC = tm.taskC
1✔
283
                tm.scope.IncCounter(metrics.PollerInvalidIsolationGroupCounter)
1✔
284
        }
1✔
285
        // try local match first without blocking until context timeout
286
        if task, err := tm.pollNonBlocking(ctx, isolatedTaskC, tm.taskC, tm.queryTaskC); err == nil {
3,046✔
287
                return task, nil
201✔
288
        }
201✔
289
        // there is no local poller available to pickup this task. Now block waiting
290
        // either for a local poller or a forwarding token to be available. When a
291
        // forwarding token becomes available, send this poll to a parent partition
292
        tm.log.Debug("falling back to non-local polling",
2,644✔
293
                tag.IsolationGroup(isolationGroup),
2,644✔
294
                tag.Dynamic("isolated channel", len(isolatedTaskC)),
2,644✔
295
                tag.Dynamic("fallback channel", len(tm.taskC)),
2,644✔
296
        )
2,644✔
297
        return tm.pollOrForward(ctx, isolationGroup, isolatedTaskC, tm.taskC, tm.queryTaskC)
2,644✔
298
}
299

300
// PollForQuery blocks until a *query* task is found or context deadline is exceeded
301
// Returns ErrNoTasks when context deadline is exceeded
302
func (tm *TaskMatcher) PollForQuery(ctx context.Context) (*InternalTask, error) {
5✔
303
        // try local match first without blocking until context timeout
5✔
304
        if task, err := tm.pollNonBlocking(ctx, nil, nil, tm.queryTaskC); err == nil {
7✔
305
                return task, nil
2✔
306
        }
2✔
307
        // there is no local poller available to pickup this task. Now block waiting
308
        // either for a local poller or a forwarding token to be available. When a
309
        // forwarding token becomes available, send this poll to a parent partition
310
        return tm.pollOrForward(ctx, "", nil, nil, tm.queryTaskC)
3✔
311
}
312

313
// UpdateRatelimit updates the task dispatch rate
314
func (tm *TaskMatcher) UpdateRatelimit(rps *float64) {
2,827✔
315
        if rps == nil {
5,527✔
316
                return
2,700✔
317
        }
2,700✔
318
        rate := *rps
127✔
319
        nPartitions := tm.numPartitions()
127✔
320
        if rate > float64(nPartitions) {
253✔
321
                // divide the rate equally across all partitions
126✔
322
                rate = rate / float64(tm.numPartitions())
126✔
323
        }
126✔
324
        tm.limiter.UpdateMaxDispatch(&rate)
127✔
325
}
326

327
// Rate returns the current rate at which tasks are dispatched
328
func (tm *TaskMatcher) Rate() float64 {
11,961✔
329
        return tm.limiter.Limit()
11,961✔
330
}
11,961✔
331

332
func (tm *TaskMatcher) pollOrForward(
333
        ctx context.Context,
334
        isolationGroup string,
335
        isolatedTaskC <-chan *InternalTask,
336
        taskC <-chan *InternalTask,
337
        queryTaskC <-chan *InternalTask,
338
) (*InternalTask, error) {
2,647✔
339
        select {
2,647✔
340
        case task := <-isolatedTaskC:
3✔
341
                if task.ResponseC != nil {
5✔
342
                        tm.scope.IncCounter(metrics.PollSuccessWithSyncPerTaskListCounter)
2✔
343
                }
2✔
344
                tm.scope.IncCounter(metrics.PollSuccessPerTaskListCounter)
3✔
345
                return task, nil
3✔
346
        case task := <-taskC:
1,307✔
347
                if task.ResponseC != nil {
2,437✔
348
                        tm.scope.IncCounter(metrics.PollSuccessWithSyncPerTaskListCounter)
1,130✔
349
                }
1,130✔
350
                tm.scope.IncCounter(metrics.PollSuccessPerTaskListCounter)
1,307✔
351
                return task, nil
1,307✔
352
        case task := <-queryTaskC:
23✔
353
                tm.scope.IncCounter(metrics.PollSuccessWithSyncPerTaskListCounter)
23✔
354
                tm.scope.IncCounter(metrics.PollSuccessPerTaskListCounter)
23✔
355
                return task, nil
23✔
356
        case <-ctx.Done():
245✔
357
                tm.scope.IncCounter(metrics.PollTimeoutPerTaskListCounter)
245✔
358
                return nil, ErrNoTasks
245✔
359
        case token := <-tm.fwdrPollReqTokenC(isolationGroup):
1,069✔
360
                if task, err := tm.fwdr.ForwardPoll(ctx); err == nil {
2,074✔
361
                        token.release(isolationGroup)
1,005✔
362
                        return task, nil
1,005✔
363
                }
1,005✔
364
                token.release(isolationGroup)
64✔
365
                return tm.poll(ctx, isolatedTaskC, taskC, queryTaskC)
64✔
366
        }
367
}
368

369
func (tm *TaskMatcher) poll(
370
        ctx context.Context,
371
        isolatedTaskC <-chan *InternalTask,
372
        taskC <-chan *InternalTask,
373
        queryTaskC <-chan *InternalTask,
374
) (*InternalTask, error) {
64✔
375
        select {
64✔
376
        case task := <-isolatedTaskC:
×
377
                if task.ResponseC != nil {
×
378
                        tm.scope.IncCounter(metrics.PollSuccessWithSyncPerTaskListCounter)
×
379
                }
×
380
                tm.scope.IncCounter(metrics.PollSuccessPerTaskListCounter)
×
381
                return task, nil
×
382
        case task := <-taskC:
×
383
                if task.ResponseC != nil {
×
384
                        tm.scope.IncCounter(metrics.PollSuccessWithSyncPerTaskListCounter)
×
385
                }
×
386
                tm.scope.IncCounter(metrics.PollSuccessPerTaskListCounter)
×
387
                return task, nil
×
388
        case task := <-queryTaskC:
×
389
                tm.scope.IncCounter(metrics.PollSuccessWithSyncPerTaskListCounter)
×
390
                tm.scope.IncCounter(metrics.PollSuccessPerTaskListCounter)
×
391
                return task, nil
×
392
        case <-ctx.Done():
64✔
393
                tm.scope.IncCounter(metrics.PollTimeoutPerTaskListCounter)
64✔
394
                return nil, ErrNoTasks
64✔
395
        }
396
}
397

398
func (tm *TaskMatcher) pollNonBlocking(
399
        ctx context.Context,
400
        isolatedTaskC <-chan *InternalTask,
401
        taskC <-chan *InternalTask,
402
        queryTaskC <-chan *InternalTask,
403
) (*InternalTask, error) {
2,850✔
404
        select {
2,850✔
405
        case task := <-isolatedTaskC:
2✔
406
                if task.ResponseC != nil {
4✔
407
                        tm.scope.IncCounter(metrics.PollSuccessWithSyncPerTaskListCounter)
2✔
408
                }
2✔
409
                tm.scope.IncCounter(metrics.PollSuccessPerTaskListCounter)
2✔
410
                return task, nil
2✔
411
        case task := <-taskC:
191✔
412
                if task.ResponseC != nil {
278✔
413
                        tm.scope.IncCounter(metrics.PollSuccessWithSyncPerTaskListCounter)
87✔
414
                }
87✔
415
                tm.scope.IncCounter(metrics.PollSuccessPerTaskListCounter)
191✔
416
                return task, nil
191✔
417
        case task := <-queryTaskC:
10✔
418
                tm.scope.IncCounter(metrics.PollSuccessWithSyncPerTaskListCounter)
10✔
419
                tm.scope.IncCounter(metrics.PollSuccessPerTaskListCounter)
10✔
420
                return task, nil
10✔
421
        default:
2,647✔
422
                return nil, ErrNoTasks
2,647✔
423
        }
424
}
425

426
func (tm *TaskMatcher) fwdrPollReqTokenC(isolationGroup string) <-chan *ForwarderReqToken {
2,647✔
427
        if tm.fwdr == nil {
4,196✔
428
                return noopForwarderTokenC
1,549✔
429
        }
1,549✔
430
        return tm.fwdr.PollReqTokenC(isolationGroup)
1,098✔
431
}
432

433
func (tm *TaskMatcher) fwdrAddReqTokenC() <-chan *ForwarderReqToken {
38,821✔
434
        if tm.fwdr == nil {
58,870✔
435
                return noopForwarderTokenC
20,049✔
436
        }
20,049✔
437
        return tm.fwdr.AddReqTokenC()
18,772✔
438
}
439

440
func (tm *TaskMatcher) ratelimit(ctx context.Context) error {
14,963✔
441
        err := tm.limiter.Wait(ctx)
14,963✔
442
        if errors.Is(err, clock.ErrCannotWait) {
14,964✔
443
                // "err != ctx.Err()" may also be correct, as that would mean "gave up due to context".
1✔
444
                //
1✔
445
                // in this branch: either the request would wait longer than ctx's timeout,
1✔
446
                // or the limiter's config does not allow any operations at all (burst 0).
1✔
447
                // in either case, this is returned immediately.
1✔
448
                return ErrTasklistThrottled
1✔
449
        }
1✔
450
        return err // nil if success, non-nil if canceled
14,962✔
451
}
452

453
func (tm *TaskMatcher) isForwardingAllowed() bool {
15,116✔
454
        return tm.fwdr != nil
15,116✔
455
}
15,116✔
456

457
func (tm *TaskMatcher) getTaskC(task *InternalTask) chan<- *InternalTask {
42,340✔
458
        taskC := tm.taskC
42,340✔
459
        if isolatedTaskC, ok := tm.isolatedTaskC[task.isolationGroup]; ok && task.isolationGroup != "" {
42,351✔
460
                taskC = isolatedTaskC
11✔
461
        }
11✔
462
        return taskC
42,340✔
463
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc