uber / cadence / build 0187fc45-1fa7-4f9b-9c10-2aef2ebb54e0

08 May 2023 05:10PM UTC coverage: 57.153% (-0.07%) from 57.225%
Build type: push · CI: buildkite · Source: GitHub
Commit: Update persistence layer to adopt idl update for isolation (#5254)

204 of 204 new or added lines in 15 files covered. (100.0%)
85781 of 150089 relevant lines covered (57.15%)
2419.35 hits per line

Source File: /service/matching/taskReader.go (file coverage: 87.32%)
// Copyright (c) 2017-2020 Uber Technologies Inc.

// Portions of the Software are attributed to Copyright (c) 2020 Temporal Technologies Inc.

// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in all
// copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
// SOFTWARE.

package matching

import (
        "context"
        "runtime"
        "sync/atomic"
        "time"

        "github.com/uber/cadence/common/backoff"
        "github.com/uber/cadence/common/log"
        "github.com/uber/cadence/common/log/tag"
        "github.com/uber/cadence/common/messaging"
        "github.com/uber/cadence/common/metrics"
        "github.com/uber/cadence/common/persistence"
        "github.com/uber/cadence/common/types"
)

var epochStartTime = time.Unix(0, 0)

type (
        taskReader struct {
                taskBuffer     chan *persistence.TaskInfo // tasks loaded from persistence
                notifyC        chan struct{}              // used as a signal to notify the pump of new tasks
                tlMgr          *taskListManagerImpl
                taskListID     *taskListID
                config         *taskListConfig
                db             *taskListDB
                taskWriter     *taskWriter
                taskGC         *taskGC
                taskAckManager messaging.AckManager
                // The cancel objects are used to cancel the ratelimiter Wait in dispatchBufferedTasks. The ideal
                // approach would be a request-scoped context, unique to each call to Wait. However, cancelling
                // those on shutdown would require a new goroutine per call to wait on the shutdown channel.
                // For efficiency, we instead create a single context, store it on the struct, and call its
                // cancel function directly on shutdown.
                cancelCtx     context.Context
                cancelFunc    context.CancelFunc
                stopped       int64 // set to 1 if the reader is stopped or is shutting down
                logger        log.Logger
                scope         metrics.Scope
                throttleRetry *backoff.ThrottleRetry
                handleErr     func(error) error
                onFatalErr    func()
                dispatchTask  func(context.Context, *InternalTask) error
        }
)

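// newTaskReader wires a taskReader to its owning task list manager, sharing the
// manager's persistence handles, ack manager, and config, and sets up the
// cancellable context used to stop dispatch on shutdown.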
func newTaskReader(tlMgr *taskListManagerImpl) *taskReader {
        ctx, cancel := context.WithCancel(context.Background())
        return &taskReader{
                tlMgr:          tlMgr,
                taskListID:     tlMgr.taskListID,
                config:         tlMgr.config,
                db:             tlMgr.db,
                taskWriter:     tlMgr.taskWriter,
                taskGC:         tlMgr.taskGC,
                taskAckManager: tlMgr.taskAckManager,
                cancelCtx:      ctx,
                cancelFunc:     cancel,
                notifyC:        make(chan struct{}, 1),
                // we always dequeue the head of the buffer and try to dispatch it to a poller
                // so allocate one less than the desired target buffer size
                taskBuffer:   make(chan *persistence.TaskInfo, tlMgr.config.GetTasksBatchSize()-1),
                logger:       tlMgr.logger,
                scope:        tlMgr.scope,
                handleErr:    tlMgr.handleErr,
                onFatalErr:   tlMgr.Stop,
                dispatchTask: tlMgr.DispatchTask,
                throttleRetry: backoff.NewThrottleRetry(
                        backoff.WithRetryPolicy(persistenceOperationRetryPolicy),
                        backoff.WithRetryableError(persistence.IsTransientError),
                ),
        }
}

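// Start signals the pump once so it performs an initial read, then launches the
// dispatch and getTasks pump goroutines.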
func (tr *taskReader) Start() {
        tr.Signal()
        go tr.dispatchBufferedTasks()
        go tr.getTasksPump()
}

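// Stop is idempotent: the first call cancels the shared context, persists the
// current ack level, and triggers an immediate GC of acked tasks.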
func (tr *taskReader) Stop() {
        if atomic.CompareAndSwapInt64(&tr.stopped, 0, 1) {
                tr.cancelFunc()
                if err := tr.persistAckLevel(); err != nil {
                        tr.logger.Error("Persistent store operation failure",
                                tag.StoreOperationUpdateTaskList,
                                tag.Error(err))
                }
                tr.taskGC.RunNow(tr.taskAckManager.GetAckLevel())
        }
}

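// Signal wakes the getTasks pump. The notify channel has capacity one, so
// concurrent signals coalesce instead of blocking the caller.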
func (tr *taskReader) Signal() {
        var event struct{}
        select {
        case tr.notifyC <- event:
        default: // channel already has an event, don't block
        }
}

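// dispatchBufferedTasks drains the task buffer, handing each task to a poller.
// Dispatch is retried until it succeeds or the reader is cancelled, so tasks
// are never dropped.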
func (tr *taskReader) dispatchBufferedTasks() {
dispatchLoop:
        for {
                select {
                case taskInfo, ok := <-tr.taskBuffer:
                        if !ok { // Task list getTasks pump is shutdown
                                break dispatchLoop
                        }
                        task := newInternalTask(taskInfo, tr.completeTask, types.TaskSourceDbBacklog, "", false, nil)
                        for {
                                err := tr.dispatchTask(tr.cancelCtx, task)
                                if err == nil {
                                        break
                                }
                                if err == context.Canceled {
                                        tr.logger.Info("Tasklist manager context is cancelled, shutting down")
                                        break dispatchLoop
                                }
                                // this should never happen unless there is a bug - don't drop the task
                                tr.scope.IncCounter(metrics.BufferThrottlePerTaskListCounter)
                                tr.logger.Error("taskReader: unexpected error dispatching task", tag.Error(err))
                                runtime.Gosched()
                        }
                case <-tr.cancelCtx.Done():
                        break dispatchLoop
                }
        }
}

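// getTasksPump reads task batches from persistence whenever signalled, feeds
// them into the task buffer, and periodically persists the ack level and
// reports the backlog gauge until the context is cancelled.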
func (tr *taskReader) getTasksPump() {
        defer close(tr.taskBuffer)

        updateAckTimer := time.NewTimer(tr.config.UpdateAckInterval())
        defer updateAckTimer.Stop()
getTasksPumpLoop:
        for {
                select {
                case <-tr.cancelCtx.Done():
                        break getTasksPumpLoop
                case <-tr.notifyC:
                        {
                                tasks, readLevel, isReadBatchDone, err := tr.getTaskBatch()
                                if err != nil {
                                        tr.Signal() // re-enqueue the event
                                        // TODO: Should we ever stop retrying on db errors?
                                        continue getTasksPumpLoop
                                }

                                if len(tasks) == 0 {
                                        tr.taskAckManager.SetReadLevel(readLevel)
                                        if !isReadBatchDone {
                                                tr.Signal()
                                        }
                                        continue getTasksPumpLoop
                                }

                                if !tr.addTasksToBuffer(tasks) {
                                        break getTasksPumpLoop
                                }
                                // There may be more tasks. We yield now, but signal the pump to check again later.
                                tr.Signal()
                        }
                case <-updateAckTimer.C:
                        {
                                if err := tr.handleErr(tr.persistAckLevel()); err != nil {
                                        tr.logger.Error("Persistent store operation failure",
                                                tag.StoreOperationUpdateTaskList,
                                                tag.Error(err))
                                        // keep going, as saving the ack level is not critical
                                }
                                tr.Signal() // periodically signal the pump to check persistence for tasks
                                updateAckTimer = time.NewTimer(tr.config.UpdateAckInterval())
                        }
                }
                scope := tr.scope.Tagged(getTaskListTypeTag(tr.taskListID.taskType))
                scope.UpdateGauge(metrics.TaskBacklogPerTaskListGauge, float64(tr.taskAckManager.GetBacklogCount()))
        }
}

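// getTaskBatchWithRange reads a single batch of tasks between readLevel and
// maxReadLevel from persistence, retrying transient failures with the reader's
// throttled retry policy.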
func (tr *taskReader) getTaskBatchWithRange(readLevel int64, maxReadLevel int64) ([]*persistence.TaskInfo, error) {
        var response *persistence.GetTasksResponse
        op := func() (err error) {
                response, err = tr.db.GetTasks(readLevel, maxReadLevel, tr.config.GetTasksBatchSize())
                return
        }
        err := tr.throttleRetry.Do(context.Background(), op)
        if err != nil {
                tr.logger.Error("Persistent store operation failure",
                        tag.StoreOperationGetTasks,
                        tag.Error(err),
                        tag.WorkflowTaskListName(tr.taskListID.name),
                        tag.WorkflowTaskListType(tr.taskListID.taskType))
                return nil, err
        }
        return response.Tasks, nil
}

// getTaskBatch returns a batch of tasks from persistence, starting from the current read level.
// It also returns a new read level and a bool indicating whether the read is finished.
func (tr *taskReader) getTaskBatch() ([]*persistence.TaskInfo, int64, bool, error) {
        var tasks []*persistence.TaskInfo
        readLevel := tr.taskAckManager.GetReadLevel()
        maxReadLevel := tr.taskWriter.GetMaxReadLevel()

        // The counter i bounds the loop so the caller can check whether the task list
        // is still alive and needs to resume reading.
        for i := 0; i < 10 && readLevel < maxReadLevel; i++ {
                upper := readLevel + tr.config.RangeSize
                if upper > maxReadLevel {
                        upper = maxReadLevel
                }
                tasks, err := tr.getTaskBatchWithRange(readLevel, upper)
                if err != nil {
                        return nil, readLevel, true, err
                }
                // return as soon as we grab any tasks
                if len(tasks) > 0 {
                        return tasks, upper, true, nil
                }
                readLevel = upper
        }
        return tasks, readLevel, readLevel == maxReadLevel, nil // caller will update readLevel when no task is grabbed
}

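// isTaskExpired reports whether the task carries a real (post-epoch) expiry
// time that has already passed.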
func (tr *taskReader) isTaskExpired(t *persistence.TaskInfo, now time.Time) bool {
        return t.Expiry.After(epochStartTime) && now.After(t.Expiry)
}

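// addTasksToBuffer filters out expired tasks, advancing the read level past
// them, and enqueues the rest. It returns false if the reader is shutting down.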
func (tr *taskReader) addTasksToBuffer(tasks []*persistence.TaskInfo) bool {
        now := time.Now()
        for _, t := range tasks {
                if tr.isTaskExpired(t, now) {
                        tr.scope.IncCounter(metrics.ExpiredTasksPerTaskListCounter)
                        // Also increment readLevel for expired tasks, otherwise it could result in
                        // looping over the same tasks if all tasks read in the batch are expired
                        tr.taskAckManager.SetReadLevel(t.TaskID)
                        continue
                }
                if !tr.addSingleTaskToBuffer(t) {
                        return false // we are shutting down the task list
                }
        }
        return true
}

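// addSingleTaskToBuffer registers the task ID with the ack manager, then blocks
// until the task fits in the buffer or the reader is cancelled.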
func (tr *taskReader) addSingleTaskToBuffer(task *persistence.TaskInfo) bool {
        err := tr.taskAckManager.ReadItem(task.TaskID)
        if err != nil {
                tr.logger.Fatal("critical bug when adding item to ackManager", tag.Error(err))
        }
        for {
                select {
                case tr.taskBuffer <- task:
                        return true
                case <-tr.cancelCtx.Done():
                        return false
                }
        }
}

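// persistAckLevel writes the current ack level, if valid (non-negative), to the
// task list's persisted state and emits an estimated task-lag gauge.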
func (tr *taskReader) persistAckLevel() error {
        ackLevel := tr.taskAckManager.GetAckLevel()
        if ackLevel >= 0 {
                maxReadLevel := tr.taskWriter.GetMaxReadLevel()
                scope := tr.scope.Tagged(getTaskListTypeTag(tr.taskListID.taskType))
                // note: this metric is only an estimate of the lag, since taskIDs in the DB may
                // not be contiguous, especially when task list ownership changes.
                scope.UpdateGauge(metrics.TaskLagPerTaskListGauge, float64(maxReadLevel-ackLevel))

                return tr.db.UpdateState(ackLevel)
        }
        return nil
}

// completeTask marks a task as processed. Only tasks created by taskReader (i.e. backlog from db) reach
// here. As part of completion:
//   - task is deleted from the database when err is nil
//   - new task is created and current task is deleted when err is not nil
func (tr *taskReader) completeTask(task *persistence.TaskInfo, err error) {
        if err != nil {
                // Failed to start the task.
                // We cannot just remove it from persistence, because then it would be lost.
                // We handle this by writing the task back to persistence with a higher taskID.
                // This allows subsequent tasks to make progress, and hopefully by the time this task is
                // picked up again the underlying reason for failing to start will be resolved.
                // Note that RecordTaskStarted only fails after retrying for a long time, so a single task
                // will not be re-written to persistence frequently.
                op := func() error {
                        wf := &types.WorkflowExecution{WorkflowID: task.WorkflowID, RunID: task.RunID}
                        _, err := tr.taskWriter.appendTask(wf, task)
                        return err
                }
                err = tr.throttleRetry.Do(context.Background(), op)
                if err != nil {
                        // OK, we also failed to write to persistence.
                        // This should only happen in very extreme cases where persistence is completely down.
                        // We still can't lose the old task, so we just unload the entire task list.
                        tr.logger.Error("Failed to complete task", tag.Error(err))
                        tr.onFatalErr()
                        return
                }
                tr.Signal()
        }
        ackLevel := tr.taskAckManager.AckItem(task.TaskID)
        tr.taskGC.Run(ackLevel)
}