uber / cadence / 0187fdd2-f4a4-4c9a-97b4-6604937bf7be

09 May 2023 12:23AM UTC coverage: 57.253% (-0.002%) from 57.255%

Pull Request #5252: Feature/zonal partitioning
Commit: Merge branch 'master' into feature/zonal-partitioning (David Porter)
CI: buildkite

1460 of 1460 new or added lines in 51 files covered. (100.0%)

86909 of 151799 relevant lines covered (57.25%)

2482.17 hits per line

Source File

/service/matching/taskReader.go: 84.39% covered

hits      source (× = not covered)
          // Copyright (c) 2017-2020 Uber Technologies Inc.

          // Portions of the Software are attributed to Copyright (c) 2020 Temporal Technologies Inc.

          // Permission is hereby granted, free of charge, to any person obtaining a copy
          // of this software and associated documentation files (the "Software"), to deal
          // in the Software without restriction, including without limitation the rights
          // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
          // copies of the Software, and to permit persons to whom the Software is
          // furnished to do so, subject to the following conditions:
          //
          // The above copyright notice and this permission notice shall be included in all
          // copies or substantial portions of the Software.
          //
          // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
          // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
          // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
          // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
          // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
          // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
          // SOFTWARE.

          package matching

          import (
                  "context"
                  "runtime"
                  "sync/atomic"
                  "time"

                  "github.com/uber/cadence/common/backoff"
                  "github.com/uber/cadence/common/log"
                  "github.com/uber/cadence/common/log/tag"
                  "github.com/uber/cadence/common/messaging"
                  "github.com/uber/cadence/common/metrics"
                  "github.com/uber/cadence/common/persistence"
                  "github.com/uber/cadence/common/types"
          )

          var epochStartTime = time.Unix(0, 0)

          type (
                  taskReader struct {
                          taskBuffer     chan *persistence.TaskInfo // tasks loaded from persistence
                          notifyC        chan struct{}              // Used as signal to notify pump of new tasks
                          tlMgr          *taskListManagerImpl
                          taskListID     *taskListID
                          config         *taskListConfig
                          db             *taskListDB
                          taskWriter     *taskWriter
                          taskGC         *taskGC
                          taskAckManager messaging.AckManager
                          // The cancel objects are to cancel the ratelimiter Wait in dispatchBufferedTasks. The ideal
                          // approach is to use request-scoped contexts and use a unique one for each call to Wait. However
                          // in order to cancel it on shutdown, we need a new goroutine for each call that would wait on
                          // the shutdown channel. To optimize on efficiency, we instead create one and tag it on the struct
                          // so the cancel can be called directly on shutdown.
                          cancelCtx                context.Context
                          cancelFunc               context.CancelFunc
                          stopped                  int64 // set to 1 if the reader is stopped or is shutting down
                          logger                   log.Logger
                          scope                    metrics.Scope
                          throttleRetry            *backoff.ThrottleRetry
                          handleErr                func(error) error
                          onFatalErr               func()
                          dispatchTask             func(context.Context, *InternalTask) error
                          getIsolationGroupForTask func(context.Context, *persistence.TaskInfo) (string, error)
                  }
          )

1,399✔    func newTaskReader(tlMgr *taskListManagerImpl) *taskReader {
1,399✔            ctx, cancel := context.WithCancel(context.Background())
1,399✔            return &taskReader{
1,399✔                    tlMgr:          tlMgr,
1,399✔                    taskListID:     tlMgr.taskListID,
1,399✔                    config:         tlMgr.config,
1,399✔                    db:             tlMgr.db,
1,399✔                    taskWriter:     tlMgr.taskWriter,
1,399✔                    taskGC:         tlMgr.taskGC,
1,399✔                    taskAckManager: tlMgr.taskAckManager,
1,399✔                    cancelCtx:      ctx,
1,399✔                    cancelFunc:     cancel,
1,399✔                    notifyC:        make(chan struct{}, 1),
1,399✔                    // we always dequeue the head of the buffer and try to dispatch it to a poller
1,399✔                    // so allocate one less than desired target buffer size
1,399✔                    taskBuffer:               make(chan *persistence.TaskInfo, tlMgr.config.GetTasksBatchSize()-1),
1,399✔                    logger:                   tlMgr.logger,
1,399✔                    scope:                    tlMgr.scope,
1,399✔                    handleErr:                tlMgr.handleErr,
1,399✔                    onFatalErr:               tlMgr.Stop,
1,399✔                    dispatchTask:             tlMgr.DispatchTask,
1,399✔                    getIsolationGroupForTask: tlMgr.getIsolationGroupForTask,
1,399✔                    throttleRetry: backoff.NewThrottleRetry(
1,399✔                            backoff.WithRetryPolicy(persistenceOperationRetryPolicy),
1,399✔                            backoff.WithRetryableError(persistence.IsTransientError),
1,399✔                    ),
1,399✔            }
1,399✔    }

1,387✔    func (tr *taskReader) Start() {
1,387✔            tr.Signal()
1,387✔            go tr.dispatchBufferedTasks()
1,387✔            go tr.getTasksPump()
1,387✔    }

1,365✔    func (tr *taskReader) Stop() {
2,730✔            if atomic.CompareAndSwapInt64(&tr.stopped, 0, 1) {
1,365✔                    tr.cancelFunc()
1,367✔                    if err := tr.persistAckLevel(); err != nil {
2✔                                tr.logger.Error("Persistent store operation failure",
2✔                                        tag.StoreOperationUpdateTaskList,
2✔                                        tag.Error(err))
2✔                        }
1,365✔                    tr.taskGC.RunNow(tr.taskAckManager.GetAckLevel())
                  }
          }

42,278✔   func (tr *taskReader) Signal() {
42,278✔           var event struct{}
42,278✔           select {
36,004✔           case tr.notifyC <- event:
6,274✔            default: // channel already has an event, don't block
                  }
          }

1,395✔    func (tr *taskReader) dispatchBufferedTasks() {
1,395✔    dispatchLoop:
15,796✔           for {
14,401✔                   select {
13,270✔                   case taskInfo, ok := <-tr.taskBuffer:
13,271✔                           if !ok { // Task list getTasks pump is shutdown
1✔                                        break dispatchLoop
                                  }
28,208✔                           for {
14,939✔                                   // find isolation group of the task
14,939✔                                   isolationGroup, err := tr.getIsolationGroupForTask(tr.cancelCtx, taskInfo)
14,939✔                                   if err != nil {
×                                                 // it only errors when the tasklist is a sticky tasklist and
×                                                 // the sticky pollers are not available, in this case, we just complete the task
×                                                 // and let the decision get timed out and rescheduled to non-sticky tasklist
×                                                 if err == _stickyPollerUnavailableError {
×                                                         tr.completeTask(taskInfo, nil)
×                                                 } else {
×                                                         // it should never happen, unless there is a bug in 'getIsolationGroupForTask' method
×                                                         tr.logger.Error("taskReader: unexpected error getting isolation group", tag.Error(err))
×                                                         tr.completeTask(taskInfo, err)
×                                                 }
×                                                 break
                                          }
14,939✔                                   task := newInternalTask(taskInfo, tr.completeTask, types.TaskSourceDbBacklog, "", false, nil, isolationGroup)
14,939✔                                   dispatchCtx, cancel := tr.newDispatchContext(isolationGroup)
14,939✔                                   timerScope := tr.scope.StartTimer(metrics.AsyncMatchLatencyPerTaskList)
14,939✔                                   err = tr.dispatchTask(dispatchCtx, task)
14,939✔                                   timerScope.Stop()
14,939✔                                   cancel()
27,945✔                                   if err == nil {
13,006✔                                           break
                                          }
2,196✔                                    if err == context.Canceled {
263✔                                              tr.logger.Info("Tasklist manager context is cancelled, shutting down")
263✔                                              break dispatchLoop
                                          }
1,817✔                                    if err == context.DeadlineExceeded {
147✔                                              // it only happens when isolation is enabled and there is no pollers from the given isolation group
147✔                                              // if this happens, we don't want to block the task dispatching, because there might be pollers from
147✔                                              // other isolation groups, we just simply continue and dispatch the task to a new isolation group which
147✔                                              // has pollers
147✔                                              tr.logger.Warn("Async task dispatch timed out")
147✔                                              tr.scope.IncCounter(metrics.AsyncMatchDispatchTimeoutCounterPerTaskList)
147✔                                              continue
                                          }
                                          // this should never happen unless there is a bug - don't drop the task
1,523✔                                    tr.scope.IncCounter(metrics.BufferThrottlePerTaskListCounter)
1,523✔                                    tr.logger.Error("taskReader: unexpected error dispatching task", tag.Error(err))
1,523✔                                    runtime.Gosched()
                                  }
1,105✔                    case <-tr.cancelCtx.Done():
1,105✔                            break dispatchLoop
                          }
                  }
          }

1,391✔    func (tr *taskReader) getTasksPump() {
1,391✔            defer close(tr.taskBuffer)
1,391✔
1,391✔            updateAckTimer := time.NewTimer(tr.config.UpdateAckInterval())
1,391✔            defer updateAckTimer.Stop()
1,391✔    getTasksPumpLoop:
43,350✔           for {
41,959✔                   select {
1,362✔                    case <-tr.cancelCtx.Done():
1,362✔                            break getTasksPumpLoop
36,001✔                   case <-tr.notifyC:
72,002✔                           {
36,001✔                                   tasks, readLevel, isReadBatchDone, err := tr.getTaskBatch()
36,001✔                                   if err != nil {
×                                                 tr.Signal() // re-enqueue the event
×                                                 // TODO: Should we ever stop retrying on db errors?
×                                                 continue getTasksPumpLoop
                                          }

61,184✔                                   if len(tasks) == 0 {
25,183✔                                           tr.taskAckManager.SetReadLevel(readLevel)
25,315✔                                           if !isReadBatchDone {
132✔                                                      tr.Signal()
132✔                                              }
25,183✔                                           continue getTasksPumpLoop
                                          }

10,821✔                                   if !tr.addTasksToBuffer(tasks) {
3✔                                                break getTasksPumpLoop
                                          }
                                          // There maybe more tasks. We yield now, but signal pump to check again later.
10,815✔                                   tr.Signal()
                                  }
4,570✔                    case <-updateAckTimer.C:
9,140✔                            {
4,570✔                                    if err := tr.handleErr(tr.persistAckLevel()); err != nil {
×                                                 tr.logger.Error("Persistent store operation failure",
×                                                         tag.StoreOperationUpdateTaskList,
×                                                         tag.Error(err))
×                                                 // keep going as saving ack is not critical
×                                         }
4,570✔                                    tr.Signal() // periodically signal pump to check persistence for tasks
4,570✔                                    updateAckTimer = time.NewTimer(tr.config.UpdateAckInterval())
                                  }
                          }
15,385✔                   scope := tr.scope.Tagged(getTaskListTypeTag(tr.taskListID.taskType))
15,385✔                   scope.UpdateGauge(metrics.TaskBacklogPerTaskListGauge, float64(tr.taskAckManager.GetBacklogCount()))
                  }

          }

12,225✔   func (tr *taskReader) getTaskBatchWithRange(readLevel int64, maxReadLevel int64) ([]*persistence.TaskInfo, error) {
12,225✔           var response *persistence.GetTasksResponse
24,450✔           op := func() (err error) {
12,225✔                   response, err = tr.db.GetTasks(readLevel, maxReadLevel, tr.config.GetTasksBatchSize())
12,225✔                   return
12,225✔           }
12,225✔           err := tr.throttleRetry.Do(context.Background(), op)
12,225✔           if err != nil {
×                         tr.logger.Error("Persistent store operation failure",
×                                 tag.StoreOperationGetTasks,
×                                 tag.Error(err),
×                                 tag.WorkflowTaskListName(tr.taskListID.name),
×                                 tag.WorkflowTaskListType(tr.taskListID.taskType))
×                         return nil, err
×                 }
12,225✔           return response.Tasks, nil
          }

          // Returns a batch of tasks from persistence starting form current read level.
          // Also return a number that can be used to update readLevel
          // Also return a bool to indicate whether read is finished
36,006✔   func (tr *taskReader) getTaskBatch() ([]*persistence.TaskInfo, int64, bool, error) {
36,006✔           var tasks []*persistence.TaskInfo
36,006✔           readLevel := tr.taskAckManager.GetReadLevel()
36,006✔           maxReadLevel := tr.taskWriter.GetMaxReadLevel()
36,006✔
36,006✔           // counter i is used to break and let caller check whether tasklist is still alive and need resume read.
48,231✔           for i := 0; i < 10 && readLevel < maxReadLevel; i++ {
12,225✔                   upper := readLevel + tr.config.RangeSize
22,561✔                   if upper > maxReadLevel {
10,336✔                           upper = maxReadLevel
10,336✔                   }
12,225✔                   tasks, err := tr.getTaskBatchWithRange(readLevel, upper)
12,225✔                   if err != nil {
×                                 return nil, readLevel, true, err
×                         }
                          // return as long as it grabs any tasks
23,045✔                   if len(tasks) > 0 {
10,820✔                           return tasks, upper, true, nil
10,820✔                   }
1,405✔                    readLevel = upper
                  }
25,186✔           return tasks, readLevel, readLevel == maxReadLevel, nil // caller will update readLevel when no task grabbed
          }

14,806✔   func (tr *taskReader) isTaskExpired(t *persistence.TaskInfo, now time.Time) bool {
14,806✔           return t.Expiry.After(epochStartTime) && time.Now().After(t.Expiry)
14,806✔   }

10,820✔   func (tr *taskReader) addTasksToBuffer(tasks []*persistence.TaskInfo) bool {
10,820✔           now := time.Now()
25,626✔           for _, t := range tasks {
14,852✔                   if tr.isTaskExpired(t, now) {
46✔                               tr.scope.IncCounter(metrics.ExpiredTasksPerTaskListCounter)
46✔                               // Also increment readLevel for expired tasks otherwise it could result in
46✔                               // looping over the same tasks if all tasks read in the batch are expired
46✔                               tr.taskAckManager.SetReadLevel(t.TaskID)
46✔                               continue
                          }
14,763✔                   if !tr.addSingleTaskToBuffer(t) {
3✔                                return false // we are shutting down the task list
3✔                        }
                  }
10,817✔           return true
          }

14,760✔   func (tr *taskReader) addSingleTaskToBuffer(task *persistence.TaskInfo) bool {
14,760✔           err := tr.taskAckManager.ReadItem(task.TaskID)
14,760✔           if err != nil {
×                         tr.logger.Fatal("critical bug when adding item to ackManager", tag.Error(err))
×                 }
29,520✔           for {
14,760✔                   select {
14,757✔                   case tr.taskBuffer <- task:
14,757✔                           return true
3✔                        case <-tr.cancelCtx.Done():
3✔                                return false
                          }
                  }
          }

5,935✔    func (tr *taskReader) persistAckLevel() error {
5,935✔            ackLevel := tr.taskAckManager.GetAckLevel()
11,867✔           if ackLevel >= 0 {
5,932✔                    maxReadLevel := tr.taskWriter.GetMaxReadLevel()
5,932✔                    scope := tr.scope.Tagged(getTaskListTypeTag(tr.taskListID.taskType))
5,932✔                    // note: this metrics is only an estimation for the lag. taskID in DB may not be continuous,
5,932✔                    // especially when task list ownership changes.
5,932✔                    scope.UpdateGauge(metrics.TaskLagPerTaskListGauge, float64(maxReadLevel-ackLevel))
5,932✔
5,932✔                    return tr.db.UpdateState(ackLevel)
5,932✔            }
3✔                return nil
          }

          // completeTask marks a task as processed. Only tasks created by taskReader (i.e. backlog from db) reach
          // here. As part of completion:
          //   - task is deleted from the database when err is nil
          //   - new task is created and current task is deleted when err is not nil
13,006✔   func (tr *taskReader) completeTask(task *persistence.TaskInfo, err error) {
13,007✔           if err != nil {
1✔                        // failed to start the task.
1✔                        // We cannot just remove it from persistence because then it will be lost.
1✔                        // We handle this by writing the task back to persistence with a higher taskID.
1✔                        // This will allow subsequent tasks to make progress, and hopefully by the time this task is picked-up
1✔                        // again the underlying reason for failing to start will be resolved.
1✔                        // Note that RecordTaskStarted only fails after retrying for a long time, so a single task will not be
1✔                        // re-written to persistence frequently.
2✔                        op := func() error {
1✔                                wf := &types.WorkflowExecution{WorkflowID: task.WorkflowID, RunID: task.RunID}
1✔                                _, err := tr.taskWriter.appendTask(wf, task)
1✔                                return err
1✔                        }
1✔                        err = tr.throttleRetry.Do(context.Background(), op)
1✔                        if err != nil {
×                                 // OK, we also failed to write to persistence.
×                                 // This should only happen in very extreme cases where persistence is completely down.
×                                 // We still can't lose the old task so we just unload the entire task list
×                                 tr.logger.Error("Failed to complete task", tag.Error(err))
×                                 tr.onFatalErr()
×                                 return
×                         }
1✔                        tr.Signal()
                  }
13,006✔           ackLevel := tr.taskAckManager.AckItem(task.TaskID)
13,006✔           tr.taskGC.Run(ackLevel)
          }

14,939✔   func (tr *taskReader) newDispatchContext(isolationGroup string) (context.Context, context.CancelFunc) {
20,295✔           if isolationGroup != "" {
5,356✔                    return context.WithTimeout(tr.cancelCtx, tr.config.AsyncTaskDispatchTimeout())
5,356✔            }
19,166✔           return tr.cancelCtx, func() {}
          }
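
The hottest paths in this file, Signal (42,278 hits) and the notifyC case of getTasksPump, rely on a coalescing wake-up: notifyC has capacity 1 and Signal sends without blocking, so any burst of signals arriving while the pump is busy collapses into a single pending event. The standalone Go sketch below illustrates only that pattern; the notifier type and its method names are hypothetical and are not part of taskReader.go.

package main

import "fmt"

// notifier is a hypothetical, minimal stand-in for taskReader's notifyC channel.
type notifier struct {
        c chan struct{}
}

func newNotifier() *notifier {
        // capacity 1: at most one wake-up can be pending at a time
        return &notifier{c: make(chan struct{}, 1)}
}

// signal mirrors the shape of taskReader.Signal: a non-blocking send, so
// repeated calls while a wake-up is already pending are no-ops.
func (n *notifier) signal() {
        select {
        case n.c <- struct{}{}:
        default: // channel already has an event, don't block
        }
}

func main() {
        n := newNotifier()
        for i := 0; i < 5; i++ {
                n.signal() // five signals while the consumer is busy...
        }
        <-n.c // ...yield exactly one pending wake-up
        select {
        case <-n.c:
                fmt.Println("unexpected extra wake-up")
        default:
                fmt.Println("signals coalesced into a single wake-up")
        }
}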