uber / cadence · build 018e9bcd-d2d9-47d4-85d6-f340468831ee

01 Apr 2024 10:35PM UTC · coverage: 65.306% (+0.03%) from 65.275%
push · buildkite · web-flow

mock object and basic test cases for visibility single manager (#5829)

* mock object and basic test cases for visibility single manager
* forgot to add the new mock file

95565 of 146334 relevant lines covered (65.31%)
2340.15 hits per line

Source File: /service/history/queue/timer_queue_processor_base.go (77.82% covered)
// Copyright (c) 2017-2020 Uber Technologies Inc.

// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:

// The above copyright notice and this permission notice shall be included in all
// copies or substantial portions of the Software.

// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
// SOFTWARE.

package queue

import (
	"context"
	"fmt"
	"math"
	"math/rand"
	"sync"
	"sync/atomic"
	"time"

	"github.com/uber/cadence/common"
	"github.com/uber/cadence/common/backoff"
	"github.com/uber/cadence/common/dynamicconfig"
	"github.com/uber/cadence/common/log"
	"github.com/uber/cadence/common/log/tag"
	"github.com/uber/cadence/common/metrics"
	"github.com/uber/cadence/common/persistence"
	"github.com/uber/cadence/service/history/config"
	"github.com/uber/cadence/service/history/shard"
	"github.com/uber/cadence/service/history/task"
)

var (
	maximumTimerTaskKey = newTimerTaskKey(
		time.Unix(0, math.MaxInt64),
		0,
	)
)

type (
	timerTaskKey struct {
		visibilityTimestamp time.Time
		taskID              int64
	}

	timeTaskReadProgress struct {
		currentQueue  ProcessingQueue
		readLevel     task.Key
		maxReadLevel  task.Key
		nextPageToken []byte
	}

	timerQueueProcessorBase struct {
		*processorBase

		taskInitializer task.Initializer
		clusterName     string
		pollTimeLock    sync.Mutex
		backoffTimer    map[int]*time.Timer
		nextPollTime    map[int]time.Time
		timerGate       TimerGate

		// timer notification
		newTimerCh  chan struct{}
		newTimeLock sync.Mutex
		newTime     time.Time

		processingQueueReadProgress map[int]timeTaskReadProgress

		updateAckLevelFn                 func() (bool, task.Key, error)
		splitProcessingQueueCollectionFn func(splitPolicy ProcessingQueueSplitPolicy, upsertPollTimeFn func(int, time.Time))
	}

	filteredTimerTasksResponse struct {
		timerTasks    []*persistence.TimerTaskInfo
		lookAheadTask *persistence.TimerTaskInfo
		nextPageToken []byte
	}
)

func newTimerQueueProcessorBase(
	clusterName string,
	shard shard.Context,
	processingQueueStates []ProcessingQueueState,
	taskProcessor task.Processor,
	timerGate TimerGate,
	options *queueProcessorOptions,
	updateMaxReadLevel updateMaxReadLevelFn,
	updateClusterAckLevel updateClusterAckLevelFn,
	updateProcessingQueueStates updateProcessingQueueStatesFn,
	queueShutdown queueShutdownFn,
	taskFilter task.Filter,
	taskExecutor task.Executor,
	logger log.Logger,
	metricsClient metrics.Client,
) *timerQueueProcessorBase {
	processorBase := newProcessorBase(
		shard,
		processingQueueStates,
		taskProcessor,
		options,
		updateMaxReadLevel,
		updateClusterAckLevel,
		updateProcessingQueueStates,
		queueShutdown,
		logger.WithTags(tag.ComponentTimerQueue),
		metricsClient,
	)

	queueType := task.QueueTypeActiveTimer
	if options.MetricScope == metrics.TimerStandbyQueueProcessorScope {
		queueType = task.QueueTypeStandbyTimer
	}

	t := &timerQueueProcessorBase{
		processorBase: processorBase,
		taskInitializer: func(taskInfo task.Info) task.Task {
			return task.NewTimerTask(
				shard,
				taskInfo,
				queueType,
				task.InitializeLoggerForTask(shard.GetShardID(), taskInfo, logger),
				taskFilter,
				taskExecutor,
				taskProcessor,
				processorBase.redispatcher.AddTask,
				shard.GetConfig().TaskCriticalRetryCount,
			)
		},
		clusterName:                 clusterName,
		backoffTimer:                make(map[int]*time.Timer),
		nextPollTime:                make(map[int]time.Time),
		timerGate:                   timerGate,
		newTimerCh:                  make(chan struct{}, 1),
		processingQueueReadProgress: make(map[int]timeTaskReadProgress),
	}

	t.updateAckLevelFn = t.updateAckLevel
	t.splitProcessingQueueCollectionFn = t.splitProcessingQueueCollection
	return t
}

func (t *timerQueueProcessorBase) Start() {
	if !atomic.CompareAndSwapInt32(&t.status, common.DaemonStatusInitialized, common.DaemonStatusStarted) {
		return
	}

	t.logger.Info("Timer queue processor state changed", tag.LifeCycleStarting)
	defer t.logger.Info("Timer queue processor state changed", tag.LifeCycleStarted)

	t.redispatcher.Start()

	newPollTime := time.Time{}
	if startJitter := t.options.MaxStartJitterInterval(); startJitter > 0 {
		now := t.shard.GetTimeSource().Now()
		newPollTime = now.Add(time.Duration(rand.Int63n(int64(startJitter))))
	}
	for _, queueCollections := range t.processingQueueCollections {
		t.upsertPollTime(queueCollections.Level(), newPollTime)
	}

	t.shutdownWG.Add(1)
	go t.processorPump()
}

func (t *timerQueueProcessorBase) Stop() {
	if !atomic.CompareAndSwapInt32(&t.status, common.DaemonStatusStarted, common.DaemonStatusStopped) {
		return
	}

	t.logger.Info("Timer queue processor state changed", tag.LifeCycleStopping)
	defer t.logger.Info("Timer queue processor state changed", tag.LifeCycleStopped)

	t.timerGate.Close()
	close(t.shutdownCh)
	t.pollTimeLock.Lock()
	for _, timer := range t.backoffTimer {
		timer.Stop()
	}
	t.pollTimeLock.Unlock()

	if success := common.AwaitWaitGroup(&t.shutdownWG, gracefulShutdownTimeout); !success {
		t.logger.Warn("timerQueueProcessorBase timed out on shut down", tag.LifeCycleStopTimedout)
	}

	t.redispatcher.Stop()
}

func (t *timerQueueProcessorBase) processorPump() {
	defer t.shutdownWG.Done()
	updateAckTimer := time.NewTimer(backoff.JitDuration(t.options.UpdateAckInterval(), t.options.UpdateAckIntervalJitterCoefficient()))
	defer updateAckTimer.Stop()
	splitQueueTimer := time.NewTimer(backoff.JitDuration(t.options.SplitQueueInterval(), t.options.SplitQueueIntervalJitterCoefficient()))
	defer splitQueueTimer.Stop()

	for {
		select {
		case <-t.shutdownCh:
			return
		case <-t.timerGate.FireChan():
			t.updateTimerGates()
		case <-updateAckTimer.C:
			if stopPump := t.handleAckLevelUpdate(updateAckTimer); stopPump {
				if !t.options.EnableGracefulSyncShutdown() {
					go t.Stop()
					return
				}

				t.Stop()
				return
			}
		case <-t.newTimerCh:
			t.handleNewTimer()
		case <-splitQueueTimer.C:
			t.splitQueue(splitQueueTimer)
		case notification := <-t.actionNotifyCh:
			t.handleActionNotification(notification)
		}
	}
}

func (t *timerQueueProcessorBase) processQueueCollections(levels map[int]struct{}) {
	for _, queueCollection := range t.processingQueueCollections {
		level := queueCollection.Level()
		if _, ok := levels[level]; !ok {
			continue
		}

		activeQueue := queueCollection.ActiveQueue()
		if activeQueue == nil {
			// processing for this queue collection has finished.
			// it's possible that a new queue will be added to this collection later though;
			// pollTime will be updated after split/merge
			t.logger.Debug("Active queue is nil for timer queue at this level", tag.QueueLevel(level))
			continue
		}

		t.upsertPollTime(level, t.shard.GetCurrentTime(t.clusterName).Add(backoff.JitDuration(
			t.options.MaxPollInterval(),
			t.options.MaxPollIntervalJitterCoefficient(),
		)))

		var nextPageToken []byte
		readLevel := activeQueue.State().ReadLevel()
		maxReadLevel := minTaskKey(activeQueue.State().MaxLevel(), t.updateMaxReadLevel())
		domainFilter := activeQueue.State().DomainFilter()

		if progress, ok := t.processingQueueReadProgress[level]; ok {
			if progress.currentQueue == activeQueue {
				readLevel = progress.readLevel
				maxReadLevel = progress.maxReadLevel
				nextPageToken = progress.nextPageToken
			}
			delete(t.processingQueueReadProgress, level)
		}

		if !readLevel.Less(maxReadLevel) {
			// notify timer gate about the min time
			t.upsertPollTime(level, readLevel.(timerTaskKey).visibilityTimestamp)
			t.logger.Debug("Skipping processing timer queue at this level because readLevel >= maxReadLevel", tag.QueueLevel(level))
			continue
		}

		ctx, cancel := context.WithTimeout(context.Background(), loadQueueTaskThrottleRetryDelay)
		if err := t.rateLimiter.Wait(ctx); err != nil {
			cancel()
			if level == defaultProcessingQueueLevel {
				t.upsertPollTime(level, time.Time{})
			} else {
				t.setupBackoffTimer(level)
			}
			continue
		}
		cancel()

		resp, err := t.readAndFilterTasks(readLevel, maxReadLevel, nextPageToken)
		if err != nil {
			t.logger.Error("Processor unable to retrieve tasks", tag.Error(err))
			t.upsertPollTime(level, time.Time{}) // re-enqueue the event
			continue
		}

		tasks := make(map[task.Key]task.Task)
		taskChFull := false
		submittedCount := 0
		for _, taskInfo := range resp.timerTasks {
			if !domainFilter.Filter(taskInfo.GetDomainID()) {
				continue
			}

			task := t.taskInitializer(taskInfo)
			tasks[newTimerTaskKey(taskInfo.GetVisibilityTimestamp(), taskInfo.GetTaskID())] = task
			submitted, err := t.submitTask(task)
			if err != nil {
				// the only error here means the processor has been shut down,
				// so return instead of continue
				return
			}
			taskChFull = taskChFull || !submitted
			if submitted {
				submittedCount++
			}
		}
		t.logger.Debugf("Submitted %d timer tasks successfully out of %d tasks", submittedCount, len(resp.timerTasks))

		var newReadLevel task.Key
		if len(resp.nextPageToken) == 0 {
			newReadLevel = maxReadLevel
			if resp.lookAheadTask != nil {
				// lookAheadTask may exist only when nextPageToken is empty.
				// notice that lookAheadTask.VisibilityTimestamp may be larger than the shard max read level,
				// which means new tasks can be generated before that timestamp. This issue is solved by
				// upsertPollTime whenever there are new tasks
				lookAheadTimestamp := resp.lookAheadTask.GetVisibilityTimestamp()
				t.upsertPollTime(level, lookAheadTimestamp)
				newReadLevel = minTaskKey(newReadLevel, newTimerTaskKey(lookAheadTimestamp, 0))
				t.logger.Debugf("nextPageToken is empty for timer queue at level %d so setting newReadLevel to min(lookAheadTask.timestamp: %v, maxReadLevel: %v)", level, lookAheadTimestamp, maxReadLevel)
			} else {
				// otherwise we have no idea when the next poll should happen;
				// rely on notifyNewTask to trigger the next poll even for non-default queues.
				// another option for non-default queues is to set up a backoff timer to check back later
				t.logger.Debugf("nextPageToken is empty for timer queue at level %d and there's no lookAheadTask. setting readLevel to maxReadLevel: %v", level, maxReadLevel)
			}
		} else {
			// more tasks should be loaded for this processing queue:
			// record the current progress and update the poll time
			if level == defaultProcessingQueueLevel || !taskChFull {
				t.logger.Debugf("upserting poll time for timer queue at level %d because nextPageToken is not empty and !taskChFull", level)
				t.upsertPollTime(level, time.Time{})
			} else {
				t.logger.Debugf("setting up backoff timer for timer queue at level %d because nextPageToken is not empty and taskChFull", level)
				t.setupBackoffTimer(level)
			}
			t.processingQueueReadProgress[level] = timeTaskReadProgress{
				currentQueue:  activeQueue,
				readLevel:     readLevel,
				maxReadLevel:  maxReadLevel,
				nextPageToken: resp.nextPageToken,
			}
			if len(resp.timerTasks) > 0 {
				newReadLevel = newTimerTaskKey(resp.timerTasks[len(resp.timerTasks)-1].GetVisibilityTimestamp(), 0)
			}
		}
		queueCollection.AddTasks(tasks, newReadLevel)
	}
}

// splitQueue splits the processing queue collection based on some policy
// and resets the timer with jitter for the next run
func (t *timerQueueProcessorBase) splitQueue(splitQueueTimer *time.Timer) {
	splitPolicy := t.initializeSplitPolicy(
		func(key task.Key, domainID string) task.Key {
			return newTimerTaskKey(
				key.(timerTaskKey).visibilityTimestamp.Add(
					t.options.SplitLookAheadDurationByDomainID(domainID),
				),
				0,
			)
		},
	)

	t.splitProcessingQueueCollectionFn(splitPolicy, t.upsertPollTime)

	splitQueueTimer.Reset(backoff.JitDuration(
		t.options.SplitQueueInterval(),
		t.options.SplitQueueIntervalJitterCoefficient(),
	))
}

// handleAckLevelUpdate updates the ack level and resets the timer with jitter.
// It returns true if processing should be terminated.
func (t *timerQueueProcessorBase) handleAckLevelUpdate(updateAckTimer *time.Timer) bool {
	processFinished, _, err := t.updateAckLevelFn()
	if err == shard.ErrShardClosed || (err == nil && processFinished) {
		return true
	}
	updateAckTimer.Reset(backoff.JitDuration(
		t.options.UpdateAckInterval(),
		t.options.UpdateAckIntervalJitterCoefficient(),
	))
	return false
}

func (t *timerQueueProcessorBase) updateTimerGates() {
	maxRedispatchQueueSize := t.options.MaxRedispatchQueueSize()
	if t.redispatcher.Size() > maxRedispatchQueueSize {
		t.redispatcher.Redispatch(maxRedispatchQueueSize)
		if t.redispatcher.Size() > maxRedispatchQueueSize {
			// if the redispatcher still has a large number of tasks
			// (which only happens when the system is under very high load),
			// we should back off here instead of continuing to submit tasks to the task processor.
			// don't call t.timerGate.Update(time.Now() + loadQueueTaskThrottleRetryDelay) as the time in
			// the standby timer processor is not real time and is managed separately
			time.Sleep(backoff.JitDuration(
				t.options.PollBackoffInterval(),
				t.options.PollBackoffIntervalJitterCoefficient(),
			))
		}
		t.timerGate.Update(time.Time{})
		return
	}

	t.pollTimeLock.Lock()
	levels := make(map[int]struct{})
	now := t.shard.GetCurrentTime(t.clusterName)
	for level, pollTime := range t.nextPollTime {
		if !now.Before(pollTime) {
			levels[level] = struct{}{}
			delete(t.nextPollTime, level)
		} else {
			t.timerGate.Update(pollTime)
		}
	}
	t.pollTimeLock.Unlock()

	t.processQueueCollections(levels)
}

func (t *timerQueueProcessorBase) handleNewTimer() {
	t.newTimeLock.Lock()
	newTime := t.newTime
	t.newTime = time.Time{}
	t.newTimeLock.Unlock()

	// New Timer has arrived.
	t.metricsScope.IncCounter(metrics.NewTimerNotifyCounter)
	// notify all queue collections as they are waiting for the notification when there's
	// no more task to process. If non-default queues switch to periodic polling in the
	// future, then we won't need to notify them.
	for _, queueCollection := range t.processingQueueCollections {
		t.upsertPollTime(queueCollection.Level(), newTime)
	}
}

func (t *timerQueueProcessorBase) handleActionNotification(notification actionNotification) {
	t.processorBase.handleActionNotification(notification, func() {
		switch notification.action.ActionType {
		case ActionTypeReset:
			t.upsertPollTime(defaultProcessingQueueLevel, time.Time{})
		}
	})
}

func (t *timerQueueProcessorBase) readAndFilterTasks(readLevel, maxReadLevel task.Key, nextPageToken []byte) (*filteredTimerTasksResponse, error) {
	resp, err := t.getTimerTasks(readLevel, maxReadLevel, nextPageToken, t.options.BatchSize())
	if err != nil {
		return nil, err
	}

	var lookAheadTask *persistence.TimerTaskInfo
	filteredTasks := []*persistence.TimerTaskInfo{}
	for _, timerTask := range resp.Timers {
		if !t.isProcessNow(timerTask.GetVisibilityTimestamp()) {
			// found the first task that is not ready to be processed yet.
			// reset NextPageToken so we can load more tasks starting from this lookAheadTask next time.
			lookAheadTask = timerTask
			resp.NextPageToken = nil
			break
		}
		filteredTasks = append(filteredTasks, timerTask)
	}

	if len(resp.NextPageToken) == 0 && lookAheadTask == nil {
		// only look ahead within the processing queue boundary
		lookAheadTask, err = t.readLookAheadTask(maxReadLevel, maximumTimerTaskKey)
		if err != nil {
			// we don't know if the look-ahead task exists or not, but we know that if it exists,
			// its visibility timestamp is larger than or equal to maxReadLevel.
			// so, create a fake look-ahead task so another load can be triggered at that time.
			lookAheadTask = &persistence.TimerTaskInfo{
				VisibilityTimestamp: maxReadLevel.(timerTaskKey).visibilityTimestamp,
			}
		}
	}

	t.logger.Debugf("readAndFilterTasks returning %d tasks and lookAheadTask: %#v for readLevel: %#v, maxReadLevel: %#v", len(filteredTasks), lookAheadTask, readLevel, maxReadLevel)
	return &filteredTimerTasksResponse{
		timerTasks:    filteredTasks,
		lookAheadTask: lookAheadTask,
		nextPageToken: resp.NextPageToken,
	}, nil
}

func (t *timerQueueProcessorBase) readLookAheadTask(lookAheadStartLevel task.Key, lookAheadMaxLevel task.Key) (*persistence.TimerTaskInfo, error) {
	resp, err := t.getTimerTasks(lookAheadStartLevel, lookAheadMaxLevel, nil, 1)
	if err != nil {
		return nil, err
	}

	if len(resp.Timers) == 1 {
		return resp.Timers[0], nil
	}
	return nil, nil
}

func (t *timerQueueProcessorBase) getTimerTasks(readLevel, maxReadLevel task.Key, nextPageToken []byte, batchSize int) (*persistence.GetTimerIndexTasksResponse, error) {
	request := &persistence.GetTimerIndexTasksRequest{
		MinTimestamp:  readLevel.(timerTaskKey).visibilityTimestamp,
		MaxTimestamp:  maxReadLevel.(timerTaskKey).visibilityTimestamp,
		BatchSize:     batchSize,
		NextPageToken: nextPageToken,
	}

	var err error
	var response *persistence.GetTimerIndexTasksResponse
	retryCount := t.shard.GetConfig().TimerProcessorGetFailureRetryCount()
	for attempt := 0; attempt < retryCount; attempt++ {
		response, err = t.shard.GetExecutionManager().GetTimerIndexTasks(context.Background(), request)
		if err == nil {
			return response, nil
		}
		backoff := time.Duration(attempt*100) * time.Millisecond
		t.logger.Debugf("Failed to get timer tasks from execution manager. error: %v, attempt: %d, retryCount: %d, backoff: %v", err, attempt, retryCount, backoff)
		time.Sleep(backoff)
	}
	return nil, err
}

func (t *timerQueueProcessorBase) isProcessNow(expiryTime time.Time) bool {
	if expiryTime.IsZero() {
		// return true, but something upstream probably has a bug that creates an empty timerTask.
		t.logger.Warn("Timer task has timestamp zero")
	}
	return !t.shard.GetCurrentTime(t.clusterName).Before(expiryTime)
}

func (t *timerQueueProcessorBase) notifyNewTimers(timerTasks []persistence.Task) {
	if len(timerTasks) == 0 {
		return
	}

	isActive := t.options.MetricScope == metrics.TimerActiveQueueProcessorScope
	minNewTime := timerTasks[0].GetVisibilityTimestamp()
	shardIDTag := metrics.ShardIDTag(t.shard.GetShardID())
	for _, timerTask := range timerTasks {
		ts := timerTask.GetVisibilityTimestamp()
		if ts.Before(minNewTime) {
			minNewTime = ts
		}

		taskScopeIdx := task.GetTimerTaskMetricScope(
			timerTask.GetType(),
			isActive,
		)
		t.metricsClient.Scope(taskScopeIdx).Tagged(shardIDTag).IncCounter(metrics.NewTimerNotifyCounter)
	}

	t.notifyNewTimer(minNewTime)
}

func (t *timerQueueProcessorBase) notifyNewTimer(newTime time.Time) {
	t.newTimeLock.Lock()
	defer t.newTimeLock.Unlock()

	if t.newTime.IsZero() || newTime.Before(t.newTime) {
		t.logger.Debugf("Updating newTime from %v to %v", t.newTime, newTime)
		t.newTime = newTime
		select {
		case t.newTimerCh <- struct{}{}:
			// Notified about the new time.
		default:
			// Channel "full" -> drop and move on; this will happen only if the service is under high load.
		}
	}
}

func (t *timerQueueProcessorBase) upsertPollTime(level int, newPollTime time.Time) {
	t.pollTimeLock.Lock()
	defer t.pollTimeLock.Unlock()

	if _, ok := t.backoffTimer[level]; ok {
		// honor the existing backoff timer
		t.logger.Debugf("Skipping upsertPollTime for timer queue at level %d because there's a backoff timer", level)
		return
	}

	currentPollTime, ok := t.nextPollTime[level]
	if !ok || newPollTime.Before(currentPollTime) {
		t.logger.Debugf("Updating poll timer for timer queue at level %d. CurrentPollTime: %v, newPollTime: %v", level, currentPollTime, newPollTime)
		t.nextPollTime[level] = newPollTime
		t.timerGate.Update(newPollTime)
		return
	}

	t.logger.Debugf("Skipping upsertPollTime for level %d because currentPollTime %v is before newPollTime %v", level, currentPollTime, newPollTime)
}

// setupBackoffTimer will trigger a poll for the specified processing queue collection
// after a certain period of (real) time. This means that for the standby timer, even if the
// cluster time has not been updated, the poll will still be triggered when the timer fires.
// Use this function to delay loading for a processing queue. If a poll should be triggered
// immediately, use upsertPollTime.
func (t *timerQueueProcessorBase) setupBackoffTimer(level int) {
	t.pollTimeLock.Lock()
	defer t.pollTimeLock.Unlock()

	if _, ok := t.backoffTimer[level]; ok {
		// honor the existing backoff timer
		return
	}

	t.metricsScope.IncCounter(metrics.ProcessingQueueThrottledCounter)
	t.logger.Info("Throttled processing queue", tag.QueueLevel(level))
	backoffDuration := backoff.JitDuration(
		t.options.PollBackoffInterval(),
		t.options.PollBackoffIntervalJitterCoefficient(),
	)
	t.backoffTimer[level] = time.AfterFunc(backoffDuration, func() {
		select {
		case <-t.shutdownCh:
			return
		default:
		}

		t.pollTimeLock.Lock()
		defer t.pollTimeLock.Unlock()

		t.nextPollTime[level] = time.Time{}
		t.timerGate.Update(time.Time{})
		delete(t.backoffTimer, level)
	})
}

func newTimerTaskKey(visibilityTimestamp time.Time, taskID int64) task.Key {
	return timerTaskKey{
		visibilityTimestamp: visibilityTimestamp,
		taskID:              taskID,
	}
}

func (k timerTaskKey) Less(
	key task.Key,
) bool {
	timerKey := key.(timerTaskKey)
	if k.visibilityTimestamp.Equal(timerKey.visibilityTimestamp) {
		return k.taskID < timerKey.taskID
	}
	return k.visibilityTimestamp.Before(timerKey.visibilityTimestamp)
}

func (k timerTaskKey) String() string {
	return fmt.Sprintf("{visibilityTimestamp: %v, taskID: %v}", k.visibilityTimestamp, k.taskID)
}

func newTimerQueueProcessorOptions(
	config *config.Config,
	isActive bool,
	isFailover bool,
) *queueProcessorOptions {
	options := &queueProcessorOptions{
		BatchSize:                            config.TimerTaskBatchSize,
		DeleteBatchSize:                      config.TimerTaskDeleteBatchSize,
		MaxPollRPS:                           config.TimerProcessorMaxPollRPS,
		MaxPollInterval:                      config.TimerProcessorMaxPollInterval,
		MaxPollIntervalJitterCoefficient:     config.TimerProcessorMaxPollIntervalJitterCoefficient,
		UpdateAckInterval:                    config.TimerProcessorUpdateAckInterval,
		UpdateAckIntervalJitterCoefficient:   config.TimerProcessorUpdateAckIntervalJitterCoefficient,
		RedispatchIntervalJitterCoefficient:  config.TaskRedispatchIntervalJitterCoefficient,
		MaxRedispatchQueueSize:               config.TimerProcessorMaxRedispatchQueueSize,
		SplitQueueInterval:                   config.TimerProcessorSplitQueueInterval,
		SplitQueueIntervalJitterCoefficient:  config.TimerProcessorSplitQueueIntervalJitterCoefficient,
		PollBackoffInterval:                  config.QueueProcessorPollBackoffInterval,
		PollBackoffIntervalJitterCoefficient: config.QueueProcessorPollBackoffIntervalJitterCoefficient,
		EnableGracefulSyncShutdown:           config.QueueProcessorEnableGracefulSyncShutdown,
	}

	if isFailover {
		// disable queue split for the failover processor
		options.EnableSplit = dynamicconfig.GetBoolPropertyFn(false)

		// disable persisting and loading processing queue states for the failover processor as it will never be split
		options.EnablePersistQueueStates = dynamicconfig.GetBoolPropertyFn(false)
		options.EnableLoadQueueStates = dynamicconfig.GetBoolPropertyFn(false)

		options.MaxStartJitterInterval = config.TimerProcessorFailoverMaxStartJitterInterval
	} else {
		options.EnableSplit = config.QueueProcessorEnableSplit
		options.SplitMaxLevel = config.QueueProcessorSplitMaxLevel
		options.EnableRandomSplitByDomainID = config.QueueProcessorEnableRandomSplitByDomainID
		options.RandomSplitProbability = config.QueueProcessorRandomSplitProbability
		options.EnablePendingTaskSplitByDomainID = config.QueueProcessorEnablePendingTaskSplitByDomainID
		options.PendingTaskSplitThreshold = config.QueueProcessorPendingTaskSplitThreshold
		options.EnableStuckTaskSplitByDomainID = config.QueueProcessorEnableStuckTaskSplitByDomainID
		options.StuckTaskSplitThreshold = config.QueueProcessorStuckTaskSplitThreshold
		options.SplitLookAheadDurationByDomainID = config.QueueProcessorSplitLookAheadDurationByDomainID

		options.EnablePersistQueueStates = config.QueueProcessorEnablePersistQueueStates
		options.EnableLoadQueueStates = config.QueueProcessorEnableLoadQueueStates

		options.MaxStartJitterInterval = dynamicconfig.GetDurationPropertyFn(0)
	}

	if isActive {
		options.MetricScope = metrics.TimerActiveQueueProcessorScope
		options.RedispatchInterval = config.ActiveTaskRedispatchInterval
	} else {
		options.MetricScope = metrics.TimerStandbyQueueProcessorScope
		options.RedispatchInterval = config.StandbyTaskRedispatchInterval
	}

	return options
}
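
A side note for readers skimming the listing: the ordering implemented by timerTaskKey.Less (earlier visibilityTimestamp first, taskID as the tie-breaker) is the invariant the processor relies on when it merges read levels and look-ahead keys. The standalone sketch below mirrors that comparison with a local copy of the key type, since the real timerTaskKey is unexported from package queue; it is an illustrative sketch only, not the production code.

package main

import (
	"fmt"
	"sort"
	"time"
)

// localTimerKey mirrors the shape of the unexported timerTaskKey above:
// keys order by visibilityTimestamp first, with taskID breaking ties.
type localTimerKey struct {
	visibilityTimestamp time.Time
	taskID              int64
}

// less reproduces the comparison from timerTaskKey.Less in the listing.
func (k localTimerKey) less(other localTimerKey) bool {
	if k.visibilityTimestamp.Equal(other.visibilityTimestamp) {
		return k.taskID < other.taskID
	}
	return k.visibilityTimestamp.Before(other.visibilityTimestamp)
}

func main() {
	base := time.Date(2024, 4, 1, 22, 35, 0, 0, time.UTC)
	keys := []localTimerKey{
		{visibilityTimestamp: base.Add(time.Second), taskID: 7},
		{visibilityTimestamp: base, taskID: 9},
		{visibilityTimestamp: base, taskID: 3},
	}

	// Sorting with the mirrored comparison yields: (base, 3), (base, 9), (base+1s, 7).
	sort.Slice(keys, func(i, j int) bool { return keys[i].less(keys[j]) })

	for _, k := range keys {
		fmt.Printf("{visibilityTimestamp: %v, taskID: %v}\n", k.visibilityTimestamp, k.taskID)
	}
}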