• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In
No new info detected.

uber / cadence / 01875e2f-959c-4c4d-87af-1d7805759bcc

08 Apr 2023 12:26AM UTC coverage: 57.178% (+0.1%) from 57.072%
01875e2f-959c-4c4d-87af-1d7805759bcc

Pull #5197

buildkite

Steven L
bad cleanup -> good cleanup
Pull Request #5197: Demonstrate a way to get rid of the cadence-idl repo

85396 of 149351 relevant lines covered (57.18%)

2283.28 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

91.46
/service/matching/matcher.go
1
// Copyright (c) 2019 Uber Technologies, Inc.
2
//
3
// Permission is hereby granted, free of charge, to any person obtaining a copy
4
// of this software and associated documentation files (the "Software"), to deal
5
// in the Software without restriction, including without limitation the rights
6
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
7
// copies of the Software, and to permit persons to whom the Software is
8
// furnished to do so, subject to the following conditions:
9
//
10
// The above copyright notice and this permission notice shall be included in
11
// all copies or substantial portions of the Software.
12
//
13
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
14
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
18
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
19
// THE SOFTWARE.
20

21
package matching
22

23
import (
24
        "context"
25
        "errors"
26
        "time"
27

28
        "golang.org/x/time/rate"
29

30
        "github.com/uber/cadence/common/metrics"
31
        "github.com/uber/cadence/common/quotas"
32
        "github.com/uber/cadence/common/types"
33
)
34

35
// TaskMatcher matches a task producer with a task consumer
36
// Producers are usually rpc calls from history or taskReader
37
// that drains backlog from db. Consumers are the task list pollers
38
type TaskMatcher struct {
39
        // synchronous task channel to match producer/consumer
40
        taskC chan *InternalTask
41
        // synchronous task channel to match query task - the reason to have
42
        // separate channel for this is because there are cases when consumers
43
        // are interested in queryTasks but not others. Example is when domain is
44
        // not active in a cluster
45
        queryTaskC chan *InternalTask
46
        // ratelimiter that limits the rate at which tasks can be dispatched to consumers
47
        limiter *quotas.RateLimiter
48

49
        fwdr          *Forwarder
50
        scope         metrics.Scope // domain metric scope
51
        numPartitions func() int    // number of task list partitions
52
}
53

54
const (
	// _defaultTaskDispatchRPS is the dispatch rate a freshly built matcher's
	// limiter starts with (an untyped float constant, so it defaults to float64).
	_defaultTaskDispatchRPS = 1e5
	// _defaultTaskDispatchRPSTTL is passed as the TTL argument of
	// quotas.NewRateLimiter together with the default rate.
	_defaultTaskDispatchRPSTTL = time.Minute
)

// errTasklistThrottled is returned by ratelimit when no dispatch reservation can
// be obtained, or the reservation would not be ready before the caller's deadline.
var errTasklistThrottled = errors.New("cannot add to tasklist, limit exceeded")
60

61
// newTaskMatcher returns an task matcher instance. The returned instance can be
62
// used by task producers and consumers to find a match. Both sync matches and non-sync
63
// matches should use this implementation
64
func newTaskMatcher(config *taskListConfig, fwdr *Forwarder, scope metrics.Scope) *TaskMatcher {
1,366✔
65
        dPtr := _defaultTaskDispatchRPS
1,366✔
66
        limiter := quotas.NewRateLimiter(&dPtr, _defaultTaskDispatchRPSTTL, config.MinTaskThrottlingBurstSize())
1,366✔
67
        return &TaskMatcher{
1,366✔
68
                limiter:       limiter,
1,366✔
69
                scope:         scope,
1,366✔
70
                fwdr:          fwdr,
1,366✔
71
                taskC:         make(chan *InternalTask),
1,366✔
72
                queryTaskC:    make(chan *InternalTask),
1,366✔
73
                numPartitions: config.NumReadPartitions,
1,366✔
74
        }
1,366✔
75
}
1,366✔
76

77
// Offer offers a task to a potential consumer (poller)
78
// If the task is successfully matched with a consumer, this
79
// method will return true and no error. If the task is matched
80
// but consumer returned error, then this method will return
81
// true and error message. This method should not be used for query
82
// task. This method should ONLY be used for sync match.
83
//
84
// When a local poller is not available and forwarding to a parent
85
// task list partition is possible, this method will attempt forwarding
86
// to the parent partition.
87
//
88
// Cases when this method will block:
89
//
90
// Ratelimit:
91
// When a ratelimit token is not available, this method might block
92
// waiting for a token until the provided context timeout. Rate limits are
93
// not enforced for forwarded tasks from child partition.
94
//
95
// Forwarded tasks that originated from db backlog:
96
// When this method is called with a task that is forwarded from a
97
// remote partition and if (1) this task list is root (2) task
98
// was from db backlog - this method will block until context timeout
99
// trying to match with a poller. The caller is expected to set the
100
// correct context timeout.
101
//
102
// returns error when:
103
//   - ratelimit is exceeded (does not apply to query task)
104
//   - context deadline is exceeded
105
//   - task is matched and consumer returns error in response channel
106
func (tm *TaskMatcher) Offer(ctx context.Context, task *InternalTask) (bool, error) {
28,104✔
107
        var err error
28,104✔
108
        var rsv *rate.Reservation
28,104✔
109
        if !task.isForwarded() {
42,568✔
110
                rsv, err = tm.ratelimit(ctx)
14,464✔
111
                if err != nil {
14,473✔
112
                        tm.scope.IncCounter(metrics.SyncThrottlePerTaskListCounter)
9✔
113
                        return false, err
9✔
114
                }
9✔
115
        }
116

117
        select {
28,095✔
118
        case tm.taskC <- task: // poller picked up the task
6,309✔
119
                if task.responseC != nil {
12,618✔
120
                        // if there is a response channel, block until resp is received
6,309✔
121
                        // and return error if the response contains error
6,309✔
122
                        err = <-task.responseC
6,309✔
123
                        return true, err
6,309✔
124
                }
6,309✔
125
                return false, nil
×
126
        default:
21,786✔
127
                // no poller waiting for tasks, try forwarding this task to the
21,786✔
128
                // root partition if possible
21,786✔
129
                select {
21,786✔
130
                case token := <-tm.fwdrAddReqTokenC():
1,065✔
131
                        if err := tm.fwdr.ForwardTask(ctx, task); err == nil {
1,593✔
132
                                // task was remotely sync matched on the parent partition
528✔
133
                                token.release()
528✔
134
                                return true, nil
528✔
135
                        }
528✔
136
                        token.release()
537✔
137
                default:
20,721✔
138
                        if !tm.isForwardingAllowed() && // we are the root partition and forwarding is not possible
20,721✔
139
                                task.source == types.TaskSourceDbBacklog && // task was from backlog (stored in db)
20,721✔
140
                                task.isForwarded() { // task came from a child partition
32,776✔
141
                                // a forwarded backlog task from a child partition, block trying
12,055✔
142
                                // to match with a poller until ctx timeout
12,055✔
143
                                return tm.offerOrTimeout(ctx, task)
12,055✔
144
                        }
12,055✔
145
                }
146

147
                if rsv != nil {
17,650✔
148
                        // there was a ratelimit token we consumed
8,447✔
149
                        // return it since we did not really do any work
8,447✔
150
                        rsv.Cancel()
8,447✔
151
                }
8,447✔
152
                return false, nil
9,203✔
153
        }
154
}
155

156
func (tm *TaskMatcher) offerOrTimeout(ctx context.Context, task *InternalTask) (bool, error) {
12,055✔
157
        select {
12,055✔
158
        case tm.taskC <- task: // poller picked up the task
97✔
159
                if task.responseC != nil {
194✔
160
                        select {
97✔
161
                        case err := <-task.responseC:
96✔
162
                                return true, err
96✔
163
                        case <-ctx.Done():
1✔
164
                                return false, nil
1✔
165
                        }
166
                }
167
                return task.activityTaskDispatchInfo != nil, nil
×
168
        case <-ctx.Done():
11,958✔
169
                return false, nil
11,958✔
170
        }
171
}
172

173
// OfferQuery will either match task to local poller or will forward query task.
174
// Local match is always attempted before forwarding is attempted. If local match occurs
175
// response and error are both nil, if forwarding occurs then response or error is returned.
176
func (tm *TaskMatcher) OfferQuery(ctx context.Context, task *InternalTask) (*types.QueryWorkflowResponse, error) {
51✔
177
        select {
51✔
178
        case tm.queryTaskC <- task:
25✔
179
                <-task.responseC
25✔
180
                return nil, nil
25✔
181
        default:
26✔
182
        }
183

184
        fwdrTokenC := tm.fwdrAddReqTokenC()
26✔
185

26✔
186
        for {
53✔
187
                select {
27✔
188
                case tm.queryTaskC <- task:
8✔
189
                        <-task.responseC
8✔
190
                        return nil, nil
8✔
191
                case token := <-fwdrTokenC:
16✔
192
                        resp, err := tm.fwdr.ForwardQueryTask(ctx, task)
16✔
193
                        token.release()
16✔
194
                        if err == nil {
30✔
195
                                return resp, nil
14✔
196
                        }
14✔
197
                        if err == errForwarderSlowDown {
3✔
198
                                // if we are rate limited, try only local match for the
1✔
199
                                // remainder of the context timeout left
1✔
200
                                fwdrTokenC = noopForwarderTokenC
1✔
201
                                continue
1✔
202
                        }
203
                        return nil, err
1✔
204
                case <-ctx.Done():
3✔
205
                        return nil, ctx.Err()
3✔
206
                }
207
        }
208
}
209

210
// MustOffer blocks until a consumer is found to handle this task
211
// Returns error only when context is canceled or the ratelimit is set to zero (allow nothing)
212
// The passed in context MUST NOT have a deadline associated with it
213
func (tm *TaskMatcher) MustOffer(ctx context.Context, task *InternalTask) error {
10,412✔
214
        if _, err := tm.ratelimit(ctx); err != nil {
11,467✔
215
                return err
1,055✔
216
        }
1,055✔
217

218
        // attempt a match with local poller first. When that
219
        // doesn't succeed, try both local match and remote match
220
        select {
9,357✔
221
        case tm.taskC <- task:
728✔
222
                return nil
728✔
223
        case <-ctx.Done():
×
224
                return ctx.Err()
×
225
        default:
8,629✔
226
        }
227

228
forLoop:
8,629✔
229
        for {
29,171✔
230
                select {
20,542✔
231
                case tm.taskC <- task:
7,975✔
232
                        return nil
7,975✔
233
                case token := <-tm.fwdrAddReqTokenC():
12,408✔
234
                        childCtx, cancel := context.WithDeadline(ctx, time.Now().Add(time.Second*2))
12,408✔
235
                        err := tm.fwdr.ForwardTask(childCtx, task)
12,408✔
236
                        token.release()
12,408✔
237
                        if err != nil {
24,422✔
238
                                // forwarder returns error only when the call is rate limited. To
12,014✔
239
                                // avoid a busy loop on such rate limiting events, we only attempt to make
12,014✔
240
                                // the next forwarded call after this childCtx expires. Till then, we block
12,014✔
241
                                // hoping for a local poller match
12,014✔
242
                                select {
12,014✔
243
                                case tm.taskC <- task:
×
244
                                        cancel()
×
245
                                        return nil
×
246
                                case <-childCtx.Done():
11,913✔
247
                                case <-ctx.Done():
101✔
248
                                        cancel()
101✔
249
                                        return ctx.Err()
101✔
250
                                }
251
                                cancel()
11,913✔
252
                                continue forLoop
11,913✔
253
                        }
254
                        cancel()
394✔
255
                        // at this point, we forwarded the task to a parent partition which
394✔
256
                        // in turn dispatched the task to a poller. Make sure we delete the
394✔
257
                        // task from the database
394✔
258
                        task.finish(nil)
394✔
259
                        return nil
394✔
260
                case <-ctx.Done():
159✔
261
                        return ctx.Err()
159✔
262
                }
263
        }
264
}
265

266
// Poll blocks until a task is found or context deadline is exceeded
267
// On success, the returned task could be a query task or a regular task
268
// Returns ErrNoTasks when context deadline is exceeded
269
func (tm *TaskMatcher) Poll(ctx context.Context) (*InternalTask, error) {
16,432✔
270
        // try local match first without blocking until context timeout
16,432✔
271
        if task, err := tm.pollNonBlocking(ctx, tm.taskC, tm.queryTaskC); err == nil {
24,488✔
272
                return task, nil
8,056✔
273
        }
8,056✔
274
        // there is no local poller available to pickup this task. Now block waiting
275
        // either for a local poller or a forwarding token to be available. When a
276
        // forwarding token becomes available, send this poll to a parent partition
277
        return tm.pollOrForward(ctx, tm.taskC, tm.queryTaskC)
8,376✔
278
}
279

280
// PollForQuery blocks until a *query* task is found or context deadline is exceeded
281
// Returns ErrNoTasks when context deadline is exceeded
282
func (tm *TaskMatcher) PollForQuery(ctx context.Context) (*InternalTask, error) {
5✔
283
        // try local match first without blocking until context timeout
5✔
284
        if task, err := tm.pollNonBlocking(ctx, nil, tm.queryTaskC); err == nil {
7✔
285
                return task, nil
2✔
286
        }
2✔
287
        // there is no local poller available to pickup this task. Now block waiting
288
        // either for a local poller or a forwarding token to be available. When a
289
        // forwarding token becomes available, send this poll to a parent partition
290
        return tm.pollOrForward(ctx, nil, tm.queryTaskC)
3✔
291
}
292

293
// UpdateRatelimit updates the task dispatch rate
294
func (tm *TaskMatcher) UpdateRatelimit(rps *float64) {
16,424✔
295
        if rps == nil {
28,702✔
296
                return
12,278✔
297
        }
12,278✔
298
        rate := *rps
4,146✔
299
        nPartitions := tm.numPartitions()
4,146✔
300
        if rate > float64(nPartitions) {
8,280✔
301
                // divide the rate equally across all partitions
4,134✔
302
                rate = rate / float64(tm.numPartitions())
4,134✔
303
        }
4,134✔
304
        tm.limiter.UpdateMaxDispatch(&rate)
4,146✔
305
}
306

307
// Rate returns the current rate at which tasks are dispatched
308
func (tm *TaskMatcher) Rate() float64 {
5✔
309
        return tm.limiter.Limit()
5✔
310
}
5✔
311

312
func (tm *TaskMatcher) pollOrForward(
313
        ctx context.Context,
314
        taskC <-chan *InternalTask,
315
        queryTaskC <-chan *InternalTask,
316
) (*InternalTask, error) {
8,379✔
317
        select {
8,379✔
318
        case task := <-taskC:
7,059✔
319
                if task.responseC != nil {
13,368✔
320
                        tm.scope.IncCounter(metrics.PollSuccessWithSyncPerTaskListCounter)
6,309✔
321
                }
6,309✔
322
                tm.scope.IncCounter(metrics.PollSuccessPerTaskListCounter)
7,059✔
323
                return task, nil
7,059✔
324
        case task := <-queryTaskC:
25✔
325
                tm.scope.IncCounter(metrics.PollSuccessWithSyncPerTaskListCounter)
25✔
326
                tm.scope.IncCounter(metrics.PollSuccessPerTaskListCounter)
25✔
327
                return task, nil
25✔
328
        case <-ctx.Done():
292✔
329
                tm.scope.IncCounter(metrics.PollTimeoutPerTaskListCounter)
292✔
330
                return nil, ErrNoTasks
292✔
331
        case token := <-tm.fwdrPollReqTokenC():
1,003✔
332
                if task, err := tm.fwdr.ForwardPoll(ctx); err == nil {
1,955✔
333
                        token.release()
952✔
334
                        return task, nil
952✔
335
                }
952✔
336
                token.release()
51✔
337
                return tm.poll(ctx, taskC, queryTaskC)
51✔
338
        }
339
}
340

341
func (tm *TaskMatcher) poll(
342
        ctx context.Context,
343
        taskC <-chan *InternalTask,
344
        queryTaskC <-chan *InternalTask,
345
) (*InternalTask, error) {
51✔
346
        select {
51✔
347
        case task := <-taskC:
×
348
                if task.responseC != nil {
×
349
                        tm.scope.IncCounter(metrics.PollSuccessWithSyncPerTaskListCounter)
×
350
                }
×
351
                tm.scope.IncCounter(metrics.PollSuccessPerTaskListCounter)
×
352
                return task, nil
×
353
        case task := <-queryTaskC:
×
354
                tm.scope.IncCounter(metrics.PollSuccessWithSyncPerTaskListCounter)
×
355
                tm.scope.IncCounter(metrics.PollSuccessPerTaskListCounter)
×
356
                return task, nil
×
357
        case <-ctx.Done():
51✔
358
                tm.scope.IncCounter(metrics.PollTimeoutPerTaskListCounter)
51✔
359
                return nil, ErrNoTasks
51✔
360
        }
361
}
362

363
func (tm *TaskMatcher) pollNonBlocking(
364
        ctx context.Context,
365
        taskC <-chan *InternalTask,
366
        queryTaskC <-chan *InternalTask,
367
) (*InternalTask, error) {
16,437✔
368
        select {
16,437✔
369
        case task := <-taskC:
8,050✔
370
                if task.responseC != nil {
8,147✔
371
                        tm.scope.IncCounter(metrics.PollSuccessWithSyncPerTaskListCounter)
97✔
372
                }
97✔
373
                tm.scope.IncCounter(metrics.PollSuccessPerTaskListCounter)
8,050✔
374
                return task, nil
8,050✔
375
        case task := <-queryTaskC:
8✔
376
                tm.scope.IncCounter(metrics.PollSuccessWithSyncPerTaskListCounter)
8✔
377
                tm.scope.IncCounter(metrics.PollSuccessPerTaskListCounter)
8✔
378
                return task, nil
8✔
379
        default:
8,379✔
380
                return nil, ErrNoTasks
8,379✔
381
        }
382
}
383

384
func (tm *TaskMatcher) fwdrPollReqTokenC() <-chan *ForwarderReqToken {
8,379✔
385
        if tm.fwdr == nil {
15,730✔
386
                return noopForwarderTokenC
7,351✔
387
        }
7,351✔
388
        return tm.fwdr.PollReqTokenC()
1,028✔
389
}
390

391
func (tm *TaskMatcher) fwdrAddReqTokenC() <-chan *ForwarderReqToken {
42,354✔
392
        if tm.fwdr == nil {
71,111✔
393
                return noopForwarderTokenC
28,757✔
394
        }
28,757✔
395
        return tm.fwdr.AddReqTokenC()
13,597✔
396
}
397

398
func (tm *TaskMatcher) ratelimit(ctx context.Context) (*rate.Reservation, error) {
24,877✔
399
        select {
24,877✔
400
        case <-ctx.Done():
×
401
                return nil, ctx.Err()
×
402
        default:
24,877✔
403
        }
404

405
        deadline, ok := ctx.Deadline()
24,877✔
406
        if !ok {
35,288✔
407
                if err := tm.limiter.Wait(ctx); err != nil {
11,466✔
408
                        return nil, err
1,055✔
409
                }
1,055✔
410
                return nil, nil
9,356✔
411
        }
412

413
        rsv := tm.limiter.Reserve()
14,466✔
414
        // If we have to wait too long for reservation, give up and return
14,466✔
415
        if !rsv.OK() || rsv.Delay() > time.Until(deadline) {
14,475✔
416
                if rsv.OK() { // if we were indeed given a reservation, return it before we bail out
9✔
417
                        rsv.Cancel()
×
418
                }
×
419
                return nil, errTasklistThrottled
9✔
420
        }
421

422
        time.Sleep(rsv.Delay())
14,457✔
423
        return rsv, nil
14,457✔
424
}
425

426
func (tm *TaskMatcher) isForwardingAllowed() bool {
20,721✔
427
        return tm.fwdr != nil
20,721✔
428
}
20,721✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc