
uber / cadence / 01907581-5620-4db8-a288-46edcd2f7db0

02 Jul 2024 10:12PM UTC coverage: 71.509%. Remained the same.
push · buildkite · web-flow
Stop the ratelimiter collections when stopping the service (#6155)

Rather obviously missed in retrospect, oops.

These can be stopped basically anywhere without causing issues, but after the handlers are stopped there should be no more in-bound requests worth counting (the "stopping" check will stop anything from actually "running").
So around here seems like the most reasonable place to stop sharing load info with others.
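
As a rough illustration of the ordering described above (this is not the actual Cadence shutdown code; the handler and collection names here are hypothetical stand-ins), the idea is simply to stop the limiter collections after the request handlers, so that no countable traffic arrives once counting has stopped:

package main

import (
        "fmt"
        "sync"
)

// collection is a stand-in for a ratelimiter collection that runs a
// background loop (e.g. sharing load info with peers) until stopped.
type collection struct {
        stopOnce sync.Once
        done     chan struct{}
}

func newCollection() *collection { return &collection{done: make(chan struct{})} }

// Stop is idempotent and can safely be called at any point during shutdown.
func (c *collection) Stop() {
        c.stopOnce.Do(func() { close(c.done) })
}

// service is a hypothetical service holding request handlers and limiter collections.
type service struct {
        handlersStopped bool
        limiters        []*collection
}

// Stop mirrors the ordering from the change description: stop the handlers
// first, then stop the limiter collections, since no more in-bound requests
// worth counting can arrive once the handlers are down.
func (s *service) Stop() {
        s.handlersStopped = true // stand-in for stopping the RPC handlers
        for _, c := range s.limiters {
                c.Stop()
        }
}

func main() {
        svc := &service{limiters: []*collection{newCollection(), newCollection()}}
        svc.Stop()
        fmt.Println("handlers stopped:", svc.handlersStopped)
}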

9 of 17 new or added lines in 1 file covered. (52.94%)

34 existing lines in 11 files now uncovered.

105310 of 147269 relevant lines covered (71.51%)

2654.36 hits per line
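
For reference, the overall figure is consistent with the line counts above: 105310 / 147269 ≈ 0.7151, i.e. roughly 71.51% of relevant lines are covered.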

Source File

/service/history/queue/timer_queue_processor_base.go (77.66% covered)
// Copyright (c) 2017-2020 Uber Technologies Inc.

// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:

// The above copyright notice and this permission notice shall be included in all
// copies or substantial portions of the Software.

// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
// SOFTWARE.

package queue

import (
        "context"
        "errors"
        "fmt"
        "math"
        "math/rand"
        "sync"
        "sync/atomic"
        "time"

        "github.com/uber/cadence/common"
        "github.com/uber/cadence/common/backoff"
        "github.com/uber/cadence/common/dynamicconfig"
        "github.com/uber/cadence/common/log"
        "github.com/uber/cadence/common/log/tag"
        "github.com/uber/cadence/common/metrics"
        "github.com/uber/cadence/common/persistence"
        "github.com/uber/cadence/service/history/config"
        "github.com/uber/cadence/service/history/shard"
        "github.com/uber/cadence/service/history/task"
)

var (
        maximumTimerTaskKey = newTimerTaskKey(
                time.Unix(0, math.MaxInt64),
                0,
        )
)

type (
        timerTaskKey struct {
                visibilityTimestamp time.Time
                taskID              int64
        }

        timeTaskReadProgress struct {
                currentQueue  ProcessingQueue
                readLevel     task.Key
                maxReadLevel  task.Key
                nextPageToken []byte
        }

        timerQueueProcessorBase struct {
                *processorBase

                taskInitializer task.Initializer
                clusterName     string
                pollTimeLock    sync.Mutex
                backoffTimer    map[int]*time.Timer
                nextPollTime    map[int]time.Time
                timerGate       TimerGate

                // timer notification
                newTimerCh  chan struct{}
                newTimeLock sync.Mutex
                newTime     time.Time

                processingQueueReadProgress map[int]timeTaskReadProgress

                updateAckLevelFn                 func() (bool, task.Key, error)
                splitProcessingQueueCollectionFn func(splitPolicy ProcessingQueueSplitPolicy, upsertPollTimeFn func(int, time.Time))
        }

        filteredTimerTasksResponse struct {
                timerTasks    []*persistence.TimerTaskInfo
                lookAheadTask *persistence.TimerTaskInfo
                nextPageToken []byte
        }
)

func newTimerQueueProcessorBase(
        clusterName string,
        shard shard.Context,
        processingQueueStates []ProcessingQueueState,
        taskProcessor task.Processor,
        timerGate TimerGate,
        options *queueProcessorOptions,
        updateMaxReadLevel updateMaxReadLevelFn,
        updateClusterAckLevel updateClusterAckLevelFn,
        updateProcessingQueueStates updateProcessingQueueStatesFn,
        queueShutdown queueShutdownFn,
        taskFilter task.Filter,
        taskExecutor task.Executor,
        logger log.Logger,
        metricsClient metrics.Client,
) *timerQueueProcessorBase {
        processorBase := newProcessorBase(
                shard,
                processingQueueStates,
                taskProcessor,
                options,
                updateMaxReadLevel,
                updateClusterAckLevel,
                updateProcessingQueueStates,
                queueShutdown,
                logger.WithTags(tag.ComponentTimerQueue),
                metricsClient,
        )

        queueType := task.QueueTypeActiveTimer
        if options.MetricScope == metrics.TimerStandbyQueueProcessorScope {
                queueType = task.QueueTypeStandbyTimer
        }

        t := &timerQueueProcessorBase{
                processorBase: processorBase,
                taskInitializer: func(taskInfo task.Info) task.Task {
                        return task.NewTimerTask(
                                shard,
                                taskInfo,
                                queueType,
                                task.InitializeLoggerForTask(shard.GetShardID(), taskInfo, logger),
                                taskFilter,
                                taskExecutor,
                                taskProcessor,
                                processorBase.redispatcher.AddTask,
                                shard.GetConfig().TaskCriticalRetryCount,
                        )
                },
                clusterName:                 clusterName,
                backoffTimer:                make(map[int]*time.Timer),
                nextPollTime:                make(map[int]time.Time),
                timerGate:                   timerGate,
                newTimerCh:                  make(chan struct{}, 1),
                processingQueueReadProgress: make(map[int]timeTaskReadProgress),
        }

        t.updateAckLevelFn = t.updateAckLevel
        t.splitProcessingQueueCollectionFn = t.splitProcessingQueueCollection
        return t
}

func (t *timerQueueProcessorBase) Start() {
        if !atomic.CompareAndSwapInt32(&t.status, common.DaemonStatusInitialized, common.DaemonStatusStarted) {
                return
        }

        t.logger.Info("Timer queue processor state changed", tag.LifeCycleStarting)
        defer t.logger.Info("Timer queue processor state changed", tag.LifeCycleStarted)

        t.redispatcher.Start()

        newPollTime := time.Time{}
        if startJitter := t.options.MaxStartJitterInterval(); startJitter > 0 {
                now := t.shard.GetTimeSource().Now()
                newPollTime = now.Add(time.Duration(rand.Int63n(int64(startJitter))))
        }
        for _, queueCollections := range t.processingQueueCollections {
                t.upsertPollTime(queueCollections.Level(), newPollTime)
        }

        t.shutdownWG.Add(1)
        go t.processorPump()
}

// Edge case: Stop doesn't stop the TimerGate if timerQueueProcessorBase was only initialized without being started.
// As a result, the TimerGate needs to be stopped separately.
// One way to fix this is to make sure the TimerGate doesn't start its daemon loop on initialization and instead requires an explicit Start.
func (t *timerQueueProcessorBase) Stop() {
        if !atomic.CompareAndSwapInt32(&t.status, common.DaemonStatusStarted, common.DaemonStatusStopped) {
                return
        }

        t.logger.Info("Timer queue processor state changed", tag.LifeCycleStopping)
        defer t.logger.Info("Timer queue processor state changed", tag.LifeCycleStopped)

        t.timerGate.Close()
        close(t.shutdownCh)
        t.pollTimeLock.Lock()
        for _, timer := range t.backoffTimer {
                timer.Stop()
        }
        t.pollTimeLock.Unlock()

        if success := common.AwaitWaitGroup(&t.shutdownWG, gracefulShutdownTimeout); !success {
                t.logger.Warn("timerQueueProcessorBase timed out on shut down", tag.LifeCycleStopTimedout)
        }

        t.redispatcher.Stop()
}

func (t *timerQueueProcessorBase) processorPump() {
        defer t.shutdownWG.Done()
        updateAckTimer := time.NewTimer(backoff.JitDuration(t.options.UpdateAckInterval(), t.options.UpdateAckIntervalJitterCoefficient()))
        defer updateAckTimer.Stop()
        splitQueueTimer := time.NewTimer(backoff.JitDuration(t.options.SplitQueueInterval(), t.options.SplitQueueIntervalJitterCoefficient()))
        defer splitQueueTimer.Stop()

        for {
                select {
                case <-t.shutdownCh:
                        return
                case <-t.timerGate.FireChan():
                        t.updateTimerGates()
                case <-updateAckTimer.C:
                        if stopPump := t.handleAckLevelUpdate(updateAckTimer); stopPump {
                                if !t.options.EnableGracefulSyncShutdown() {
                                        go t.Stop()
                                        return
                                }

                                t.Stop()
                                return
                        }
                case <-t.newTimerCh:
                        t.handleNewTimer()
                case <-splitQueueTimer.C:
                        t.splitQueue(splitQueueTimer)
                case notification := <-t.actionNotifyCh:
                        t.handleActionNotification(notification)
                }
        }
}

// processQueueCollections loads timer tasks for the active queue at each of the given levels and submits them for processing.
func (t *timerQueueProcessorBase) processQueueCollections(levels map[int]struct{}) {
        for _, queueCollection := range t.processingQueueCollections {
                level := queueCollection.Level()
                if _, ok := levels[level]; !ok {
                        continue
                }

                activeQueue := queueCollection.ActiveQueue()
                if activeQueue == nil {
                        // processing for this queue collection has finished.
                        // it's possible that a new queue will be added to this collection later though;
                        // pollTime will be updated after split/merge
                        t.logger.Debug("Active queue is nil for timer queue at this level", tag.QueueLevel(level))
                        continue
                }

                t.upsertPollTime(level, t.shard.GetCurrentTime(t.clusterName).Add(backoff.JitDuration(
                        t.options.MaxPollInterval(),
                        t.options.MaxPollIntervalJitterCoefficient(),
                )))

                var nextPageToken []byte
                readLevel := activeQueue.State().ReadLevel()
                maxReadLevel := minTaskKey(activeQueue.State().MaxLevel(), t.updateMaxReadLevel())
                domainFilter := activeQueue.State().DomainFilter()

                if progress, ok := t.processingQueueReadProgress[level]; ok {
                        if progress.currentQueue == activeQueue {
                                readLevel = progress.readLevel
                                maxReadLevel = progress.maxReadLevel
                                nextPageToken = progress.nextPageToken
                        }
                        delete(t.processingQueueReadProgress, level)
                }

                if !readLevel.Less(maxReadLevel) {
                        // notify timer gate about the min time
                        t.upsertPollTime(level, readLevel.(timerTaskKey).visibilityTimestamp)
                        t.logger.Debug("Skipping processing timer queue at this level because readLevel >= maxReadLevel", tag.QueueLevel(level))
                        continue
                }

                ctx, cancel := context.WithTimeout(context.Background(), loadQueueTaskThrottleRetryDelay)
                if err := t.rateLimiter.Wait(ctx); err != nil {
                        cancel()
                        if level == defaultProcessingQueueLevel {
                                t.upsertPollTime(level, time.Time{})
                        } else {
                                t.setupBackoffTimer(level)
                        }
                        continue
                }
                cancel()

                resp, err := t.readAndFilterTasks(readLevel, maxReadLevel, nextPageToken)
                if err != nil {
                        t.logger.Error("Processor unable to retrieve tasks", tag.Error(err))
                        t.upsertPollTime(level, time.Time{}) // re-enqueue the event
                        continue
                }

                tasks := make(map[task.Key]task.Task)
                taskChFull := false
                submittedCount := 0
                for _, taskInfo := range resp.timerTasks {
                        if !domainFilter.Filter(taskInfo.GetDomainID()) {
                                continue
                        }

                        task := t.taskInitializer(taskInfo)
                        tasks[newTimerTaskKey(taskInfo.GetVisibilityTimestamp(), taskInfo.GetTaskID())] = task
                        submitted, err := t.submitTask(task)
                        if err != nil {
                                // the only error here means the processor has been shut down,
                                // so return instead of continue
                                return
                        }
                        taskChFull = taskChFull || !submitted
                        if submitted {
                                submittedCount++
                        }
                }
                t.logger.Debugf("Submitted %d timer tasks successfully out of %d tasks", submittedCount, len(resp.timerTasks))

                var newReadLevel task.Key
                if len(resp.nextPageToken) == 0 {
                        newReadLevel = maxReadLevel
                        if resp.lookAheadTask != nil {
                                // lookAheadTask may exist only when nextPageToken is empty
                                // notice that lookAheadTask.VisibilityTimestamp may be larger than shard max read level,
                                // which means new tasks can be generated before that timestamp. This issue is solved by
                                // upsertPollTime whenever there are new tasks
                                lookAheadTimestamp := resp.lookAheadTask.GetVisibilityTimestamp()
                                t.upsertPollTime(level, lookAheadTimestamp)
                                newReadLevel = minTaskKey(newReadLevel, newTimerTaskKey(lookAheadTimestamp, 0))
                                t.logger.Debugf("nextPageToken is empty for timer queue at level %d so setting newReadLevel to min(lookAheadTask.timestamp: %v, maxReadLevel: %v)", level, lookAheadTimestamp, maxReadLevel)
                        } else {
                                // else we have no idea when the next poll should happen
                                // rely on notifyNewTask to trigger the next poll even for non-default queues.
                                // another option for non-default queues is to set up a backoff timer to check back later
                                t.logger.Debugf("nextPageToken is empty for timer queue at level %d and there's no lookAheadTask. setting readLevel to maxReadLevel: %v", level, maxReadLevel)
                        }
                } else {
                        // more tasks should be loaded for this processing queue
                        // record the current progress and update the poll time
                        if level == defaultProcessingQueueLevel || !taskChFull {
                                t.logger.Debugf("upserting poll time for timer queue at level %d because nextPageToken is not empty and !taskChFull", level)
                                t.upsertPollTime(level, time.Time{})
                        } else {
                                t.logger.Debugf("setting up backoff timer for timer queue at level %d because nextPageToken is not empty and taskChFull", level)
                                t.setupBackoffTimer(level)
                        }
                        t.processingQueueReadProgress[level] = timeTaskReadProgress{
                                currentQueue:  activeQueue,
                                readLevel:     readLevel,
                                maxReadLevel:  maxReadLevel,
                                nextPageToken: resp.nextPageToken,
                        }
                        if len(resp.timerTasks) > 0 {
                                newReadLevel = newTimerTaskKey(resp.timerTasks[len(resp.timerTasks)-1].GetVisibilityTimestamp(), 0)
                        }
                }
                queueCollection.AddTasks(tasks, newReadLevel)
        }
}

// splitQueue splits the processing queue collection based on some policy
// and resets the timer with jitter for the next run
func (t *timerQueueProcessorBase) splitQueue(splitQueueTimer *time.Timer) {
        splitPolicy := t.initializeSplitPolicy(
                func(key task.Key, domainID string) task.Key {
                        return newTimerTaskKey(
                                key.(timerTaskKey).visibilityTimestamp.Add(
                                        t.options.SplitLookAheadDurationByDomainID(domainID),
                                ),
                                0,
                        )
                },
        )

        t.splitProcessingQueueCollectionFn(splitPolicy, t.upsertPollTime)

        splitQueueTimer.Reset(backoff.JitDuration(
                t.options.SplitQueueInterval(),
                t.options.SplitQueueIntervalJitterCoefficient(),
        ))
}

// handleAckLevelUpdate updates the ack level and resets the timer with jitter.
// It returns true if processing should be terminated.
func (t *timerQueueProcessorBase) handleAckLevelUpdate(updateAckTimer *time.Timer) bool {
        processFinished, _, err := t.updateAckLevelFn()
        var errShardClosed *shard.ErrShardClosed
        if errors.As(err, &errShardClosed) || (err == nil && processFinished) {
                return true
        }
        updateAckTimer.Reset(backoff.JitDuration(
                t.options.UpdateAckInterval(),
                t.options.UpdateAckIntervalJitterCoefficient(),
        ))
        return false
}

// updateTimerGates collects the levels whose poll time is due and processes those queue collections.
func (t *timerQueueProcessorBase) updateTimerGates() {
        maxRedispatchQueueSize := t.options.MaxRedispatchQueueSize()
        if t.redispatcher.Size() > maxRedispatchQueueSize {
                t.redispatcher.Redispatch(maxRedispatchQueueSize)
                if t.redispatcher.Size() > maxRedispatchQueueSize {
                        // if redispatcher still has a large number of tasks
                        // this only happens when the system is under very high load
                        // we should back off here instead of keeping submitting tasks to the task processor
                        // don't call t.timerGate.Update(time.Now() + loadQueueTaskThrottleRetryDelay) as the time in
                        // the standby timer processor is not real time and is managed separately
                        time.Sleep(backoff.JitDuration(
                                t.options.PollBackoffInterval(),
                                t.options.PollBackoffIntervalJitterCoefficient(),
                        ))
                }
                t.timerGate.Update(time.Time{})
                return
        }

        t.pollTimeLock.Lock()
        levels := make(map[int]struct{})
        now := t.shard.GetCurrentTime(t.clusterName)
        for level, pollTime := range t.nextPollTime {
                if !now.Before(pollTime) {
                        levels[level] = struct{}{}
                        delete(t.nextPollTime, level)
                } else {
                        t.timerGate.Update(pollTime)
                }
        }
        t.pollTimeLock.Unlock()

        t.processQueueCollections(levels)
}

func (t *timerQueueProcessorBase) handleNewTimer() {
        t.newTimeLock.Lock()
        newTime := t.newTime
        t.newTime = time.Time{}
        t.newTimeLock.Unlock()

        // New timer has arrived.
        t.metricsScope.IncCounter(metrics.NewTimerNotifyCounter)
        // notify all queue collections as they are waiting for the notification when there's
        // no more task to process. If, for non-default queues, we choose to do periodic polling
        // in the future, then we won't need to notify them.
        for _, queueCollection := range t.processingQueueCollections {
                t.upsertPollTime(queueCollection.Level(), newTime)
        }
}

func (t *timerQueueProcessorBase) handleActionNotification(notification actionNotification) {
        t.processorBase.handleActionNotification(notification, func() {
                switch notification.action.ActionType {
                case ActionTypeReset:
                        t.upsertPollTime(defaultProcessingQueueLevel, time.Time{})
                }
        })
}

func (t *timerQueueProcessorBase) readAndFilterTasks(readLevel, maxReadLevel task.Key, nextPageToken []byte) (*filteredTimerTasksResponse, error) {
        resp, err := t.getTimerTasks(readLevel, maxReadLevel, nextPageToken, t.options.BatchSize())
        if err != nil {
                return nil, err
        }

        var lookAheadTask *persistence.TimerTaskInfo
        filteredTasks := []*persistence.TimerTaskInfo{}
        for _, timerTask := range resp.Timers {
                if !t.isProcessNow(timerTask.GetVisibilityTimestamp()) {
                        // found the first task that is not ready to be processed yet.
                        // reset NextPageToken so we can load more tasks starting from this lookAheadTask next time.
                        lookAheadTask = timerTask
                        resp.NextPageToken = nil
                        break
                }
                filteredTasks = append(filteredTasks, timerTask)
        }

        if len(resp.NextPageToken) == 0 && lookAheadTask == nil {
                // only look ahead within the processing queue boundary
                lookAheadTask, err = t.readLookAheadTask(maxReadLevel, maximumTimerTaskKey)
                if err != nil {
                        // we don't know whether a look-ahead task exists, but we know that if it does,
                        // its visibility timestamp is larger than or equal to maxReadLevel.
                        // so, create a fake look-ahead task so another load can be triggered at that time.
                        lookAheadTask = &persistence.TimerTaskInfo{
                                VisibilityTimestamp: maxReadLevel.(timerTaskKey).visibilityTimestamp,
                        }
                }
        }

        t.logger.Debugf("readAndFilterTasks returning %d tasks and lookAheadTask: %#v for readLevel: %#v, maxReadLevel: %#v", len(filteredTasks), lookAheadTask, readLevel, maxReadLevel)
        return &filteredTimerTasksResponse{
                timerTasks:    filteredTasks,
                lookAheadTask: lookAheadTask,
                nextPageToken: resp.NextPageToken,
        }, nil
}

func (t *timerQueueProcessorBase) readLookAheadTask(lookAheadStartLevel task.Key, lookAheadMaxLevel task.Key) (*persistence.TimerTaskInfo, error) {
        resp, err := t.getTimerTasks(lookAheadStartLevel, lookAheadMaxLevel, nil, 1)
        if err != nil {
                return nil, err
        }

        if len(resp.Timers) == 1 {
                return resp.Timers[0], nil
        }
        return nil, nil
}

func (t *timerQueueProcessorBase) getTimerTasks(readLevel, maxReadLevel task.Key, nextPageToken []byte, batchSize int) (*persistence.GetTimerIndexTasksResponse, error) {
        request := &persistence.GetTimerIndexTasksRequest{
                MinTimestamp:  readLevel.(timerTaskKey).visibilityTimestamp,
                MaxTimestamp:  maxReadLevel.(timerTaskKey).visibilityTimestamp,
                BatchSize:     batchSize,
                NextPageToken: nextPageToken,
        }

        var err error
        var response *persistence.GetTimerIndexTasksResponse
        retryCount := t.shard.GetConfig().TimerProcessorGetFailureRetryCount()
        for attempt := 0; attempt < retryCount; attempt++ {
                response, err = t.shard.GetExecutionManager().GetTimerIndexTasks(context.Background(), request)
                if err == nil {
                        return response, nil
                }
                backoff := time.Duration(attempt*100) * time.Millisecond
                t.logger.Debugf("Failed to get timer tasks from execution manager. error: %v, attempt: %d, retryCount: %d, backoff: %v", err, attempt, retryCount, backoff)
                time.Sleep(backoff)
        }
        return nil, err
}

func (t *timerQueueProcessorBase) isProcessNow(expiryTime time.Time) bool {
        if expiryTime.IsZero() {
                // return true, but something somewhere probably has a bug that created an empty timer task.
                t.logger.Warn("Timer task has timestamp zero")
        }
        return !t.shard.GetCurrentTime(t.clusterName).Before(expiryTime)
}

func (t *timerQueueProcessorBase) notifyNewTimers(timerTasks []persistence.Task) {
        if len(timerTasks) == 0 {
                return
        }

        isActive := t.options.MetricScope == metrics.TimerActiveQueueProcessorScope
        minNewTime := timerTasks[0].GetVisibilityTimestamp()
        shardIDTag := metrics.ShardIDTag(t.shard.GetShardID())
        for _, timerTask := range timerTasks {
                ts := timerTask.GetVisibilityTimestamp()
                if ts.Before(minNewTime) {
                        minNewTime = ts
                }

                taskScopeIdx := task.GetTimerTaskMetricScope(
                        timerTask.GetType(),
                        isActive,
                )
                t.metricsClient.Scope(taskScopeIdx).Tagged(shardIDTag).IncCounter(metrics.NewTimerNotifyCounter)
        }

        t.notifyNewTimer(minNewTime)
}

func (t *timerQueueProcessorBase) notifyNewTimer(newTime time.Time) {
        t.newTimeLock.Lock()
        defer t.newTimeLock.Unlock()

        if t.newTime.IsZero() || newTime.Before(t.newTime) {
                t.logger.Debugf("Updating newTime from %v to %v", t.newTime, newTime)
                t.newTime = newTime
                select {
                case t.newTimerCh <- struct{}{}:
                        // Notified about new time.
                default:
                        // Channel "full" -> drop and move on; this will happen only if the service is under high load.
                }
        }
}

func (t *timerQueueProcessorBase) upsertPollTime(level int, newPollTime time.Time) {
        t.pollTimeLock.Lock()
        defer t.pollTimeLock.Unlock()

        if _, ok := t.backoffTimer[level]; ok {
                // honor existing backoff timer
                t.logger.Debugf("Skipping upsertPollTime for timer queue at level %d because there's a backoff timer", level)
                return
        }

        currentPollTime, ok := t.nextPollTime[level]
        if !ok || newPollTime.Before(currentPollTime) {
                t.logger.Debugf("Updating poll timer for timer queue at level %d. CurrentPollTime: %v, newPollTime: %v", level, currentPollTime, newPollTime)
                t.nextPollTime[level] = newPollTime
                t.timerGate.Update(newPollTime)
                return
        }

        t.logger.Debugf("Skipping upsertPollTime for level %d because currentPollTime %v is before newPollTime %v", level, currentPollTime, newPollTime)
}

// setupBackoffTimer will trigger a poll for the specified processing queue collection
// after a certain period of (real) time. This means for the standby timer, even if the cluster time
// has not been updated, the poll will still be triggered when the timer fires. Use this function
// for delaying the load for a processing queue. If a poll should be triggered immediately,
// use upsertPollTime.
func (t *timerQueueProcessorBase) setupBackoffTimer(level int) {
        t.pollTimeLock.Lock()
        defer t.pollTimeLock.Unlock()

        if _, ok := t.backoffTimer[level]; ok {
                // honor existing backoff timer
                return
        }

        t.metricsScope.IncCounter(metrics.ProcessingQueueThrottledCounter)
        t.logger.Info("Throttled processing queue", tag.QueueLevel(level))
        backoffDuration := backoff.JitDuration(
                t.options.PollBackoffInterval(),
                t.options.PollBackoffIntervalJitterCoefficient(),
        )
        t.backoffTimer[level] = time.AfterFunc(backoffDuration, func() {
                select {
                case <-t.shutdownCh:
                        return
                default:
                }

                t.pollTimeLock.Lock()
                defer t.pollTimeLock.Unlock()

                t.nextPollTime[level] = time.Time{}
                t.timerGate.Update(time.Time{})
                delete(t.backoffTimer, level)
        })
}

func newTimerTaskKey(visibilityTimestamp time.Time, taskID int64) task.Key {
        return timerTaskKey{
                visibilityTimestamp: visibilityTimestamp,
                taskID:              taskID,
        }
}

func (k timerTaskKey) Less(
        key task.Key,
) bool {
        timerKey := key.(timerTaskKey)
        if k.visibilityTimestamp.Equal(timerKey.visibilityTimestamp) {
                return k.taskID < timerKey.taskID
        }
        return k.visibilityTimestamp.Before(timerKey.visibilityTimestamp)
}

func (k timerTaskKey) String() string {
        return fmt.Sprintf("{visibilityTimestamp: %v, taskID: %v}", k.visibilityTimestamp, k.taskID)
}

func newTimerQueueProcessorOptions(
        config *config.Config,
        isActive bool,
        isFailover bool,
) *queueProcessorOptions {
        options := &queueProcessorOptions{
                BatchSize:                            config.TimerTaskBatchSize,
                DeleteBatchSize:                      config.TimerTaskDeleteBatchSize,
                MaxPollRPS:                           config.TimerProcessorMaxPollRPS,
                MaxPollInterval:                      config.TimerProcessorMaxPollInterval,
                MaxPollIntervalJitterCoefficient:     config.TimerProcessorMaxPollIntervalJitterCoefficient,
                UpdateAckInterval:                    config.TimerProcessorUpdateAckInterval,
                UpdateAckIntervalJitterCoefficient:   config.TimerProcessorUpdateAckIntervalJitterCoefficient,
                RedispatchIntervalJitterCoefficient:  config.TaskRedispatchIntervalJitterCoefficient,
                MaxRedispatchQueueSize:               config.TimerProcessorMaxRedispatchQueueSize,
                SplitQueueInterval:                   config.TimerProcessorSplitQueueInterval,
                SplitQueueIntervalJitterCoefficient:  config.TimerProcessorSplitQueueIntervalJitterCoefficient,
                PollBackoffInterval:                  config.QueueProcessorPollBackoffInterval,
                PollBackoffIntervalJitterCoefficient: config.QueueProcessorPollBackoffIntervalJitterCoefficient,
                EnableGracefulSyncShutdown:           config.QueueProcessorEnableGracefulSyncShutdown,
        }

        if isFailover {
                // disable queue split for failover processor
                options.EnableSplit = dynamicconfig.GetBoolPropertyFn(false)

                // disable persist and load processing queue states for failover processor as it will never be split
                options.EnablePersistQueueStates = dynamicconfig.GetBoolPropertyFn(false)
                options.EnableLoadQueueStates = dynamicconfig.GetBoolPropertyFn(false)

                options.MaxStartJitterInterval = config.TimerProcessorFailoverMaxStartJitterInterval
        } else {
                options.EnableSplit = config.QueueProcessorEnableSplit
                options.SplitMaxLevel = config.QueueProcessorSplitMaxLevel
                options.EnableRandomSplitByDomainID = config.QueueProcessorEnableRandomSplitByDomainID
                options.RandomSplitProbability = config.QueueProcessorRandomSplitProbability
                options.EnablePendingTaskSplitByDomainID = config.QueueProcessorEnablePendingTaskSplitByDomainID
                options.PendingTaskSplitThreshold = config.QueueProcessorPendingTaskSplitThreshold
                options.EnableStuckTaskSplitByDomainID = config.QueueProcessorEnableStuckTaskSplitByDomainID
                options.StuckTaskSplitThreshold = config.QueueProcessorStuckTaskSplitThreshold
                options.SplitLookAheadDurationByDomainID = config.QueueProcessorSplitLookAheadDurationByDomainID

                options.EnablePersistQueueStates = config.QueueProcessorEnablePersistQueueStates
                options.EnableLoadQueueStates = config.QueueProcessorEnableLoadQueueStates

                options.MaxStartJitterInterval = dynamicconfig.GetDurationPropertyFn(0)
        }

        if isActive {
                options.MetricScope = metrics.TimerActiveQueueProcessorScope
                options.RedispatchInterval = config.ActiveTaskRedispatchInterval
        } else {
                options.MetricScope = metrics.TimerStandbyQueueProcessorScope
                options.RedispatchInterval = config.StandbyTaskRedispatchInterval
        }

        return options
}