uber / cadence / 018f1248-bf4e-4f7f-ada2-77eea52238e3

24 Apr 2024 10:45PM UTC coverage: 67.714% (-0.02%) from 67.734%
push · buildkite · web-flow
Fix slice reuse in cassandra/domain.go (#5937)

While reading domains from Cassandra, the isolationGroups and asncWFConfigData slices are not properly reset for each row. This can result in the same backing array being reused across multiple rows, so an incorrect or corrupted value may be read. Notably, these fields feed the domain cache, so any operation relying on it may be unable to get their correct values.
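
For context, a minimal sketch of the failure mode described above follows. It is illustrative only and is not the actual cassandra/domain.go or driver code: the row type, the raw input, and the buf variable are made-up names, and the append-into-a-reused-buffer line merely stands in for a driver scanning each row into the same destination slice. The point is that a slice declared outside the scan loop keeps its backing array between rows, so values retained from earlier rows get overwritten; allocating a fresh destination per row removes the aliasing.

package main

import "fmt"

// row is a simplified stand-in for a scanned Cassandra row; the real fix
// concerns fields such as isolationGroups and asncWFConfigData.
type row struct{ data []byte }

func main() {
        raw := [][]byte{[]byte("domain-A"), []byte("domain-B")}

        // Buggy pattern: one buffer declared outside the loop. Each iteration
        // rewrites the same backing array, so earlier rows alias the latest value.
        var buf []byte
        var badRows []row
        for _, r := range raw {
                buf = append(buf[:0], r...) // reuses buf's backing array when capacity allows
                badRows = append(badRows, row{data: buf})
        }
        fmt.Println(string(badRows[0].data), string(badRows[1].data)) // domain-B domain-B: row 0 is corrupted

        // Fixed pattern: give every row its own freshly allocated slice.
        var goodRows []row
        for _, r := range raw {
                cur := make([]byte, len(r))
                copy(cur, r)
                goodRows = append(goodRows, row{data: cur})
        }
        fmt.Println(string(goodRows[0].data), string(goodRows[1].data)) // domain-A domain-B
}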

4 of 19 new or added lines in 2 files covered. (21.05%)

66 existing lines in 11 files now uncovered.

99255 of 146579 relevant lines covered (67.71%)

2400.1 hits per line

Source File

/service/history/queue/timer_queue_processor_base.go · 77.82%
1
// Copyright (c) 2017-2020 Uber Technologies Inc.
2

3
// Permission is hereby granted, free of charge, to any person obtaining a copy
4
// of this software and associated documentation files (the "Software"), to deal
5
// in the Software without restriction, including without limitation the rights
6
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
7
// copies of the Software, and to permit persons to whom the Software is
8
// furnished to do so, subject to the following conditions:
9

10
// The above copyright notice and this permission notice shall be included in all
11
// copies or substantial portions of the Software.
12

13
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
14
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
18
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
19
// SOFTWARE.
20

21
package queue
22

23
import (
24
        "context"
25
        "fmt"
26
        "math"
27
        "math/rand"
28
        "sync"
29
        "sync/atomic"
30
        "time"
31

32
        "github.com/uber/cadence/common"
33
        "github.com/uber/cadence/common/backoff"
34
        "github.com/uber/cadence/common/dynamicconfig"
35
        "github.com/uber/cadence/common/log"
36
        "github.com/uber/cadence/common/log/tag"
37
        "github.com/uber/cadence/common/metrics"
38
        "github.com/uber/cadence/common/persistence"
39
        "github.com/uber/cadence/service/history/config"
40
        "github.com/uber/cadence/service/history/shard"
41
        "github.com/uber/cadence/service/history/task"
42
)
43

44
var (
45
        maximumTimerTaskKey = newTimerTaskKey(
46
                time.Unix(0, math.MaxInt64),
47
                0,
48
        )
49
)
50

51
type (
52
        timerTaskKey struct {
53
                visibilityTimestamp time.Time
54
                taskID              int64
55
        }
56

57
        timeTaskReadProgress struct {
58
                currentQueue  ProcessingQueue
59
                readLevel     task.Key
60
                maxReadLevel  task.Key
61
                nextPageToken []byte
62
        }
63

64
        timerQueueProcessorBase struct {
65
                *processorBase
66

67
                taskInitializer task.Initializer
68
                clusterName     string
69
                pollTimeLock    sync.Mutex
70
                backoffTimer    map[int]*time.Timer
71
                nextPollTime    map[int]time.Time
72
                timerGate       TimerGate
73

74
                // timer notification
75
                newTimerCh  chan struct{}
76
                newTimeLock sync.Mutex
77
                newTime     time.Time
78

79
                processingQueueReadProgress map[int]timeTaskReadProgress
80

81
                updateAckLevelFn                 func() (bool, task.Key, error)
82
                splitProcessingQueueCollectionFn func(splitPolicy ProcessingQueueSplitPolicy, upsertPollTimeFn func(int, time.Time))
83
        }
84

85
        filteredTimerTasksResponse struct {
86
                timerTasks    []*persistence.TimerTaskInfo
87
                lookAheadTask *persistence.TimerTaskInfo
88
                nextPageToken []byte
89
        }
90
)
91

92
func newTimerQueueProcessorBase(
93
        clusterName string,
94
        shard shard.Context,
95
        processingQueueStates []ProcessingQueueState,
96
        taskProcessor task.Processor,
97
        timerGate TimerGate,
98
        options *queueProcessorOptions,
99
        updateMaxReadLevel updateMaxReadLevelFn,
100
        updateClusterAckLevel updateClusterAckLevelFn,
101
        updateProcessingQueueStates updateProcessingQueueStatesFn,
102
        queueShutdown queueShutdownFn,
103
        taskFilter task.Filter,
104
        taskExecutor task.Executor,
105
        logger log.Logger,
106
        metricsClient metrics.Client,
107
) *timerQueueProcessorBase {
139✔
108
        processorBase := newProcessorBase(
139✔
109
                shard,
139✔
110
                processingQueueStates,
139✔
111
                taskProcessor,
139✔
112
                options,
139✔
113
                updateMaxReadLevel,
139✔
114
                updateClusterAckLevel,
139✔
115
                updateProcessingQueueStates,
139✔
116
                queueShutdown,
139✔
117
                logger.WithTags(tag.ComponentTimerQueue),
139✔
118
                metricsClient,
139✔
119
        )
139✔
120

139✔
121
        queueType := task.QueueTypeActiveTimer
139✔
122
        if options.MetricScope == metrics.TimerStandbyQueueProcessorScope {
202✔
123
                queueType = task.QueueTypeStandbyTimer
63✔
124
        }
63✔
125

126
        t := &timerQueueProcessorBase{
139✔
127
                processorBase: processorBase,
139✔
128
                taskInitializer: func(taskInfo task.Info) task.Task {
2,415✔
129
                        return task.NewTimerTask(
2,276✔
130
                                shard,
2,276✔
131
                                taskInfo,
2,276✔
132
                                queueType,
2,276✔
133
                                task.InitializeLoggerForTask(shard.GetShardID(), taskInfo, logger),
2,276✔
134
                                taskFilter,
2,276✔
135
                                taskExecutor,
2,276✔
136
                                taskProcessor,
2,276✔
137
                                processorBase.redispatcher.AddTask,
2,276✔
138
                                shard.GetConfig().TaskCriticalRetryCount,
2,276✔
139
                        )
2,276✔
140
                },
2,276✔
141
                clusterName:                 clusterName,
142
                backoffTimer:                make(map[int]*time.Timer),
143
                nextPollTime:                make(map[int]time.Time),
144
                timerGate:                   timerGate,
145
                newTimerCh:                  make(chan struct{}, 1),
146
                processingQueueReadProgress: make(map[int]timeTaskReadProgress),
147
        }
148

149
        t.updateAckLevelFn = t.updateAckLevel
139✔
150
        t.splitProcessingQueueCollectionFn = t.splitProcessingQueueCollection
139✔
151
        return t
139✔
152
}
153

154
func (t *timerQueueProcessorBase) Start() {
125✔
155
        if !atomic.CompareAndSwapInt32(&t.status, common.DaemonStatusInitialized, common.DaemonStatusStarted) {
125✔
156
                return
×
157
        }
×
158

159
        t.logger.Info("Timer queue processor state changed", tag.LifeCycleStarting)
125✔
160
        defer t.logger.Info("Timer queue processor state changed", tag.LifeCycleStarted)
125✔
161

125✔
162
        t.redispatcher.Start()
125✔
163

125✔
164
        newPollTime := time.Time{}
125✔
165
        if startJitter := t.options.MaxStartJitterInterval(); startJitter > 0 {
125✔
166
                now := t.shard.GetTimeSource().Now()
×
167
                newPollTime = now.Add(time.Duration(rand.Int63n(int64(startJitter))))
×
168
        }
×
169
        for _, queueCollections := range t.processingQueueCollections {
250✔
170
                t.upsertPollTime(queueCollections.Level(), newPollTime)
125✔
171
        }
125✔
172

173
        t.shutdownWG.Add(1)
125✔
174
        go t.processorPump()
125✔
175
}
176

177
// Edge Case: Stop doesn't stop TimerGate if timerQueueProcessorBase is only initialized without being started
178
// As a result, TimerGate needs to be stopped separately
179
// One way to fix this is to make sure TimerGate doesn't start its daemon loop on initialization and requires an explicit Start
180
func (t *timerQueueProcessorBase) Stop() {
125✔
181
        if !atomic.CompareAndSwapInt32(&t.status, common.DaemonStatusStarted, common.DaemonStatusStopped) {
125✔
182
                return
×
183
        }
×
184

185
        t.logger.Info("Timer queue processor state changed", tag.LifeCycleStopping)
125✔
186
        defer t.logger.Info("Timer queue processor state changed", tag.LifeCycleStopped)
125✔
187

125✔
188
        t.timerGate.Close()
125✔
189
        close(t.shutdownCh)
125✔
190
        t.pollTimeLock.Lock()
125✔
191
        for _, timer := range t.backoffTimer {
125✔
192
                timer.Stop()
×
193
        }
×
194
        t.pollTimeLock.Unlock()
125✔
195

125✔
196
        if success := common.AwaitWaitGroup(&t.shutdownWG, gracefulShutdownTimeout); !success {
125✔
197
                t.logger.Warn("timerQueueProcessorBase timed out on shut down", tag.LifeCycleStopTimedout)
×
198
        }
×
199

200
        t.redispatcher.Stop()
125✔
201
}
202

203
func (t *timerQueueProcessorBase) processorPump() {
125✔
204
        defer t.shutdownWG.Done()
125✔
205
        updateAckTimer := time.NewTimer(backoff.JitDuration(t.options.UpdateAckInterval(), t.options.UpdateAckIntervalJitterCoefficient()))
125✔
206
        defer updateAckTimer.Stop()
125✔
207
        splitQueueTimer := time.NewTimer(backoff.JitDuration(t.options.SplitQueueInterval(), t.options.SplitQueueIntervalJitterCoefficient()))
125✔
208
        defer splitQueueTimer.Stop()
125✔
209

125✔
210
        for {
6,474✔
211
                select {
6,349✔
212
                case <-t.shutdownCh:
125✔
213
                        return
125✔
214
                case <-t.timerGate.FireChan():
2,396✔
215
                        t.updateTimerGates()
2,396✔
216
                case <-updateAckTimer.C:
437✔
217
                        if stopPump := t.handleAckLevelUpdate(updateAckTimer); stopPump {
437✔
218
                                if !t.options.EnableGracefulSyncShutdown() {
×
219
                                        go t.Stop()
×
220
                                        return
×
221
                                }
×
222

223
                                t.Stop()
×
224
                                return
×
225
                        }
226
                case <-t.newTimerCh:
3,012✔
227
                        t.handleNewTimer()
3,012✔
228
                case <-splitQueueTimer.C:
204✔
229
                        t.splitQueue(splitQueueTimer)
204✔
230
                case notification := <-t.actionNotifyCh:
184✔
231
                        t.handleActionNotification(notification)
184✔
232
                }
233
        }
234
}
235

236
func (t *timerQueueProcessorBase) processQueueCollections(levels map[int]struct{}) {
2,400✔
237
        for _, queueCollection := range t.processingQueueCollections {
4,800✔
238
                level := queueCollection.Level()
2,400✔
239
                if _, ok := levels[level]; !ok {
2,400✔
UNCOV
240
                        continue
×
241
                }
242

243
                activeQueue := queueCollection.ActiveQueue()
2,400✔
244
                if activeQueue == nil {
2,400✔
245
                        // processing for this queue collection has finished
×
246
                        // it's possible that a new queue will be added to this collection later though,
×
247
                        // pollTime will be updated after split/merge
×
248
                        t.logger.Debug("Active queue is nil for timer queue at this level", tag.QueueLevel(level))
×
249
                        continue
×
250
                }
251

252
                t.upsertPollTime(level, t.shard.GetCurrentTime(t.clusterName).Add(backoff.JitDuration(
2,400✔
253
                        t.options.MaxPollInterval(),
2,400✔
254
                        t.options.MaxPollIntervalJitterCoefficient(),
2,400✔
255
                )))
2,400✔
256

2,400✔
257
                var nextPageToken []byte
2,400✔
258
                readLevel := activeQueue.State().ReadLevel()
2,400✔
259
                maxReadLevel := minTaskKey(activeQueue.State().MaxLevel(), t.updateMaxReadLevel())
2,400✔
260
                domainFilter := activeQueue.State().DomainFilter()
2,400✔
261

2,400✔
262
                if progress, ok := t.processingQueueReadProgress[level]; ok {
2,402✔
263
                        if progress.currentQueue == activeQueue {
4✔
264
                                readLevel = progress.readLevel
2✔
265
                                maxReadLevel = progress.maxReadLevel
2✔
266
                                nextPageToken = progress.nextPageToken
2✔
267
                        }
2✔
268
                        delete(t.processingQueueReadProgress, level)
2✔
269
                }
270

271
                if !readLevel.Less(maxReadLevel) {
2,466✔
272
                        // notify timer gate about the min time
66✔
273
                        t.upsertPollTime(level, readLevel.(timerTaskKey).visibilityTimestamp)
66✔
274
                        t.logger.Debug("Skipping processing timer queue at this level because readLevel >= maxReadLevel", tag.QueueLevel(level))
66✔
275
                        continue
66✔
276
                }
277

278
                ctx, cancel := context.WithTimeout(context.Background(), loadQueueTaskThrottleRetryDelay)
2,337✔
279
                if err := t.rateLimiter.Wait(ctx); err != nil {
2,337✔
280
                        cancel()
×
281
                        if level == defaultProcessingQueueLevel {
×
282
                                t.upsertPollTime(level, time.Time{})
×
283
                        } else {
×
284
                                t.setupBackoffTimer(level)
×
285
                        }
×
286
                        continue
×
287
                }
288
                cancel()
2,337✔
289

2,337✔
290
                resp, err := t.readAndFilterTasks(readLevel, maxReadLevel, nextPageToken)
2,337✔
291
                if err != nil {
2,337✔
292
                        t.logger.Error("Processor unable to retrieve tasks", tag.Error(err))
×
293
                        t.upsertPollTime(level, time.Time{}) // re-enqueue the event
×
294
                        continue
×
295
                }
296

297
                tasks := make(map[task.Key]task.Task)
2,337✔
298
                taskChFull := false
2,337✔
299
                submittedCount := 0
2,337✔
300
                for _, taskInfo := range resp.timerTasks {
4,615✔
301
                        if !domainFilter.Filter(taskInfo.GetDomainID()) {
2,280✔
302
                                continue
2✔
303
                        }
304

305
                        task := t.taskInitializer(taskInfo)
2,276✔
306
                        tasks[newTimerTaskKey(taskInfo.GetVisibilityTimestamp(), taskInfo.GetTaskID())] = task
2,276✔
307
                        submitted, err := t.submitTask(task)
2,276✔
308
                        if err != nil {
2,276✔
309
                                // only err here is due to the fact that processor has been shutdown
×
310
                                // return instead of continue
×
311
                                return
×
312
                        }
×
313
                        taskChFull = taskChFull || !submitted
2,276✔
314
                        if submitted {
4,552✔
315
                                submittedCount++
2,276✔
316
                        }
2,276✔
317
                }
318
                t.logger.Debugf("Submitted %d timer tasks successfully out of %d tasks", submittedCount, len(resp.timerTasks))
2,337✔
319

2,337✔
320
                var newReadLevel task.Key
2,337✔
321
                if len(resp.nextPageToken) == 0 {
4,673✔
322
                        newReadLevel = maxReadLevel
2,336✔
323
                        if resp.lookAheadTask != nil {
4,611✔
324
                                // lookAheadTask may exist only when nextPageToken is empty
2,275✔
325
                                // notice that lookAheadTask.VisibilityTimestamp may be larger than shard max read level,
2,275✔
326
                                // which means new tasks can be generated before that timestamp. This issue is solved by
2,275✔
327
                                // upsertPollTime whenever there are new tasks
2,275✔
328
                                lookAheadTimestamp := resp.lookAheadTask.GetVisibilityTimestamp()
2,275✔
329
                                t.upsertPollTime(level, lookAheadTimestamp)
2,275✔
330
                                newReadLevel = minTaskKey(newReadLevel, newTimerTaskKey(lookAheadTimestamp, 0))
2,275✔
331
                                t.logger.Debugf("nextPageToken is empty for timer queue at level %d so setting newReadLevel to max(lookAheadTask.timestamp: %v, maxReadLevel: %v)", level, lookAheadTimestamp, maxReadLevel)
2,275✔
332
                        } else {
2,339✔
333
                                // else we have no idea when the next poll should happen
64✔
334
                                // rely on notifyNewTask to trigger the next poll even for non-default queue.
64✔
335
                                // another option for non-default queue is that we can setup a backoff timer to check back later
64✔
336
                                t.logger.Debugf("nextPageToken is empty for timer queue at level %d and there' no lookAheadTask. setting readLevel to maxReadLevel: %v", level, maxReadLevel)
64✔
337
                        }
64✔
338
                } else {
1✔
339
                        // more tasks should be loaded for this processing queue
1✔
340
                        // record the current progress and update the poll time
1✔
341
                        if level == defaultProcessingQueueLevel || !taskChFull {
2✔
342
                                t.logger.Debugf("upserting poll time for timer queue at level %d because nextPageToken is not empty and !taskChFull", level)
1✔
343
                                t.upsertPollTime(level, time.Time{})
1✔
344
                        } else {
1✔
345
                                t.logger.Debugf("setting up backoff timer for timer queue at level %d because nextPageToken is not empty and taskChFull", level)
×
346
                                t.setupBackoffTimer(level)
×
347
                        }
×
348
                        t.processingQueueReadProgress[level] = timeTaskReadProgress{
1✔
349
                                currentQueue:  activeQueue,
1✔
350
                                readLevel:     readLevel,
1✔
351
                                maxReadLevel:  maxReadLevel,
1✔
352
                                nextPageToken: resp.nextPageToken,
1✔
353
                        }
1✔
354
                        if len(resp.timerTasks) > 0 {
2✔
355
                                newReadLevel = newTimerTaskKey(resp.timerTasks[len(resp.timerTasks)-1].GetVisibilityTimestamp(), 0)
1✔
356
                        }
1✔
357
                }
358
                queueCollection.AddTasks(tasks, newReadLevel)
2,337✔
359
        }
360
}
361

362
// splitQueue splits the processing queue collection based on some policy
363
// and resets the timer with jitter for next run
364
func (t *timerQueueProcessorBase) splitQueue(splitQueueTimer *time.Timer) {
204✔
365
        splitPolicy := t.initializeSplitPolicy(
204✔
366
                func(key task.Key, domainID string) task.Key {
204✔
367
                        return newTimerTaskKey(
×
368
                                key.(timerTaskKey).visibilityTimestamp.Add(
×
369
                                        t.options.SplitLookAheadDurationByDomainID(domainID),
×
370
                                ),
×
371
                                0,
×
372
                        )
×
373
                },
×
374
        )
375

376
        t.splitProcessingQueueCollectionFn(splitPolicy, t.upsertPollTime)
204✔
377

204✔
378
        splitQueueTimer.Reset(backoff.JitDuration(
204✔
379
                t.options.SplitQueueInterval(),
204✔
380
                t.options.SplitQueueIntervalJitterCoefficient(),
204✔
381
        ))
204✔
382
}
383

384
// handleAckLevelUpdate updates ack level and resets timer with jitter.
385
// returns true if processing should be terminated
386
func (t *timerQueueProcessorBase) handleAckLevelUpdate(updateAckTimer *time.Timer) bool {
437✔
387
        processFinished, _, err := t.updateAckLevelFn()
437✔
388
        if err == shard.ErrShardClosed || (err == nil && processFinished) {
437✔
389
                return true
×
390
        }
×
391
        updateAckTimer.Reset(backoff.JitDuration(
437✔
392
                t.options.UpdateAckInterval(),
437✔
393
                t.options.UpdateAckIntervalJitterCoefficient(),
437✔
394
        ))
437✔
395
        return false
437✔
396
}
397

398
func (t *timerQueueProcessorBase) updateTimerGates() {
2,396✔
399
        maxRedispatchQueueSize := t.options.MaxRedispatchQueueSize()
2,396✔
400
        if t.redispatcher.Size() > maxRedispatchQueueSize {
2,396✔
401
                t.redispatcher.Redispatch(maxRedispatchQueueSize)
×
402
                if t.redispatcher.Size() > maxRedispatchQueueSize {
×
403
                        // if redispatcher still has a large number of tasks
×
404
                        // this only happens when system is under very high load
×
405
                        // we should backoff here instead of keeping submitting tasks to task processor
×
406
                        // don't call t.timerGate.Update(time.Now() + loadQueueTaskThrottleRetryDelay) as the time in
×
407
                        // standby timer processor is not real time and is managed separately
×
408
                        time.Sleep(backoff.JitDuration(
×
409
                                t.options.PollBackoffInterval(),
×
410
                                t.options.PollBackoffIntervalJitterCoefficient(),
×
411
                        ))
×
412
                }
×
413
                t.timerGate.Update(time.Time{})
×
414
                return
×
415
        }
416

417
        t.pollTimeLock.Lock()
2,396✔
418
        levels := make(map[int]struct{})
2,396✔
419
        now := t.shard.GetCurrentTime(t.clusterName)
2,396✔
420
        for level, pollTime := range t.nextPollTime {
4,792✔
421
                if !now.Before(pollTime) {
4,792✔
422
                        levels[level] = struct{}{}
2,396✔
423
                        delete(t.nextPollTime, level)
2,396✔
424
                } else {
2,396✔
UNCOV
425
                        t.timerGate.Update(pollTime)
×
UNCOV
426
                }
×
427
        }
428
        t.pollTimeLock.Unlock()
2,396✔
429

2,396✔
430
        t.processQueueCollections(levels)
2,396✔
431
}
432

433
func (t *timerQueueProcessorBase) handleNewTimer() {
3,012✔
434
        t.newTimeLock.Lock()
3,012✔
435
        newTime := t.newTime
3,012✔
436
        t.newTime = time.Time{}
3,012✔
437
        t.newTimeLock.Unlock()
3,012✔
438

3,012✔
439
        // New Timer has arrived.
3,012✔
440
        t.metricsScope.IncCounter(metrics.NewTimerNotifyCounter)
3,012✔
441
        // notify all queue collections as they are waiting for the notification when there's
3,012✔
442
        // no more task to process. For non-default queue, we choose to do periodic polling
3,012✔
443
        // in the future, then we don't need to notify them.
3,012✔
444
        for _, queueCollection := range t.processingQueueCollections {
6,024✔
445
                t.upsertPollTime(queueCollection.Level(), newTime)
3,012✔
446
        }
3,012✔
447
}
448

449
func (t *timerQueueProcessorBase) handleActionNotification(notification actionNotification) {
184✔
450
        t.processorBase.handleActionNotification(notification, func() {
368✔
451
                switch notification.action.ActionType {
184✔
452
                case ActionTypeReset:
×
453
                        t.upsertPollTime(defaultProcessingQueueLevel, time.Time{})
×
454
                }
455
        })
456
}
457

458
func (t *timerQueueProcessorBase) readAndFilterTasks(readLevel, maxReadLevel task.Key, nextPageToken []byte) (*filteredTimerTasksResponse, error) {
2,342✔
459
        resp, err := t.getTimerTasks(readLevel, maxReadLevel, nextPageToken, t.options.BatchSize())
2,342✔
460
        if err != nil {
2,342✔
461
                return nil, err
×
462
        }
×
463

464
        var lookAheadTask *persistence.TimerTaskInfo
2,342✔
465
        filteredTasks := []*persistence.TimerTaskInfo{}
2,342✔
466
        for _, timerTask := range resp.Timers {
6,173✔
467
                if !t.isProcessNow(timerTask.GetVisibilityTimestamp()) {
5,381✔
468
                        // found the first task that is not ready to be processed yet.
1,550✔
469
                        // reset NextPageToken so we can load more tasks starting from this lookAheadTask next time.
1,550✔
470
                        lookAheadTask = timerTask
1,550✔
471
                        resp.NextPageToken = nil
1,550✔
472
                        break
1,550✔
473
                }
474
                filteredTasks = append(filteredTasks, timerTask)
2,284✔
475
        }
476

477
        if len(resp.NextPageToken) == 0 && lookAheadTask == nil {
3,135✔
478
                // only look ahead within the processing queue boundary
793✔
479
                lookAheadTask, err = t.readLookAheadTask(maxReadLevel, maximumTimerTaskKey)
793✔
480
                if err != nil {
794✔
481
                        // we don't know if look ahead task exists or not, but we know if it exists,
1✔
482
                        // its visibility timestamp is larger than or equal to maxReadLevel.
1✔
483
                        // so, create a fake look ahead task so another load can be triggered at that time.
1✔
484
                        lookAheadTask = &persistence.TimerTaskInfo{
1✔
485
                                VisibilityTimestamp: maxReadLevel.(timerTaskKey).visibilityTimestamp,
1✔
486
                        }
1✔
487
                }
1✔
488
        }
489

490
        t.logger.Debugf("readAndFilterTasks returning %d tasks and lookAheadTask: %#v for readLevel: %#v, maxReadLevel: %#v", len(filteredTasks), lookAheadTask, readLevel, maxReadLevel)
2,342✔
491
        return &filteredTimerTasksResponse{
2,342✔
492
                timerTasks:    filteredTasks,
2,342✔
493
                lookAheadTask: lookAheadTask,
2,342✔
494
                nextPageToken: resp.NextPageToken,
2,342✔
495
        }, nil
2,342✔
496
}
497

498
func (t *timerQueueProcessorBase) readLookAheadTask(lookAheadStartLevel task.Key, lookAheadMaxLevel task.Key) (*persistence.TimerTaskInfo, error) {
794✔
499
        resp, err := t.getTimerTasks(lookAheadStartLevel, lookAheadMaxLevel, nil, 1)
794✔
500
        if err != nil {
795✔
501
                return nil, err
1✔
502
        }
1✔
503

504
        if len(resp.Timers) == 1 {
1,524✔
505
                return resp.Timers[0], nil
731✔
506
        }
731✔
507
        return nil, nil
65✔
508
}
509

510
func (t *timerQueueProcessorBase) getTimerTasks(readLevel, maxReadLevel task.Key, nextPageToken []byte, batchSize int) (*persistence.GetTimerIndexTasksResponse, error) {
3,135✔
511
        request := &persistence.GetTimerIndexTasksRequest{
3,135✔
512
                MinTimestamp:  readLevel.(timerTaskKey).visibilityTimestamp,
3,135✔
513
                MaxTimestamp:  maxReadLevel.(timerTaskKey).visibilityTimestamp,
3,135✔
514
                BatchSize:     batchSize,
3,135✔
515
                NextPageToken: nextPageToken,
3,135✔
516
        }
3,135✔
517

3,135✔
518
        var err error
3,135✔
519
        var response *persistence.GetTimerIndexTasksResponse
3,135✔
520
        retryCount := t.shard.GetConfig().TimerProcessorGetFailureRetryCount()
3,135✔
521
        for attempt := 0; attempt < retryCount; attempt++ {
6,274✔
522
                response, err = t.shard.GetExecutionManager().GetTimerIndexTasks(context.Background(), request)
3,139✔
523
                if err == nil {
6,273✔
524
                        return response, nil
3,134✔
525
                }
3,134✔
526
                backoff := time.Duration(attempt*100) * time.Millisecond
5✔
527
                t.logger.Debugf("Failed to get timer tasks from execution manager. error: %v, attempt: %d, retryCount: %d, backoff: %v", err, attempt, retryCount, backoff)
5✔
528
                time.Sleep(backoff)
5✔
529
        }
530
        return nil, err
1✔
531
}
532

533
func (t *timerQueueProcessorBase) isProcessNow(expiryTime time.Time) bool {
3,835✔
534
        if expiryTime.IsZero() {
3,836✔
535
                // return true, but there is probably a bug somewhere creating an empty timerTask.
1✔
536
                t.logger.Warn("Timer task has timestamp zero")
1✔
537
        }
1✔
538
        return !t.shard.GetCurrentTime(t.clusterName).Before(expiryTime)
3,835✔
539
}
540

541
func (t *timerQueueProcessorBase) notifyNewTimers(timerTasks []persistence.Task) {
3,039✔
542
        if len(timerTasks) == 0 {
3,039✔
543
                return
×
544
        }
×
545

546
        isActive := t.options.MetricScope == metrics.TimerActiveQueueProcessorScope
3,039✔
547
        minNewTime := timerTasks[0].GetVisibilityTimestamp()
3,039✔
548
        shardIDTag := metrics.ShardIDTag(t.shard.GetShardID())
3,039✔
549
        for _, timerTask := range timerTasks {
6,172✔
550
                ts := timerTask.GetVisibilityTimestamp()
3,133✔
551
                if ts.Before(minNewTime) {
3,209✔
552
                        minNewTime = ts
76✔
553
                }
76✔
554

555
                taskScopeIdx := task.GetTimerTaskMetricScope(
3,133✔
556
                        timerTask.GetType(),
3,133✔
557
                        isActive,
3,133✔
558
                )
3,133✔
559
                t.metricsClient.Scope(taskScopeIdx).Tagged(shardIDTag).IncCounter(metrics.NewTimerNotifyCounter)
3,133✔
560
        }
561

562
        t.notifyNewTimer(minNewTime)
3,039✔
563
}
564

565
func (t *timerQueueProcessorBase) notifyNewTimer(newTime time.Time) {
3,039✔
566
        t.newTimeLock.Lock()
3,039✔
567
        defer t.newTimeLock.Unlock()
3,039✔
568

3,039✔
569
        if t.newTime.IsZero() || newTime.Before(t.newTime) {
6,053✔
570
                t.logger.Debugf("Updating newTime from %v to %v", t.newTime, newTime)
3,014✔
571
                t.newTime = newTime
3,014✔
572
                select {
3,014✔
573
                case t.newTimerCh <- struct{}{}:
3,013✔
574
                        // Notified about new time.
575
                default:
1✔
576
                        // Channel "full" -> drop and move on, this will happen only if service is in high load.
577
                }
578
        }
579
}
580

581
func (t *timerQueueProcessorBase) upsertPollTime(level int, newPollTime time.Time) {
7,867✔
582
        t.pollTimeLock.Lock()
7,867✔
583
        defer t.pollTimeLock.Unlock()
7,867✔
584

7,867✔
585
        if _, ok := t.backoffTimer[level]; ok {
7,867✔
586
                // honor existing backoff timer
×
587
                t.logger.Debugf("Skipping upsertPollTime for timer queue at level %d because there's a backoff timer", level)
×
588
                return
×
589
        }
×
590

591
        currentPollTime, ok := t.nextPollTime[level]
7,867✔
592
        if !ok || newPollTime.Before(currentPollTime) {
13,089✔
593
                t.logger.Debugf("Updating poll timer for timer queue at level %d. CurrentPollTime: %v, newPollTime: %v", level, currentPollTime, newPollTime)
5,222✔
594
                t.nextPollTime[level] = newPollTime
5,222✔
595
                t.timerGate.Update(newPollTime)
5,222✔
596
                return
5,222✔
597
        }
5,222✔
598

599
        t.logger.Debugf("Skipping upsertPollTime for level %d because currentPollTime %v is before newPollTime %v", level, currentPollTime, newPollTime)
2,648✔
600
}
601

602
// setupBackoffTimer will trigger a poll for the specified processing queue collection
603
// after a certain period of (real) time. This means that for the standby timer, even if the cluster time
604
// has not been updated, the poll will still be triggered when the timer fires. Use this function
605
// for delaying the load for a processing queue. If a poll should be triggered immediately,
606
// use upsertPollTime.
607
func (t *timerQueueProcessorBase) setupBackoffTimer(level int) {
×
608
        t.pollTimeLock.Lock()
×
609
        defer t.pollTimeLock.Unlock()
×
610

×
611
        if _, ok := t.backoffTimer[level]; ok {
×
612
                // honor existing backoff timer
×
613
                return
×
614
        }
×
615

616
        t.metricsScope.IncCounter(metrics.ProcessingQueueThrottledCounter)
×
617
        t.logger.Info("Throttled processing queue", tag.QueueLevel(level))
×
618
        backoffDuration := backoff.JitDuration(
×
619
                t.options.PollBackoffInterval(),
×
620
                t.options.PollBackoffIntervalJitterCoefficient(),
×
621
        )
×
622
        t.backoffTimer[level] = time.AfterFunc(backoffDuration, func() {
×
623
                select {
×
624
                case <-t.shutdownCh:
×
625
                        return
×
626
                default:
×
627
                }
628

629
                t.pollTimeLock.Lock()
×
630
                defer t.pollTimeLock.Unlock()
×
631

×
632
                t.nextPollTime[level] = time.Time{}
×
633
                t.timerGate.Update(time.Time{})
×
634
                delete(t.backoffTimer, level)
×
635
        })
636
}
637

638
func newTimerTaskKey(visibilityTimestamp time.Time, taskID int64) task.Key {
7,679✔
639
        return timerTaskKey{
7,679✔
640
                visibilityTimestamp: visibilityTimestamp,
7,679✔
641
                taskID:              taskID,
7,679✔
642
        }
7,679✔
643
}
7,679✔
644

645
func (k timerTaskKey) Less(
646
        key task.Key,
647
) bool {
28,565✔
648
        timerKey := key.(timerTaskKey)
28,565✔
649
        if k.visibilityTimestamp.Equal(timerKey.visibilityTimestamp) {
29,246✔
650
                return k.taskID < timerKey.taskID
681✔
651
        }
681✔
652
        return k.visibilityTimestamp.Before(timerKey.visibilityTimestamp)
27,887✔
653
}
654

655
func (k timerTaskKey) String() string {
2,336✔
656
        return fmt.Sprintf("{visibilityTimestamp: %v, taskID: %v}", k.visibilityTimestamp, k.taskID)
2,336✔
657
}
2,336✔
658

659
func newTimerQueueProcessorOptions(
660
        config *config.Config,
661
        isActive bool,
662
        isFailover bool,
663
) *queueProcessorOptions {
139✔
664
        options := &queueProcessorOptions{
139✔
665
                BatchSize:                            config.TimerTaskBatchSize,
139✔
666
                DeleteBatchSize:                      config.TimerTaskDeleteBatchSize,
139✔
667
                MaxPollRPS:                           config.TimerProcessorMaxPollRPS,
139✔
668
                MaxPollInterval:                      config.TimerProcessorMaxPollInterval,
139✔
669
                MaxPollIntervalJitterCoefficient:     config.TimerProcessorMaxPollIntervalJitterCoefficient,
139✔
670
                UpdateAckInterval:                    config.TimerProcessorUpdateAckInterval,
139✔
671
                UpdateAckIntervalJitterCoefficient:   config.TimerProcessorUpdateAckIntervalJitterCoefficient,
139✔
672
                RedispatchIntervalJitterCoefficient:  config.TaskRedispatchIntervalJitterCoefficient,
139✔
673
                MaxRedispatchQueueSize:               config.TimerProcessorMaxRedispatchQueueSize,
139✔
674
                SplitQueueInterval:                   config.TimerProcessorSplitQueueInterval,
139✔
675
                SplitQueueIntervalJitterCoefficient:  config.TimerProcessorSplitQueueIntervalJitterCoefficient,
139✔
676
                PollBackoffInterval:                  config.QueueProcessorPollBackoffInterval,
139✔
677
                PollBackoffIntervalJitterCoefficient: config.QueueProcessorPollBackoffIntervalJitterCoefficient,
139✔
678
                EnableGracefulSyncShutdown:           config.QueueProcessorEnableGracefulSyncShutdown,
139✔
679
        }
139✔
680

139✔
681
        if isFailover {
139✔
682
                // disable queue split for failover processor
×
683
                options.EnableSplit = dynamicconfig.GetBoolPropertyFn(false)
×
684

×
685
                // disable persist and load processing queue states for failover processor as it will never be split
×
686
                options.EnablePersistQueueStates = dynamicconfig.GetBoolPropertyFn(false)
×
687
                options.EnableLoadQueueStates = dynamicconfig.GetBoolPropertyFn(false)
×
688

×
689
                options.MaxStartJitterInterval = config.TimerProcessorFailoverMaxStartJitterInterval
×
690
        } else {
139✔
691
                options.EnableSplit = config.QueueProcessorEnableSplit
139✔
692
                options.SplitMaxLevel = config.QueueProcessorSplitMaxLevel
139✔
693
                options.EnableRandomSplitByDomainID = config.QueueProcessorEnableRandomSplitByDomainID
139✔
694
                options.RandomSplitProbability = config.QueueProcessorRandomSplitProbability
139✔
695
                options.EnablePendingTaskSplitByDomainID = config.QueueProcessorEnablePendingTaskSplitByDomainID
139✔
696
                options.PendingTaskSplitThreshold = config.QueueProcessorPendingTaskSplitThreshold
139✔
697
                options.EnableStuckTaskSplitByDomainID = config.QueueProcessorEnableStuckTaskSplitByDomainID
139✔
698
                options.StuckTaskSplitThreshold = config.QueueProcessorStuckTaskSplitThreshold
139✔
699
                options.SplitLookAheadDurationByDomainID = config.QueueProcessorSplitLookAheadDurationByDomainID
139✔
700

139✔
701
                options.EnablePersistQueueStates = config.QueueProcessorEnablePersistQueueStates
139✔
702
                options.EnableLoadQueueStates = config.QueueProcessorEnableLoadQueueStates
139✔
703

139✔
704
                options.MaxStartJitterInterval = dynamicconfig.GetDurationPropertyFn(0)
139✔
705
        }
139✔
706

707
        if isActive {
218✔
708
                options.MetricScope = metrics.TimerActiveQueueProcessorScope
79✔
709
                options.RedispatchInterval = config.ActiveTaskRedispatchInterval
79✔
710
        } else {
142✔
711
                options.MetricScope = metrics.TimerStandbyQueueProcessorScope
63✔
712
                options.RedispatchInterval = config.StandbyTaskRedispatchInterval
63✔
713
        }
63✔
714

715
        return options
139✔
716
}