uber / cadence — build 01909e15-6794-4b38-abfa-ab8dde3a5a9b (push · buildkite · web-flow)

28 Jun 2024 10:07PM UTC — coverage: 71.517% (+0.08%) from 71.438%

Global ratelimiter: everything else (#6141)

After too many attempts to break this apart and build different portions in self-contained ways, and running into various inter-dependent roadblocks... I just gave up and did it all at once.

# Rollout plan for people who don't want or need this system

Do nothing :)

As of this PR, you'll use "disabled" and that should be as close to "no changes at all" as possible.
Soon, you'll get "local", and then you'll have some new metrics you can use (or ignore) but otherwise no behavior changes.

And that'll be it.  The "global" load-balanced stuff is likely to remain opt-in.

# Rollout plan for us

For deployment: any order is fine / should not behave (too) badly.  Even if "global" or either shadow mode is selected on the initial deploy.  Frontends will have background `RatelimitUpdate` request failures until History is deployed, but that'll just mean it continues to use the "local" internal fallback and that's in practice the same behavior as "local" or "disabled", just slightly noisier.

The _smoothest_ deployment is: deploy everything on "disabled" or "local" (the default(s), so no requests are sent until deploy is done), then switch to "local-shadow-global" to warm global limiters / check that it's working, then "global" to use the global behavior.  

Rolling back is just the opposite.  Ideally disable things first to stop the requests, but even if you don't it should be fine.

In more detail:

1. At merge time, this will set the "key mode" (`frontend.globalRatelimiterMode`) to "disabled", which gets as close as is reasonably possible to acting _exactly_ like it did before this PR (the modes are sketched in code after this list).
   - This is also effectively the panic button for the initial rollout.
2. Once that proves to not immediately explode, switch to "local" for all keys.  This will keep the current ratelimiter rates, but will start collecting and emitting ratelimiter-usage metrics, so we can make sure that doesn't explode either... (continued)
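
To make the mode names above concrete, here is a rough sketch of how the four modes relate. The mode strings come from this description; the `ratelimiterMode` type and `pickLimiter` helper are invented for illustration and are not Cadence's actual ratelimiter API:

```go
package main

import "fmt"

// ratelimiterMode mirrors the values discussed above for
// frontend.globalRatelimiterMode (illustrative only).
type ratelimiterMode string

const (
	modeDisabled          ratelimiterMode = "disabled"            // behave as before this PR
	modeLocal             ratelimiterMode = "local"               // keep local limits, emit usage metrics
	modeLocalShadowGlobal ratelimiterMode = "local-shadow-global" // enforce local, warm the global limiter in shadow
	modeGlobal            ratelimiterMode = "global"              // enforce the load-balanced global limits
)

// pickLimiter shows the intended precedence: anything other than "global"
// keeps enforcing the local limiter, and shadow mode only adds observation.
func pickLimiter(mode ratelimiterMode) (enforce, shadow string) {
	switch mode {
	case modeGlobal:
		return "global", ""
	case modeLocalShadowGlobal:
		return "local", "global"
	case modeLocal, modeDisabled:
		return "local", ""
	default:
		return "local", "" // unknown values fall back to the safe local behavior
	}
}

func main() {
	for _, m := range []ratelimiterMode{modeDisabled, modeLocal, modeLocalShadowGlobal, modeGlobal} {
		enforce, shadow := pickLimiter(m)
		fmt.Printf("%-20s enforce=%s shadow=%s\n", m, enforce, shadow)
	}
}
```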

688 of 850 new or added lines in 29 files covered. (80.94%)

14 existing lines in 7 files now uncovered.

105310 of 147252 relevant lines covered (71.52%)

2625.62 hits per line

Source File

/service/history/queue/timer_queue_processor_base.go — 77.87% covered
```go
// Copyright (c) 2017-2020 Uber Technologies Inc.

// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:

// The above copyright notice and this permission notice shall be included in all
// copies or substantial portions of the Software.

// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
// SOFTWARE.

package queue

import (
	"context"
	"errors"
	"fmt"
	"math"
	"math/rand"
	"sync"
	"sync/atomic"
	"time"

	"github.com/uber/cadence/common"
	"github.com/uber/cadence/common/backoff"
	"github.com/uber/cadence/common/dynamicconfig"
	"github.com/uber/cadence/common/log"
	"github.com/uber/cadence/common/log/tag"
	"github.com/uber/cadence/common/metrics"
	"github.com/uber/cadence/common/persistence"
	"github.com/uber/cadence/service/history/config"
	"github.com/uber/cadence/service/history/shard"
	"github.com/uber/cadence/service/history/task"
)

var (
	maximumTimerTaskKey = newTimerTaskKey(
		time.Unix(0, math.MaxInt64),
		0,
	)
)

type (
	timerTaskKey struct {
		visibilityTimestamp time.Time
		taskID              int64
	}

	timeTaskReadProgress struct {
		currentQueue  ProcessingQueue
		readLevel     task.Key
		maxReadLevel  task.Key
		nextPageToken []byte
	}

	timerQueueProcessorBase struct {
		*processorBase

		taskInitializer task.Initializer
		clusterName     string
		pollTimeLock    sync.Mutex
		backoffTimer    map[int]*time.Timer
		nextPollTime    map[int]time.Time
		timerGate       TimerGate

		// timer notification
		newTimerCh  chan struct{}
		newTimeLock sync.Mutex
		newTime     time.Time

		processingQueueReadProgress map[int]timeTaskReadProgress

		updateAckLevelFn                 func() (bool, task.Key, error)
		splitProcessingQueueCollectionFn func(splitPolicy ProcessingQueueSplitPolicy, upsertPollTimeFn func(int, time.Time))
	}

	filteredTimerTasksResponse struct {
		timerTasks    []*persistence.TimerTaskInfo
		lookAheadTask *persistence.TimerTaskInfo
		nextPageToken []byte
	}
)

func newTimerQueueProcessorBase(
	clusterName string,
	shard shard.Context,
	processingQueueStates []ProcessingQueueState,
	taskProcessor task.Processor,
	timerGate TimerGate,
	options *queueProcessorOptions,
	updateMaxReadLevel updateMaxReadLevelFn,
	updateClusterAckLevel updateClusterAckLevelFn,
	updateProcessingQueueStates updateProcessingQueueStatesFn,
	queueShutdown queueShutdownFn,
	taskFilter task.Filter,
	taskExecutor task.Executor,
	logger log.Logger,
	metricsClient metrics.Client,
) *timerQueueProcessorBase {
	processorBase := newProcessorBase(
		shard,
		processingQueueStates,
		taskProcessor,
		options,
		updateMaxReadLevel,
		updateClusterAckLevel,
		updateProcessingQueueStates,
		queueShutdown,
		logger.WithTags(tag.ComponentTimerQueue),
		metricsClient,
	)

	queueType := task.QueueTypeActiveTimer
	if options.MetricScope == metrics.TimerStandbyQueueProcessorScope {
		queueType = task.QueueTypeStandbyTimer
	}

	t := &timerQueueProcessorBase{
		processorBase: processorBase,
		taskInitializer: func(taskInfo task.Info) task.Task {
			return task.NewTimerTask(
				shard,
				taskInfo,
				queueType,
				task.InitializeLoggerForTask(shard.GetShardID(), taskInfo, logger),
				taskFilter,
				taskExecutor,
				taskProcessor,
				processorBase.redispatcher.AddTask,
				shard.GetConfig().TaskCriticalRetryCount,
			)
		},
		clusterName:                 clusterName,
		backoffTimer:                make(map[int]*time.Timer),
		nextPollTime:                make(map[int]time.Time),
		timerGate:                   timerGate,
		newTimerCh:                  make(chan struct{}, 1),
		processingQueueReadProgress: make(map[int]timeTaskReadProgress),
	}

	t.updateAckLevelFn = t.updateAckLevel
	t.splitProcessingQueueCollectionFn = t.splitProcessingQueueCollection
	return t
}

func (t *timerQueueProcessorBase) Start() {
	if !atomic.CompareAndSwapInt32(&t.status, common.DaemonStatusInitialized, common.DaemonStatusStarted) {
		return
	}

	t.logger.Info("Timer queue processor state changed", tag.LifeCycleStarting)
	defer t.logger.Info("Timer queue processor state changed", tag.LifeCycleStarted)

	t.redispatcher.Start()

	newPollTime := time.Time{}
	if startJitter := t.options.MaxStartJitterInterval(); startJitter > 0 {
		now := t.shard.GetTimeSource().Now()
		newPollTime = now.Add(time.Duration(rand.Int63n(int64(startJitter))))
	}
	for _, queueCollections := range t.processingQueueCollections {
		t.upsertPollTime(queueCollections.Level(), newPollTime)
	}

	t.shutdownWG.Add(1)
	go t.processorPump()
}

// Edge Case: Stop doesn't stop TimerGate if timerQueueProcessorBase is only initialized without starting.
// As a result, TimerGate needs to be stopped separately.
// One way to fix this is to make sure TimerGate doesn't start its daemon loop on initialization and requires an explicit Start.
func (t *timerQueueProcessorBase) Stop() {
	if !atomic.CompareAndSwapInt32(&t.status, common.DaemonStatusStarted, common.DaemonStatusStopped) {
		return
	}

	t.logger.Info("Timer queue processor state changed", tag.LifeCycleStopping)
	defer t.logger.Info("Timer queue processor state changed", tag.LifeCycleStopped)

	t.timerGate.Close()
	close(t.shutdownCh)
	t.pollTimeLock.Lock()
	for _, timer := range t.backoffTimer {
		timer.Stop()
	}
	t.pollTimeLock.Unlock()

	if success := common.AwaitWaitGroup(&t.shutdownWG, gracefulShutdownTimeout); !success {
		t.logger.Warn("timerQueueProcessorBase timed out on shut down", tag.LifeCycleStopTimedout)
	}

	t.redispatcher.Stop()
}

// processorPump is the main event loop: it reacts to timer gate fires, new-timer
// notifications, ack-level update ticks, queue-split ticks, and action notifications
// until shutdown.
func (t *timerQueueProcessorBase) processorPump() {
	defer t.shutdownWG.Done()
	updateAckTimer := time.NewTimer(backoff.JitDuration(t.options.UpdateAckInterval(), t.options.UpdateAckIntervalJitterCoefficient()))
	defer updateAckTimer.Stop()
	splitQueueTimer := time.NewTimer(backoff.JitDuration(t.options.SplitQueueInterval(), t.options.SplitQueueIntervalJitterCoefficient()))
	defer splitQueueTimer.Stop()

	for {
		select {
		case <-t.shutdownCh:
			return
		case <-t.timerGate.FireChan():
			t.updateTimerGates()
		case <-updateAckTimer.C:
			if stopPump := t.handleAckLevelUpdate(updateAckTimer); stopPump {
				if !t.options.EnableGracefulSyncShutdown() {
					go t.Stop()
					return
				}

				t.Stop()
				return
			}
		case <-t.newTimerCh:
			t.handleNewTimer()
		case <-splitQueueTimer.C:
			t.splitQueue(splitQueueTimer)
		case notification := <-t.actionNotifyCh:
			t.handleActionNotification(notification)
		}
	}
}

// processQueueCollections loads, filters, and submits timer tasks for every
// processing queue collection whose level is due for a poll.
func (t *timerQueueProcessorBase) processQueueCollections(levels map[int]struct{}) {
	for _, queueCollection := range t.processingQueueCollections {
		level := queueCollection.Level()
		if _, ok := levels[level]; !ok {
			continue
		}

		activeQueue := queueCollection.ActiveQueue()
		if activeQueue == nil {
			// process for this queue collection has finished
			// it's possible that new queue will be added to this collection later though,
			// pollTime will be updated after split/merge
			t.logger.Debug("Active queue is nil for timer queue at this level", tag.QueueLevel(level))
			continue
		}

		t.upsertPollTime(level, t.shard.GetCurrentTime(t.clusterName).Add(backoff.JitDuration(
			t.options.MaxPollInterval(),
			t.options.MaxPollIntervalJitterCoefficient(),
		)))

		var nextPageToken []byte
		readLevel := activeQueue.State().ReadLevel()
		maxReadLevel := minTaskKey(activeQueue.State().MaxLevel(), t.updateMaxReadLevel())
		domainFilter := activeQueue.State().DomainFilter()

		if progress, ok := t.processingQueueReadProgress[level]; ok {
			if progress.currentQueue == activeQueue {
				readLevel = progress.readLevel
				maxReadLevel = progress.maxReadLevel
				nextPageToken = progress.nextPageToken
			}
			delete(t.processingQueueReadProgress, level)
		}

		if !readLevel.Less(maxReadLevel) {
			// notify timer gate about the min time
			t.upsertPollTime(level, readLevel.(timerTaskKey).visibilityTimestamp)
			t.logger.Debug("Skipping processing timer queue at this level because readLevel >= maxReadLevel", tag.QueueLevel(level))
			continue
		}

		ctx, cancel := context.WithTimeout(context.Background(), loadQueueTaskThrottleRetryDelay)
		if err := t.rateLimiter.Wait(ctx); err != nil {
			cancel()
			if level == defaultProcessingQueueLevel {
				t.upsertPollTime(level, time.Time{})
			} else {
				t.setupBackoffTimer(level)
			}
			continue
		}
		cancel()

		resp, err := t.readAndFilterTasks(readLevel, maxReadLevel, nextPageToken)
		if err != nil {
			t.logger.Error("Processor unable to retrieve tasks", tag.Error(err))
			t.upsertPollTime(level, time.Time{}) // re-enqueue the event
			continue
		}

		tasks := make(map[task.Key]task.Task)
		taskChFull := false
		submittedCount := 0
		for _, taskInfo := range resp.timerTasks {
			if !domainFilter.Filter(taskInfo.GetDomainID()) {
				continue
			}

			task := t.taskInitializer(taskInfo)
			tasks[newTimerTaskKey(taskInfo.GetVisibilityTimestamp(), taskInfo.GetTaskID())] = task
			submitted, err := t.submitTask(task)
			if err != nil {
				// only err here is due to the fact that processor has been shutdown
				// return instead of continue
				return
			}
			taskChFull = taskChFull || !submitted
			if submitted {
				submittedCount++
			}
		}
		t.logger.Debugf("Submitted %d timer tasks successfully out of %d tasks", submittedCount, len(resp.timerTasks))

		var newReadLevel task.Key
		if len(resp.nextPageToken) == 0 {
			newReadLevel = maxReadLevel
			if resp.lookAheadTask != nil {
				// lookAheadTask may exist only when nextPageToken is empty
				// notice that lookAheadTask.VisibilityTimestamp may be larger than shard max read level,
				// which means new tasks can be generated before that timestamp. This issue is solved by
				// upsertPollTime whenever there are new tasks
				lookAheadTimestamp := resp.lookAheadTask.GetVisibilityTimestamp()
				t.upsertPollTime(level, lookAheadTimestamp)
				newReadLevel = minTaskKey(newReadLevel, newTimerTaskKey(lookAheadTimestamp, 0))
				t.logger.Debugf("nextPageToken is empty for timer queue at level %d so setting newReadLevel to max(lookAheadTask.timestamp: %v, maxReadLevel: %v)", level, lookAheadTimestamp, maxReadLevel)
			} else {
				// else we have no idea when the next poll should happen
				// rely on notifyNewTask to trigger the next poll even for non-default queue.
				// another option for non-default queue is that we can setup a backoff timer to check back later
				t.logger.Debugf("nextPageToken is empty for timer queue at level %d and there' no lookAheadTask. setting readLevel to maxReadLevel: %v", level, maxReadLevel)
			}
		} else {
			// more tasks should be loaded for this processing queue
			// record the current progress and update the poll time
			if level == defaultProcessingQueueLevel || !taskChFull {
				t.logger.Debugf("upserting poll time for timer queue at level %d because nextPageToken is not empty and !taskChFull", level)
				t.upsertPollTime(level, time.Time{})
			} else {
				t.logger.Debugf("setting up backoff timer for timer queue at level %d because nextPageToken is not empty and taskChFull", level)
				t.setupBackoffTimer(level)
			}
			t.processingQueueReadProgress[level] = timeTaskReadProgress{
				currentQueue:  activeQueue,
				readLevel:     readLevel,
				maxReadLevel:  maxReadLevel,
				nextPageToken: resp.nextPageToken,
			}
			if len(resp.timerTasks) > 0 {
				newReadLevel = newTimerTaskKey(resp.timerTasks[len(resp.timerTasks)-1].GetVisibilityTimestamp(), 0)
			}
		}
		queueCollection.AddTasks(tasks, newReadLevel)
	}
}

// splitQueue splits the processing queue collection based on some policy
// and resets the timer with jitter for next run
func (t *timerQueueProcessorBase) splitQueue(splitQueueTimer *time.Timer) {
	splitPolicy := t.initializeSplitPolicy(
		func(key task.Key, domainID string) task.Key {
			return newTimerTaskKey(
				key.(timerTaskKey).visibilityTimestamp.Add(
					t.options.SplitLookAheadDurationByDomainID(domainID),
				),
				0,
			)
		},
	)

	t.splitProcessingQueueCollectionFn(splitPolicy, t.upsertPollTime)

	splitQueueTimer.Reset(backoff.JitDuration(
		t.options.SplitQueueInterval(),
		t.options.SplitQueueIntervalJitterCoefficient(),
	))
}

// handleAckLevelUpdate updates ack level and resets timer with jitter.
// returns true if processing should be terminated
func (t *timerQueueProcessorBase) handleAckLevelUpdate(updateAckTimer *time.Timer) bool {
	processFinished, _, err := t.updateAckLevelFn()
	var errShardClosed *shard.ErrShardClosed
	if errors.As(err, &errShardClosed) || (err == nil && processFinished) {
		return true
	}
	updateAckTimer.Reset(backoff.JitDuration(
		t.options.UpdateAckInterval(),
		t.options.UpdateAckIntervalJitterCoefficient(),
	))
	return false
}

func (t *timerQueueProcessorBase) updateTimerGates() {
	maxRedispatchQueueSize := t.options.MaxRedispatchQueueSize()
	if t.redispatcher.Size() > maxRedispatchQueueSize {
		t.redispatcher.Redispatch(maxRedispatchQueueSize)
		if t.redispatcher.Size() > maxRedispatchQueueSize {
			// if redispatcher still has a large number of tasks
			// this only happens when system is under very high load
			// we should backoff here instead of keeping submitting tasks to task processor
			// don't call t.timerGate.Update(time.Now() + loadQueueTaskThrottleRetryDelay) as the time in
			// standby timer processor is not real time and is managed separately
			time.Sleep(backoff.JitDuration(
				t.options.PollBackoffInterval(),
				t.options.PollBackoffIntervalJitterCoefficient(),
			))
		}
		t.timerGate.Update(time.Time{})
		return
	}

	t.pollTimeLock.Lock()
	levels := make(map[int]struct{})
	now := t.shard.GetCurrentTime(t.clusterName)
	for level, pollTime := range t.nextPollTime {
		if !now.Before(pollTime) {
			levels[level] = struct{}{}
			delete(t.nextPollTime, level)
		} else {
			t.timerGate.Update(pollTime)
		}
	}
	t.pollTimeLock.Unlock()

	t.processQueueCollections(levels)
}

func (t *timerQueueProcessorBase) handleNewTimer() {
	t.newTimeLock.Lock()
	newTime := t.newTime
	t.newTime = time.Time{}
	t.newTimeLock.Unlock()

	// New Timer has arrived.
	t.metricsScope.IncCounter(metrics.NewTimerNotifyCounter)
	// notify all queue collections as they are waiting for the notification when there are
	// no more tasks to process. If we switch non-default queues to periodic polling in the
	// future, then we won't need to notify them.
	for _, queueCollection := range t.processingQueueCollections {
		t.upsertPollTime(queueCollection.Level(), newTime)
	}
}

func (t *timerQueueProcessorBase) handleActionNotification(notification actionNotification) {
	t.processorBase.handleActionNotification(notification, func() {
		switch notification.action.ActionType {
		case ActionTypeReset:
			t.upsertPollTime(defaultProcessingQueueLevel, time.Time{})
		}
	})
}

func (t *timerQueueProcessorBase) readAndFilterTasks(readLevel, maxReadLevel task.Key, nextPageToken []byte) (*filteredTimerTasksResponse, error) {
	resp, err := t.getTimerTasks(readLevel, maxReadLevel, nextPageToken, t.options.BatchSize())
	if err != nil {
		return nil, err
	}

	var lookAheadTask *persistence.TimerTaskInfo
	filteredTasks := []*persistence.TimerTaskInfo{}
	for _, timerTask := range resp.Timers {
		if !t.isProcessNow(timerTask.GetVisibilityTimestamp()) {
			// found the first task that is not ready to be processed yet.
			// reset NextPageToken so we can load more tasks starting from this lookAheadTask next time.
			lookAheadTask = timerTask
			resp.NextPageToken = nil
			break
		}
		filteredTasks = append(filteredTasks, timerTask)
	}

	if len(resp.NextPageToken) == 0 && lookAheadTask == nil {
		// only look ahead within the processing queue boundary
		lookAheadTask, err = t.readLookAheadTask(maxReadLevel, maximumTimerTaskKey)
		if err != nil {
			// we don't know if the look ahead task exists or not, but we know that if it exists,
			// its visibility timestamp is larger than or equal to maxReadLevel.
			// so, create a fake look ahead task so another load can be triggered at that time.
			lookAheadTask = &persistence.TimerTaskInfo{
				VisibilityTimestamp: maxReadLevel.(timerTaskKey).visibilityTimestamp,
			}
		}
	}

	t.logger.Debugf("readAndFilterTasks returning %d tasks and lookAheadTask: %#v for readLevel: %#v, maxReadLevel: %#v", len(filteredTasks), lookAheadTask, readLevel, maxReadLevel)
	return &filteredTimerTasksResponse{
		timerTasks:    filteredTasks,
		lookAheadTask: lookAheadTask,
		nextPageToken: resp.NextPageToken,
	}, nil
}

func (t *timerQueueProcessorBase) readLookAheadTask(lookAheadStartLevel task.Key, lookAheadMaxLevel task.Key) (*persistence.TimerTaskInfo, error) {
	resp, err := t.getTimerTasks(lookAheadStartLevel, lookAheadMaxLevel, nil, 1)
	if err != nil {
		return nil, err
	}

	if len(resp.Timers) == 1 {
		return resp.Timers[0], nil
	}
	return nil, nil
}

func (t *timerQueueProcessorBase) getTimerTasks(readLevel, maxReadLevel task.Key, nextPageToken []byte, batchSize int) (*persistence.GetTimerIndexTasksResponse, error) {
	request := &persistence.GetTimerIndexTasksRequest{
		MinTimestamp:  readLevel.(timerTaskKey).visibilityTimestamp,
		MaxTimestamp:  maxReadLevel.(timerTaskKey).visibilityTimestamp,
		BatchSize:     batchSize,
		NextPageToken: nextPageToken,
	}

	var err error
	var response *persistence.GetTimerIndexTasksResponse
	retryCount := t.shard.GetConfig().TimerProcessorGetFailureRetryCount()
	for attempt := 0; attempt < retryCount; attempt++ {
		response, err = t.shard.GetExecutionManager().GetTimerIndexTasks(context.Background(), request)
		if err == nil {
			return response, nil
		}
		backoff := time.Duration(attempt*100) * time.Millisecond
		t.logger.Debugf("Failed to get timer tasks from execution manager. error: %v, attempt: %d, retryCount: %d, backoff: %v", err, attempt, retryCount, backoff)
		time.Sleep(backoff)
	}
	return nil, err
}

func (t *timerQueueProcessorBase) isProcessNow(expiryTime time.Time) bool {
	if expiryTime.IsZero() {
		// return true, but something somewhere probably has a bug creating an empty timerTask.
		t.logger.Warn("Timer task has timestamp zero")
	}
	return !t.shard.GetCurrentTime(t.clusterName).Before(expiryTime)
}

func (t *timerQueueProcessorBase) notifyNewTimers(timerTasks []persistence.Task) {
	if len(timerTasks) == 0 {
		return
	}

	isActive := t.options.MetricScope == metrics.TimerActiveQueueProcessorScope
	minNewTime := timerTasks[0].GetVisibilityTimestamp()
	shardIDTag := metrics.ShardIDTag(t.shard.GetShardID())
	for _, timerTask := range timerTasks {
		ts := timerTask.GetVisibilityTimestamp()
		if ts.Before(minNewTime) {
			minNewTime = ts
		}

		taskScopeIdx := task.GetTimerTaskMetricScope(
			timerTask.GetType(),
			isActive,
		)
		t.metricsClient.Scope(taskScopeIdx).Tagged(shardIDTag).IncCounter(metrics.NewTimerNotifyCounter)
	}

	t.notifyNewTimer(minNewTime)
}

func (t *timerQueueProcessorBase) notifyNewTimer(newTime time.Time) {
	t.newTimeLock.Lock()
	defer t.newTimeLock.Unlock()

	if t.newTime.IsZero() || newTime.Before(t.newTime) {
		t.logger.Debugf("Updating newTime from %v to %v", t.newTime, newTime)
		t.newTime = newTime
		select {
		case t.newTimerCh <- struct{}{}:
			// Notified about new time.
		default:
			// Channel "full" -> drop and move on, this will happen only if service is in high load.
		}
	}
}

func (t *timerQueueProcessorBase) upsertPollTime(level int, newPollTime time.Time) {
	t.pollTimeLock.Lock()
	defer t.pollTimeLock.Unlock()

	if _, ok := t.backoffTimer[level]; ok {
		// honor existing backoff timer
		t.logger.Debugf("Skipping upsertPollTime for timer queue at level %d because there's a backoff timer", level)
		return
	}

	currentPollTime, ok := t.nextPollTime[level]
	if !ok || newPollTime.Before(currentPollTime) {
		t.logger.Debugf("Updating poll timer for timer queue at level %d. CurrentPollTime: %v, newPollTime: %v", level, currentPollTime, newPollTime)
		t.nextPollTime[level] = newPollTime
		t.timerGate.Update(newPollTime)
		return
	}

	t.logger.Debugf("Skipping upsertPollTime for level %d because currentPollTime %v is before newPollTime %v", level, currentPollTime, newPollTime)
}

// setupBackoffTimer will trigger a poll for the specified processing queue collection
// after a certain period of (real) time. This means that for the standby timer, even if the
// cluster time has not been updated, the poll will still be triggered when the timer fires.
// Use this function for delaying the load for a processing queue. If a poll should be
// triggered immediately, use upsertPollTime.
func (t *timerQueueProcessorBase) setupBackoffTimer(level int) {
	t.pollTimeLock.Lock()
	defer t.pollTimeLock.Unlock()

	if _, ok := t.backoffTimer[level]; ok {
		// honor existing backoff timer
		return
	}

	t.metricsScope.IncCounter(metrics.ProcessingQueueThrottledCounter)
	t.logger.Info("Throttled processing queue", tag.QueueLevel(level))
	backoffDuration := backoff.JitDuration(
		t.options.PollBackoffInterval(),
		t.options.PollBackoffIntervalJitterCoefficient(),
	)
	t.backoffTimer[level] = time.AfterFunc(backoffDuration, func() {
		select {
		case <-t.shutdownCh:
			return
		default:
		}

		t.pollTimeLock.Lock()
		defer t.pollTimeLock.Unlock()

		t.nextPollTime[level] = time.Time{}
		t.timerGate.Update(time.Time{})
		delete(t.backoffTimer, level)
	})
}

func newTimerTaskKey(visibilityTimestamp time.Time, taskID int64) task.Key {
	return timerTaskKey{
		visibilityTimestamp: visibilityTimestamp,
		taskID:              taskID,
	}
}

func (k timerTaskKey) Less(
	key task.Key,
) bool {
	timerKey := key.(timerTaskKey)
	if k.visibilityTimestamp.Equal(timerKey.visibilityTimestamp) {
		return k.taskID < timerKey.taskID
	}
	return k.visibilityTimestamp.Before(timerKey.visibilityTimestamp)
}

func (k timerTaskKey) String() string {
	return fmt.Sprintf("{visibilityTimestamp: %v, taskID: %v}", k.visibilityTimestamp, k.taskID)
}

func newTimerQueueProcessorOptions(
	config *config.Config,
	isActive bool,
	isFailover bool,
) *queueProcessorOptions {
	options := &queueProcessorOptions{
		BatchSize:                            config.TimerTaskBatchSize,
		DeleteBatchSize:                      config.TimerTaskDeleteBatchSize,
		MaxPollRPS:                           config.TimerProcessorMaxPollRPS,
		MaxPollInterval:                      config.TimerProcessorMaxPollInterval,
		MaxPollIntervalJitterCoefficient:     config.TimerProcessorMaxPollIntervalJitterCoefficient,
		UpdateAckInterval:                    config.TimerProcessorUpdateAckInterval,
		UpdateAckIntervalJitterCoefficient:   config.TimerProcessorUpdateAckIntervalJitterCoefficient,
		RedispatchIntervalJitterCoefficient:  config.TaskRedispatchIntervalJitterCoefficient,
		MaxRedispatchQueueSize:               config.TimerProcessorMaxRedispatchQueueSize,
		SplitQueueInterval:                   config.TimerProcessorSplitQueueInterval,
		SplitQueueIntervalJitterCoefficient:  config.TimerProcessorSplitQueueIntervalJitterCoefficient,
		PollBackoffInterval:                  config.QueueProcessorPollBackoffInterval,
		PollBackoffIntervalJitterCoefficient: config.QueueProcessorPollBackoffIntervalJitterCoefficient,
		EnableGracefulSyncShutdown:           config.QueueProcessorEnableGracefulSyncShutdown,
	}

	if isFailover {
		// disable queue split for failover processor
		options.EnableSplit = dynamicconfig.GetBoolPropertyFn(false)

		// disable persist and load processing queue states for failover processor as it will never be split
		options.EnablePersistQueueStates = dynamicconfig.GetBoolPropertyFn(false)
		options.EnableLoadQueueStates = dynamicconfig.GetBoolPropertyFn(false)

		options.MaxStartJitterInterval = config.TimerProcessorFailoverMaxStartJitterInterval
	} else {
		options.EnableSplit = config.QueueProcessorEnableSplit
		options.SplitMaxLevel = config.QueueProcessorSplitMaxLevel
		options.EnableRandomSplitByDomainID = config.QueueProcessorEnableRandomSplitByDomainID
		options.RandomSplitProbability = config.QueueProcessorRandomSplitProbability
		options.EnablePendingTaskSplitByDomainID = config.QueueProcessorEnablePendingTaskSplitByDomainID
		options.PendingTaskSplitThreshold = config.QueueProcessorPendingTaskSplitThreshold
		options.EnableStuckTaskSplitByDomainID = config.QueueProcessorEnableStuckTaskSplitByDomainID
		options.StuckTaskSplitThreshold = config.QueueProcessorStuckTaskSplitThreshold
		options.SplitLookAheadDurationByDomainID = config.QueueProcessorSplitLookAheadDurationByDomainID

		options.EnablePersistQueueStates = config.QueueProcessorEnablePersistQueueStates
		options.EnableLoadQueueStates = config.QueueProcessorEnableLoadQueueStates

		options.MaxStartJitterInterval = dynamicconfig.GetDurationPropertyFn(0)
	}

	if isActive {
		options.MetricScope = metrics.TimerActiveQueueProcessorScope
		options.RedispatchInterval = config.ActiveTaskRedispatchInterval
	} else {
		options.MetricScope = metrics.TimerStandbyQueueProcessorScope
		options.RedispatchInterval = config.StandbyTaskRedispatchInterval
	}

	return options
}
```
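
For readers skimming the file above, the scheduling core is the interplay between `upsertPollTime`, the `TimerGate`, and `updateTimerGates`: each queue level records its next poll time, and a single wake-up source is always armed for the earliest pending time. Below is a minimal, self-contained sketch of that pattern under simplified assumptions — a plain `time.Timer` stands in for Cadence's `TimerGate`, there is no locking or backoff handling, and the `pollScheduler` name is invented for illustration:

```go
package main

import (
	"fmt"
	"time"
)

// pollScheduler is a toy version of the pattern above: a per-level
// "next poll time" map plus one wake-up timer armed for the earliest time.
type pollScheduler struct {
	nextPollTime map[int]time.Time
	wakeup       *time.Timer
	armedAt      time.Time // earliest time the wake-up timer is currently set for
}

func newPollScheduler() *pollScheduler {
	t := time.NewTimer(time.Hour)
	if !t.Stop() {
		<-t.C // drain so the channel only fires when we arm it
	}
	return &pollScheduler{nextPollTime: map[int]time.Time{}, wakeup: t}
}

// upsertPollTime mirrors the idea above: only ever move a level's poll time
// earlier, and pull the shared wake-up timer forward when needed.
func (s *pollScheduler) upsertPollTime(level int, at time.Time) {
	if cur, ok := s.nextPollTime[level]; ok && !at.Before(cur) {
		return // the existing poll time is already sooner
	}
	s.nextPollTime[level] = at
	if s.armedAt.IsZero() || at.Before(s.armedAt) {
		s.armedAt = at
		s.wakeup.Reset(time.Until(at))
	}
}

// due collects the levels whose poll time has passed and re-arms the timer
// for whatever is still pending, roughly what updateTimerGates does above.
func (s *pollScheduler) due(now time.Time) []int {
	var levels []int
	s.armedAt = time.Time{}
	for level, at := range s.nextPollTime {
		if !now.Before(at) {
			levels = append(levels, level)
			delete(s.nextPollTime, level)
		} else if s.armedAt.IsZero() || at.Before(s.armedAt) {
			s.armedAt = at
			s.wakeup.Reset(time.Until(at))
		}
	}
	return levels
}

func main() {
	s := newPollScheduler()
	s.upsertPollTime(0, time.Now().Add(50*time.Millisecond))
	s.upsertPollTime(1, time.Now().Add(200*time.Millisecond))
	<-s.wakeup.C // fires at the earliest pending poll time (level 0)
	fmt.Println("due:", s.due(time.Now()))
	<-s.wakeup.C // re-armed for level 1 inside due()
	fmt.Println("due:", s.due(time.Now()))
}
```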