uber / cadence / 018ecf37-6c70-46a8-8890-f48c24c91b7e

11 Apr 2024 10:11PM UTC coverage: 67.347% (-0.01%) from 67.357%
push · buildkite · web-flow
Persist workflow request ids into Cassandra (#5826)

285 of 484 new or added lines in 7 files covered (58.88%).
33 existing lines in 13 files now uncovered.
98455 of 146191 relevant lines covered (67.35%).
2396.87 hits per line.

Source File
77.62% covered: /service/history/queue/timer_queue_processor_base.go

// Copyright (c) 2017-2020 Uber Technologies Inc.

// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:

// The above copyright notice and this permission notice shall be included in all
// copies or substantial portions of the Software.

// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
// SOFTWARE.

package queue

import (
        "context"
        "fmt"
        "math"
        "math/rand"
        "sync"
        "sync/atomic"
        "time"

        "github.com/uber/cadence/common"
        "github.com/uber/cadence/common/backoff"
        "github.com/uber/cadence/common/dynamicconfig"
        "github.com/uber/cadence/common/log"
        "github.com/uber/cadence/common/log/tag"
        "github.com/uber/cadence/common/metrics"
        "github.com/uber/cadence/common/persistence"
        "github.com/uber/cadence/service/history/config"
        "github.com/uber/cadence/service/history/shard"
        "github.com/uber/cadence/service/history/task"
)

var (
        maximumTimerTaskKey = newTimerTaskKey(
                time.Unix(0, math.MaxInt64),
                0,
        )
)

type (
        timerTaskKey struct {
                visibilityTimestamp time.Time
                taskID              int64
        }

        timeTaskReadProgress struct {
                currentQueue  ProcessingQueue
                readLevel     task.Key
                maxReadLevel  task.Key
                nextPageToken []byte
        }

        timerQueueProcessorBase struct {
                *processorBase

                taskInitializer task.Initializer
                clusterName     string
                pollTimeLock    sync.Mutex
                backoffTimer    map[int]*time.Timer
                nextPollTime    map[int]time.Time
                timerGate       TimerGate

                // timer notification
                newTimerCh  chan struct{}
                newTimeLock sync.Mutex
                newTime     time.Time

                processingQueueReadProgress map[int]timeTaskReadProgress

                updateAckLevelFn                 func() (bool, task.Key, error)
                splitProcessingQueueCollectionFn func(splitPolicy ProcessingQueueSplitPolicy, upsertPollTimeFn func(int, time.Time))
        }

        filteredTimerTasksResponse struct {
                timerTasks    []*persistence.TimerTaskInfo
                lookAheadTask *persistence.TimerTaskInfo
                nextPageToken []byte
        }
)

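// newTimerQueueProcessorBase wires a processorBase together with timer-specific state:
// a task initializer for the given queue type (active or standby timer), per-level poll
// times and backoff timers, and the timer gate used to wake the processor pump.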
func newTimerQueueProcessorBase(
        clusterName string,
        shard shard.Context,
        processingQueueStates []ProcessingQueueState,
        taskProcessor task.Processor,
        timerGate TimerGate,
        options *queueProcessorOptions,
        updateMaxReadLevel updateMaxReadLevelFn,
        updateClusterAckLevel updateClusterAckLevelFn,
        updateProcessingQueueStates updateProcessingQueueStatesFn,
        queueShutdown queueShutdownFn,
        taskFilter task.Filter,
        taskExecutor task.Executor,
        logger log.Logger,
        metricsClient metrics.Client,
) *timerQueueProcessorBase {
        processorBase := newProcessorBase(
                shard,
                processingQueueStates,
                taskProcessor,
                options,
                updateMaxReadLevel,
                updateClusterAckLevel,
                updateProcessingQueueStates,
                queueShutdown,
                logger.WithTags(tag.ComponentTimerQueue),
                metricsClient,
        )

        queueType := task.QueueTypeActiveTimer
        if options.MetricScope == metrics.TimerStandbyQueueProcessorScope {
                queueType = task.QueueTypeStandbyTimer
        }

        t := &timerQueueProcessorBase{
                processorBase: processorBase,
                taskInitializer: func(taskInfo task.Info) task.Task {
                        return task.NewTimerTask(
                                shard,
                                taskInfo,
                                queueType,
                                task.InitializeLoggerForTask(shard.GetShardID(), taskInfo, logger),
                                taskFilter,
                                taskExecutor,
                                taskProcessor,
                                processorBase.redispatcher.AddTask,
                                shard.GetConfig().TaskCriticalRetryCount,
                        )
                },
                clusterName:                 clusterName,
                backoffTimer:                make(map[int]*time.Timer),
                nextPollTime:                make(map[int]time.Time),
                timerGate:                   timerGate,
                newTimerCh:                  make(chan struct{}, 1),
                processingQueueReadProgress: make(map[int]timeTaskReadProgress),
        }

        t.updateAckLevelFn = t.updateAckLevel
        t.splitProcessingQueueCollectionFn = t.splitProcessingQueueCollection
        return t
}

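// Start transitions the processor to the started state, starts the redispatcher,
// seeds an initial poll time (optionally jittered) for every processing queue
// collection, and launches the processor pump goroutine.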
func (t *timerQueueProcessorBase) Start() {
        if !atomic.CompareAndSwapInt32(&t.status, common.DaemonStatusInitialized, common.DaemonStatusStarted) {
                return
        }

        t.logger.Info("Timer queue processor state changed", tag.LifeCycleStarting)
        defer t.logger.Info("Timer queue processor state changed", tag.LifeCycleStarted)

        t.redispatcher.Start()

        newPollTime := time.Time{}
        if startJitter := t.options.MaxStartJitterInterval(); startJitter > 0 {
                now := t.shard.GetTimeSource().Now()
                newPollTime = now.Add(time.Duration(rand.Int63n(int64(startJitter))))
        }
        for _, queueCollections := range t.processingQueueCollections {
                t.upsertPollTime(queueCollections.Level(), newPollTime)
        }

        t.shutdownWG.Add(1)
        go t.processorPump()
}

// Edge Case: Stop doesn't stop the TimerGate if timerQueueProcessorBase is only initialized without being started.
// As a result, the TimerGate needs to be stopped separately.
// One way to fix this is to make sure the TimerGate doesn't start its daemon loop on initialization and requires an explicit Start.
func (t *timerQueueProcessorBase) Stop() {
        if !atomic.CompareAndSwapInt32(&t.status, common.DaemonStatusStarted, common.DaemonStatusStopped) {
                return
        }

        t.logger.Info("Timer queue processor state changed", tag.LifeCycleStopping)
        defer t.logger.Info("Timer queue processor state changed", tag.LifeCycleStopped)

        t.timerGate.Close()
        close(t.shutdownCh)
        t.pollTimeLock.Lock()
        for _, timer := range t.backoffTimer {
                timer.Stop()
        }
        t.pollTimeLock.Unlock()

        if success := common.AwaitWaitGroup(&t.shutdownWG, gracefulShutdownTimeout); !success {
                t.logger.Warn("timerQueueProcessorBase timed out on shut down", tag.LifeCycleStopTimedout)
        }

        t.redispatcher.Stop()
}

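// processorPump is the main event loop: it reacts to timer gate fires, ack-level
// update ticks, queue split ticks, new-timer notifications, and action
// notifications until shutdown is signaled.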
func (t *timerQueueProcessorBase) processorPump() {
        defer t.shutdownWG.Done()
        updateAckTimer := time.NewTimer(backoff.JitDuration(t.options.UpdateAckInterval(), t.options.UpdateAckIntervalJitterCoefficient()))
        defer updateAckTimer.Stop()
        splitQueueTimer := time.NewTimer(backoff.JitDuration(t.options.SplitQueueInterval(), t.options.SplitQueueIntervalJitterCoefficient()))
        defer splitQueueTimer.Stop()

        for {
                select {
                case <-t.shutdownCh:
                        return
                case <-t.timerGate.FireChan():
                        t.updateTimerGates()
                case <-updateAckTimer.C:
                        if stopPump := t.handleAckLevelUpdate(updateAckTimer); stopPump {
                                if !t.options.EnableGracefulSyncShutdown() {
                                        go t.Stop()
                                        return
                                }

                                t.Stop()
                                return
                        }
                case <-t.newTimerCh:
                        t.handleNewTimer()
                case <-splitQueueTimer.C:
                        t.splitQueue(splitQueueTimer)
                case notification := <-t.actionNotifyCh:
                        t.handleActionNotification(notification)
                }
        }
}

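// processQueueCollections loads and submits timer tasks for every processing queue
// collection whose level is present in the levels set. It honors the rate limiter,
// tracks read progress when a page is only partially consumed, and schedules the
// next poll from either the look-ahead task or the remaining pagination state.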
func (t *timerQueueProcessorBase) processQueueCollections(levels map[int]struct{}) {
        for _, queueCollection := range t.processingQueueCollections {
                level := queueCollection.Level()
                if _, ok := levels[level]; !ok {
                        continue
                }

                activeQueue := queueCollection.ActiveQueue()
                if activeQueue == nil {
                        // processing for this queue collection has finished.
                        // it's possible that a new queue will be added to this collection later though;
                        // pollTime will be updated after split/merge
                        t.logger.Debug("Active queue is nil for timer queue at this level", tag.QueueLevel(level))
                        continue
                }

                t.upsertPollTime(level, t.shard.GetCurrentTime(t.clusterName).Add(backoff.JitDuration(
                        t.options.MaxPollInterval(),
                        t.options.MaxPollIntervalJitterCoefficient(),
                )))

                var nextPageToken []byte
                readLevel := activeQueue.State().ReadLevel()
                maxReadLevel := minTaskKey(activeQueue.State().MaxLevel(), t.updateMaxReadLevel())
                domainFilter := activeQueue.State().DomainFilter()

                if progress, ok := t.processingQueueReadProgress[level]; ok {
                        if progress.currentQueue == activeQueue {
                                readLevel = progress.readLevel
                                maxReadLevel = progress.maxReadLevel
                                nextPageToken = progress.nextPageToken
                        }
                        delete(t.processingQueueReadProgress, level)
                }

                if !readLevel.Less(maxReadLevel) {
                        // notify timer gate about the min time
                        t.upsertPollTime(level, readLevel.(timerTaskKey).visibilityTimestamp)
                        t.logger.Debug("Skipping processing timer queue at this level because readLevel >= maxReadLevel", tag.QueueLevel(level))
                        continue
                }

                ctx, cancel := context.WithTimeout(context.Background(), loadQueueTaskThrottleRetryDelay)
                if err := t.rateLimiter.Wait(ctx); err != nil {
                        cancel()
                        if level == defaultProcessingQueueLevel {
                                t.upsertPollTime(level, time.Time{})
                        } else {
                                t.setupBackoffTimer(level)
                        }
                        continue
                }
                cancel()

                resp, err := t.readAndFilterTasks(readLevel, maxReadLevel, nextPageToken)
                if err != nil {
                        t.logger.Error("Processor unable to retrieve tasks", tag.Error(err))
                        t.upsertPollTime(level, time.Time{}) // re-enqueue the event
                        continue
                }

                tasks := make(map[task.Key]task.Task)
                taskChFull := false
                submittedCount := 0
                for _, taskInfo := range resp.timerTasks {
                        if !domainFilter.Filter(taskInfo.GetDomainID()) {
                                continue
                        }

                        task := t.taskInitializer(taskInfo)
                        tasks[newTimerTaskKey(taskInfo.GetVisibilityTimestamp(), taskInfo.GetTaskID())] = task
                        submitted, err := t.submitTask(task)
                        if err != nil {
                                // the only error here means the processor has been shut down,
                                // so return instead of continue
                                return
                        }
                        taskChFull = taskChFull || !submitted
                        if submitted {
                                submittedCount++
                        }
                }
                t.logger.Debugf("Submitted %d timer tasks successfully out of %d tasks", submittedCount, len(resp.timerTasks))

                var newReadLevel task.Key
                if len(resp.nextPageToken) == 0 {
                        newReadLevel = maxReadLevel
                        if resp.lookAheadTask != nil {
                                // lookAheadTask may exist only when nextPageToken is empty.
                                // notice that lookAheadTask.VisibilityTimestamp may be larger than the shard max read level,
                                // which means new tasks can be generated before that timestamp. This issue is solved by
                                // upsertPollTime whenever there are new tasks
                                lookAheadTimestamp := resp.lookAheadTask.GetVisibilityTimestamp()
                                t.upsertPollTime(level, lookAheadTimestamp)
                                newReadLevel = minTaskKey(newReadLevel, newTimerTaskKey(lookAheadTimestamp, 0))
                                t.logger.Debugf("nextPageToken is empty for timer queue at level %d so setting newReadLevel to max(lookAheadTask.timestamp: %v, maxReadLevel: %v)", level, lookAheadTimestamp, maxReadLevel)
                        } else {
                                // otherwise we have no idea when the next poll should happen.
                                // rely on notifyNewTask to trigger the next poll even for non-default queues.
                                // another option for non-default queues is to set up a backoff timer to check back later
                                t.logger.Debugf("nextPageToken is empty for timer queue at level %d and there's no lookAheadTask. setting readLevel to maxReadLevel: %v", level, maxReadLevel)
                        }
                } else {
                        // more tasks should be loaded for this processing queue;
                        // record the current progress and update the poll time
                        if level == defaultProcessingQueueLevel || !taskChFull {
                                t.logger.Debugf("upserting poll time for timer queue at level %d because nextPageToken is not empty and !taskChFull", level)
                                t.upsertPollTime(level, time.Time{})
                        } else {
                                t.logger.Debugf("setting up backoff timer for timer queue at level %d because nextPageToken is not empty and taskChFull", level)
                                t.setupBackoffTimer(level)
                        }
                        t.processingQueueReadProgress[level] = timeTaskReadProgress{
                                currentQueue:  activeQueue,
                                readLevel:     readLevel,
                                maxReadLevel:  maxReadLevel,
                                nextPageToken: resp.nextPageToken,
                        }
                        if len(resp.timerTasks) > 0 {
                                newReadLevel = newTimerTaskKey(resp.timerTasks[len(resp.timerTasks)-1].GetVisibilityTimestamp(), 0)
                        }
                }
                queueCollection.AddTasks(tasks, newReadLevel)
        }
}

// splitQueue splits the processing queue collection based on the configured split policy
// and resets the split timer with jitter for the next run
func (t *timerQueueProcessorBase) splitQueue(splitQueueTimer *time.Timer) {
        splitPolicy := t.initializeSplitPolicy(
                func(key task.Key, domainID string) task.Key {
                        return newTimerTaskKey(
                                key.(timerTaskKey).visibilityTimestamp.Add(
                                        t.options.SplitLookAheadDurationByDomainID(domainID),
                                ),
                                0,
                        )
                },
        )

        t.splitProcessingQueueCollectionFn(splitPolicy, t.upsertPollTime)

        splitQueueTimer.Reset(backoff.JitDuration(
                t.options.SplitQueueInterval(),
                t.options.SplitQueueIntervalJitterCoefficient(),
        ))
}

// handleAckLevelUpdate updates the ack level and resets the timer with jitter.
// It returns true if processing should be terminated.
func (t *timerQueueProcessorBase) handleAckLevelUpdate(updateAckTimer *time.Timer) bool {
        processFinished, _, err := t.updateAckLevelFn()
        if err == shard.ErrShardClosed || (err == nil && processFinished) {
                return true
        }
        updateAckTimer.Reset(backoff.JitDuration(
                t.options.UpdateAckInterval(),
                t.options.UpdateAckIntervalJitterCoefficient(),
        ))
        return false
}

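// updateTimerGates is called when the timer gate fires. It redispatches (and, if
// still overloaded, backs off) when the redispatch queue is too large; otherwise it
// collects the levels whose poll time has been reached and processes those queue
// collections, re-arming the timer gate for levels that are not yet due.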
func (t *timerQueueProcessorBase) updateTimerGates() {
        maxRedispatchQueueSize := t.options.MaxRedispatchQueueSize()
        if t.redispatcher.Size() > maxRedispatchQueueSize {
                t.redispatcher.Redispatch(maxRedispatchQueueSize)
                if t.redispatcher.Size() > maxRedispatchQueueSize {
                        // if the redispatcher still has a large number of tasks
                        // (which only happens when the system is under very high load)
                        // we should back off here instead of continuing to submit tasks to the task processor.
                        // don't call t.timerGate.Update(time.Now() + loadQueueTaskThrottleRetryDelay) as the time in
                        // the standby timer processor is not real time and is managed separately
                        time.Sleep(backoff.JitDuration(
                                t.options.PollBackoffInterval(),
                                t.options.PollBackoffIntervalJitterCoefficient(),
                        ))
                }
                t.timerGate.Update(time.Time{})
                return
        }

        t.pollTimeLock.Lock()
        levels := make(map[int]struct{})
        now := t.shard.GetCurrentTime(t.clusterName)
        for level, pollTime := range t.nextPollTime {
                if !now.Before(pollTime) {
                        levels[level] = struct{}{}
                        delete(t.nextPollTime, level)
                } else {
                        t.timerGate.Update(pollTime)
                }
        }
        t.pollTimeLock.Unlock()

        t.processQueueCollections(levels)
}

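// handleNewTimer drains the pending new-timer time and updates the poll time for
// every queue collection so that newly written timer tasks are picked up promptly.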
func (t *timerQueueProcessorBase) handleNewTimer() {
        t.newTimeLock.Lock()
        newTime := t.newTime
        t.newTime = time.Time{}
        t.newTimeLock.Unlock()

        // A new timer has arrived.
        t.metricsScope.IncCounter(metrics.NewTimerNotifyCounter)
        // notify all queue collections, as they wait for this notification when there are
        // no more tasks to process. If non-default queues switch to periodic polling
        // in the future, they will no longer need to be notified.
        for _, queueCollection := range t.processingQueueCollections {
                t.upsertPollTime(queueCollection.Level(), newTime)
        }
}

func (t *timerQueueProcessorBase) handleActionNotification(notification actionNotification) {
        t.processorBase.handleActionNotification(notification, func() {
                switch notification.action.ActionType {
                case ActionTypeReset:
                        t.upsertPollTime(defaultProcessingQueueLevel, time.Time{})
                }
        })
}

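// readAndFilterTasks reads a batch of timer tasks between readLevel and maxReadLevel,
// keeps only the tasks that are ready to be processed now, and returns the first
// not-yet-ready task (if any) as the look-ahead task for scheduling the next poll.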
func (t *timerQueueProcessorBase) readAndFilterTasks(readLevel, maxReadLevel task.Key, nextPageToken []byte) (*filteredTimerTasksResponse, error) {
        resp, err := t.getTimerTasks(readLevel, maxReadLevel, nextPageToken, t.options.BatchSize())
        if err != nil {
                return nil, err
        }

        var lookAheadTask *persistence.TimerTaskInfo
        filteredTasks := []*persistence.TimerTaskInfo{}
        for _, timerTask := range resp.Timers {
                if !t.isProcessNow(timerTask.GetVisibilityTimestamp()) {
                        // found the first task that is not ready to be processed yet.
                        // reset NextPageToken so we can load more tasks starting from this lookAheadTask next time.
                        lookAheadTask = timerTask
                        resp.NextPageToken = nil
                        break
                }
                filteredTasks = append(filteredTasks, timerTask)
        }

        if len(resp.NextPageToken) == 0 && lookAheadTask == nil {
                // only look ahead within the processing queue boundary
                lookAheadTask, err = t.readLookAheadTask(maxReadLevel, maximumTimerTaskKey)
                if err != nil {
                        // we don't know whether a look-ahead task exists, but we know that if it does,
                        // its visibility timestamp is larger than or equal to maxReadLevel.
                        // so, create a fake look-ahead task so another load can be triggered at that time.
                        lookAheadTask = &persistence.TimerTaskInfo{
                                VisibilityTimestamp: maxReadLevel.(timerTaskKey).visibilityTimestamp,
                        }
                }
        }

        t.logger.Debugf("readAndFilterTasks returning %d tasks and lookAheadTask: %#v for readLevel: %#v, maxReadLevel: %#v", len(filteredTasks), lookAheadTask, readLevel, maxReadLevel)
        return &filteredTimerTasksResponse{
                timerTasks:    filteredTasks,
                lookAheadTask: lookAheadTask,
                nextPageToken: resp.NextPageToken,
        }, nil
}

func (t *timerQueueProcessorBase) readLookAheadTask(lookAheadStartLevel task.Key, lookAheadMaxLevel task.Key) (*persistence.TimerTaskInfo, error) {
        resp, err := t.getTimerTasks(lookAheadStartLevel, lookAheadMaxLevel, nil, 1)
        if err != nil {
                return nil, err
        }

        if len(resp.Timers) == 1 {
                return resp.Timers[0], nil
        }
        return nil, nil
}

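// getTimerTasks fetches timer tasks from the execution manager with a bounded number
// of retries and a linearly increasing backoff between attempts.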
func (t *timerQueueProcessorBase) getTimerTasks(readLevel, maxReadLevel task.Key, nextPageToken []byte, batchSize int) (*persistence.GetTimerIndexTasksResponse, error) {
        request := &persistence.GetTimerIndexTasksRequest{
                MinTimestamp:  readLevel.(timerTaskKey).visibilityTimestamp,
                MaxTimestamp:  maxReadLevel.(timerTaskKey).visibilityTimestamp,
                BatchSize:     batchSize,
                NextPageToken: nextPageToken,
        }

        var err error
        var response *persistence.GetTimerIndexTasksResponse
        retryCount := t.shard.GetConfig().TimerProcessorGetFailureRetryCount()
        for attempt := 0; attempt < retryCount; attempt++ {
                response, err = t.shard.GetExecutionManager().GetTimerIndexTasks(context.Background(), request)
                if err == nil {
                        return response, nil
                }
                backoff := time.Duration(attempt*100) * time.Millisecond
                t.logger.Debugf("Failed to get timer tasks from execution manager. error: %v, attempt: %d, retryCount: %d, backoff: %v", err, attempt, retryCount, backoff)
                time.Sleep(backoff)
        }
        return nil, err
}

func (t *timerQueueProcessorBase) isProcessNow(expiryTime time.Time) bool {
        if expiryTime.IsZero() {
                // return true, but there is most likely a bug somewhere that created a timer task with an empty timestamp.
                t.logger.Warn("Timer task has timestamp zero")
        }
        return !t.shard.GetCurrentTime(t.clusterName).Before(expiryTime)
}

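// notifyNewTimers records per-task-type metrics for newly generated timer tasks and
// notifies the processor with the earliest visibility timestamp among them.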
func (t *timerQueueProcessorBase) notifyNewTimers(timerTasks []persistence.Task) {
        if len(timerTasks) == 0 {
                return
        }

        isActive := t.options.MetricScope == metrics.TimerActiveQueueProcessorScope
        minNewTime := timerTasks[0].GetVisibilityTimestamp()
        shardIDTag := metrics.ShardIDTag(t.shard.GetShardID())
        for _, timerTask := range timerTasks {
                ts := timerTask.GetVisibilityTimestamp()
                if ts.Before(minNewTime) {
                        minNewTime = ts
                }

                taskScopeIdx := task.GetTimerTaskMetricScope(
                        timerTask.GetType(),
                        isActive,
                )
                t.metricsClient.Scope(taskScopeIdx).Tagged(shardIDTag).IncCounter(metrics.NewTimerNotifyCounter)
        }

        t.notifyNewTimer(minNewTime)
}

func (t *timerQueueProcessorBase) notifyNewTimer(newTime time.Time) {
        t.newTimeLock.Lock()
        defer t.newTimeLock.Unlock()

        if t.newTime.IsZero() || newTime.Before(t.newTime) {
                t.logger.Debugf("Updating newTime from %v to %v", t.newTime, newTime)
                t.newTime = newTime
                select {
                case t.newTimerCh <- struct{}{}:
                        // notified about the new time
                default:
                        // channel is "full" -> drop and move on; this happens only when the service is under high load
                }
        }
}

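// upsertPollTime moves the poll time for the given level earlier if the new time
// precedes the currently scheduled one (or none is scheduled), and updates the timer
// gate accordingly. An existing backoff timer for the level takes precedence.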
func (t *timerQueueProcessorBase) upsertPollTime(level int, newPollTime time.Time) {
        t.pollTimeLock.Lock()
        defer t.pollTimeLock.Unlock()

        if _, ok := t.backoffTimer[level]; ok {
                // honor the existing backoff timer
                t.logger.Debugf("Skipping upsertPollTime for timer queue at level %d because there's a backoff timer", level)
                return
        }

        currentPollTime, ok := t.nextPollTime[level]
        if !ok || newPollTime.Before(currentPollTime) {
                t.logger.Debugf("Updating poll timer for timer queue at level %d. CurrentPollTime: %v, newPollTime: %v", level, currentPollTime, newPollTime)
                t.nextPollTime[level] = newPollTime
                t.timerGate.Update(newPollTime)
                return
        }

        t.logger.Debugf("Skipping upsertPollTime for level %d because currentPollTime %v is before newPollTime %v", level, currentPollTime, newPollTime)
}

// setupBackoffTimer will trigger a poll for the specified processing queue collection
// after a certain period of (real) time. This means that for the standby timer, even if the cluster time
// has not been updated, the poll will still be triggered when the timer fires. Use this function
// for delaying the load of a processing queue. If a poll should be triggered immediately,
// use upsertPollTime.
func (t *timerQueueProcessorBase) setupBackoffTimer(level int) {
        t.pollTimeLock.Lock()
        defer t.pollTimeLock.Unlock()

        if _, ok := t.backoffTimer[level]; ok {
                // honor the existing backoff timer
                return
        }

        t.metricsScope.IncCounter(metrics.ProcessingQueueThrottledCounter)
        t.logger.Info("Throttled processing queue", tag.QueueLevel(level))
        backoffDuration := backoff.JitDuration(
                t.options.PollBackoffInterval(),
                t.options.PollBackoffIntervalJitterCoefficient(),
        )
        t.backoffTimer[level] = time.AfterFunc(backoffDuration, func() {
                select {
                case <-t.shutdownCh:
                        return
                default:
                }

                t.pollTimeLock.Lock()
                defer t.pollTimeLock.Unlock()

                t.nextPollTime[level] = time.Time{}
                t.timerGate.Update(time.Time{})
                delete(t.backoffTimer, level)
        })
}

func newTimerTaskKey(visibilityTimestamp time.Time, taskID int64) task.Key {
        return timerTaskKey{
                visibilityTimestamp: visibilityTimestamp,
                taskID:              taskID,
        }
}

func (k timerTaskKey) Less(
        key task.Key,
) bool {
        timerKey := key.(timerTaskKey)
        if k.visibilityTimestamp.Equal(timerKey.visibilityTimestamp) {
                return k.taskID < timerKey.taskID
        }
        return k.visibilityTimestamp.Before(timerKey.visibilityTimestamp)
}

func (k timerTaskKey) String() string {
        return fmt.Sprintf("{visibilityTimestamp: %v, taskID: %v}", k.visibilityTimestamp, k.taskID)
}

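// newTimerQueueProcessorOptions builds queueProcessorOptions from dynamic config for
// active, standby, and failover timer queue processors; failover processors disable
// queue splitting and queue state persistence.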
func newTimerQueueProcessorOptions(
        config *config.Config,
        isActive bool,
        isFailover bool,
) *queueProcessorOptions {
        options := &queueProcessorOptions{
                BatchSize:                            config.TimerTaskBatchSize,
                DeleteBatchSize:                      config.TimerTaskDeleteBatchSize,
                MaxPollRPS:                           config.TimerProcessorMaxPollRPS,
                MaxPollInterval:                      config.TimerProcessorMaxPollInterval,
                MaxPollIntervalJitterCoefficient:     config.TimerProcessorMaxPollIntervalJitterCoefficient,
                UpdateAckInterval:                    config.TimerProcessorUpdateAckInterval,
                UpdateAckIntervalJitterCoefficient:   config.TimerProcessorUpdateAckIntervalJitterCoefficient,
                RedispatchIntervalJitterCoefficient:  config.TaskRedispatchIntervalJitterCoefficient,
                MaxRedispatchQueueSize:               config.TimerProcessorMaxRedispatchQueueSize,
                SplitQueueInterval:                   config.TimerProcessorSplitQueueInterval,
                SplitQueueIntervalJitterCoefficient:  config.TimerProcessorSplitQueueIntervalJitterCoefficient,
                PollBackoffInterval:                  config.QueueProcessorPollBackoffInterval,
                PollBackoffIntervalJitterCoefficient: config.QueueProcessorPollBackoffIntervalJitterCoefficient,
                EnableGracefulSyncShutdown:           config.QueueProcessorEnableGracefulSyncShutdown,
        }

        if isFailover {
                // disable queue split for failover processor
                options.EnableSplit = dynamicconfig.GetBoolPropertyFn(false)

                // disable persist and load processing queue states for failover processor as it will never be split
                options.EnablePersistQueueStates = dynamicconfig.GetBoolPropertyFn(false)
                options.EnableLoadQueueStates = dynamicconfig.GetBoolPropertyFn(false)

                options.MaxStartJitterInterval = config.TimerProcessorFailoverMaxStartJitterInterval
        } else {
                options.EnableSplit = config.QueueProcessorEnableSplit
                options.SplitMaxLevel = config.QueueProcessorSplitMaxLevel
                options.EnableRandomSplitByDomainID = config.QueueProcessorEnableRandomSplitByDomainID
                options.RandomSplitProbability = config.QueueProcessorRandomSplitProbability
                options.EnablePendingTaskSplitByDomainID = config.QueueProcessorEnablePendingTaskSplitByDomainID
                options.PendingTaskSplitThreshold = config.QueueProcessorPendingTaskSplitThreshold
                options.EnableStuckTaskSplitByDomainID = config.QueueProcessorEnableStuckTaskSplitByDomainID
                options.StuckTaskSplitThreshold = config.QueueProcessorStuckTaskSplitThreshold
                options.SplitLookAheadDurationByDomainID = config.QueueProcessorSplitLookAheadDurationByDomainID

                options.EnablePersistQueueStates = config.QueueProcessorEnablePersistQueueStates
                options.EnableLoadQueueStates = config.QueueProcessorEnableLoadQueueStates

                options.MaxStartJitterInterval = dynamicconfig.GetDurationPropertyFn(0)
        }

        if isActive {
                options.MetricScope = metrics.TimerActiveQueueProcessorScope
                options.RedispatchInterval = config.ActiveTaskRedispatchInterval
        } else {
                options.MetricScope = metrics.TimerStandbyQueueProcessorScope
                options.RedispatchInterval = config.StandbyTaskRedispatchInterval
        }

        return options
}