uber / cadence coverage report (buildkite build 01875e2f-959c-4c4d-87af-1d7805759bcc)
08 Apr 2023 12:26AM UTC coverage: 57.178% (+0.1%) from 57.072%
Pull Request #5197 (Steven L, "bad cleanup -> good cleanup"): Demonstrate a way to get rid of the cadence-idl repo
85396 of 149351 relevant lines covered (57.18%), 2283.28 hits per line

Source file: /service/history/queue/timer_queue_processor_base.go (78.81% of relevant lines covered)

// Copyright (c) 2017-2020 Uber Technologies Inc.

// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:

// The above copyright notice and this permission notice shall be included in all
// copies or substantial portions of the Software.

// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
// SOFTWARE.

package queue

import (
        "context"
        "fmt"
        "math"
        "math/rand"
        "sync"
        "sync/atomic"
        "time"

        "github.com/uber/cadence/common"
        "github.com/uber/cadence/common/backoff"
        "github.com/uber/cadence/common/dynamicconfig"
        "github.com/uber/cadence/common/log"
        "github.com/uber/cadence/common/log/tag"
        "github.com/uber/cadence/common/metrics"
        "github.com/uber/cadence/common/persistence"
        "github.com/uber/cadence/service/history/config"
        "github.com/uber/cadence/service/history/shard"
        "github.com/uber/cadence/service/history/task"
)

var (
        maximumTimerTaskKey = newTimerTaskKey(
                time.Unix(0, math.MaxInt64),
                0,
        )
)

type (
        timerTaskKey struct {
                visibilityTimestamp time.Time
                taskID              int64
        }

        timeTaskReadProgress struct {
                currentQueue  ProcessingQueue
                readLevel     task.Key
                maxReadLevel  task.Key
                nextPageToken []byte
        }

        timerQueueProcessorBase struct {
                *processorBase

                taskInitializer task.Initializer

                clusterName string

                pollTimeLock sync.Mutex
                backoffTimer map[int]*time.Timer
                nextPollTime map[int]time.Time
                timerGate    TimerGate

                // timer notification
                newTimerCh  chan struct{}
                newTimeLock sync.Mutex
                newTime     time.Time

                processingQueueReadProgress map[int]timeTaskReadProgress
        }
)

func newTimerQueueProcessorBase(
        clusterName string,
        shard shard.Context,
        processingQueueStates []ProcessingQueueState,
        taskProcessor task.Processor,
        timerGate TimerGate,
        options *queueProcessorOptions,
        updateMaxReadLevel updateMaxReadLevelFn,
        updateClusterAckLevel updateClusterAckLevelFn,
        updateProcessingQueueStates updateProcessingQueueStatesFn,
        queueShutdown queueShutdownFn,
        taskFilter task.Filter,
        taskExecutor task.Executor,
        logger log.Logger,
        metricsClient metrics.Client,
) *timerQueueProcessorBase {
        processorBase := newProcessorBase(
                shard,
                processingQueueStates,
                taskProcessor,
                options,
                updateMaxReadLevel,
                updateClusterAckLevel,
                updateProcessingQueueStates,
                queueShutdown,
                logger.WithTags(tag.ComponentTimerQueue),
                metricsClient,
        )

        queueType := task.QueueTypeActiveTimer
        if options.MetricScope == metrics.TimerStandbyQueueProcessorScope {
                queueType = task.QueueTypeStandbyTimer
        }

        return &timerQueueProcessorBase{
                processorBase: processorBase,

                taskInitializer: func(taskInfo task.Info) task.Task {
                        return task.NewTimerTask(
                                shard,
                                taskInfo,
                                queueType,
                                task.InitializeLoggerForTask(shard.GetShardID(), taskInfo, logger),
                                taskFilter,
                                taskExecutor,
                                taskProcessor,
                                processorBase.redispatcher.AddTask,
                                shard.GetConfig().TaskCriticalRetryCount,
                        )
                },

                clusterName: clusterName,

                backoffTimer: make(map[int]*time.Timer),
                nextPollTime: make(map[int]time.Time),
                timerGate:    timerGate,

                newTimerCh: make(chan struct{}, 1),

                processingQueueReadProgress: make(map[int]timeTaskReadProgress),
        }
}

func (t *timerQueueProcessorBase) Start() {
        if !atomic.CompareAndSwapInt32(&t.status, common.DaemonStatusInitialized, common.DaemonStatusStarted) {
                return
        }

        t.logger.Info("Timer queue processor state changed", tag.LifeCycleStarting)
        defer t.logger.Info("Timer queue processor state changed", tag.LifeCycleStarted)

        t.redispatcher.Start()

        newPollTime := time.Time{}
        if startJitter := t.options.MaxStartJitterInterval(); startJitter > 0 {
                now := t.shard.GetTimeSource().Now()
                newPollTime = now.Add(time.Duration(rand.Int63n(int64(startJitter))))
        }
        for _, queueCollections := range t.processingQueueCollections {
                t.upsertPollTime(queueCollections.Level(), newPollTime)
        }

        t.shutdownWG.Add(1)
        go t.processorPump()
}

func (t *timerQueueProcessorBase) Stop() {
        if !atomic.CompareAndSwapInt32(&t.status, common.DaemonStatusStarted, common.DaemonStatusStopped) {
                return
        }

        t.logger.Info("Timer queue processor state changed", tag.LifeCycleStopping)
        defer t.logger.Info("Timer queue processor state changed", tag.LifeCycleStopped)

        t.timerGate.Close()
        close(t.shutdownCh)
        t.pollTimeLock.Lock()
        for _, timer := range t.backoffTimer {
                timer.Stop()
        }
        t.pollTimeLock.Unlock()

        if success := common.AwaitWaitGroup(&t.shutdownWG, time.Minute); !success {
                t.logger.Warn("", tag.LifeCycleStopTimedout)
        }

        t.redispatcher.Stop()
}

func (t *timerQueueProcessorBase) processorPump() {
        defer t.shutdownWG.Done()

        updateAckTimer := time.NewTimer(backoff.JitDuration(
                t.options.UpdateAckInterval(),
                t.options.UpdateAckIntervalJitterCoefficient(),
        ))
        defer updateAckTimer.Stop()

        splitQueueTimer := time.NewTimer(backoff.JitDuration(
                t.options.SplitQueueInterval(),
                t.options.SplitQueueIntervalJitterCoefficient(),
        ))
        defer splitQueueTimer.Stop()

processorPumpLoop:
        for {
                select {
                case <-t.shutdownCh:
                        break processorPumpLoop
                case <-t.timerGate.FireChan():
                        maxRedispatchQueueSize := t.options.MaxRedispatchQueueSize()
                        if t.redispatcher.Size() > maxRedispatchQueueSize {
                                t.redispatcher.Redispatch(maxRedispatchQueueSize)
                                if t.redispatcher.Size() > maxRedispatchQueueSize {
                                        // if the redispatcher still has a large number of tasks,
                                        // which only happens when the system is under very high load,
                                        // back off here instead of continuing to submit tasks to the task processor.
                                        // don't call t.timerGate.Update(time.Now() + loadQueueTaskThrottleRetryDelay) as the time in the
                                        // standby timer processor is not real time and is managed separately
                                        time.Sleep(backoff.JitDuration(
                                                t.options.PollBackoffInterval(),
                                                t.options.PollBackoffIntervalJitterCoefficient(),
                                        ))
                                }
                                t.timerGate.Update(time.Time{})
                                continue processorPumpLoop
                        }

                        t.pollTimeLock.Lock()
                        levels := make(map[int]struct{})
                        now := t.shard.GetCurrentTime(t.clusterName)
                        for level, pollTime := range t.nextPollTime {
                                if !now.Before(pollTime) {
                                        levels[level] = struct{}{}
                                        delete(t.nextPollTime, level)
                                } else {
                                        t.timerGate.Update(pollTime)
                                }
                        }
                        t.pollTimeLock.Unlock()

                        t.processQueueCollections(levels)
                case <-updateAckTimer.C:
                        processFinished, _, err := t.updateAckLevel()
                        if err == shard.ErrShardClosed || (err == nil && processFinished) {
                                go t.Stop()
                                break processorPumpLoop
                        }
                        updateAckTimer.Reset(backoff.JitDuration(
                                t.options.UpdateAckInterval(),
                                t.options.UpdateAckIntervalJitterCoefficient(),
                        ))
                case <-t.newTimerCh:
                        t.newTimeLock.Lock()
                        newTime := t.newTime
                        t.newTime = time.Time{}
                        t.newTimeLock.Unlock()

                        // New Timer has arrived.
                        t.metricsScope.IncCounter(metrics.NewTimerNotifyCounter)
                        // notify all queue collections, as they wait for this notification when there are
                        // no more tasks to process. If non-default queues switch to periodic polling
                        // in the future, they won't need to be notified.
                        for _, queueCollection := range t.processingQueueCollections {
                                t.upsertPollTime(queueCollection.Level(), newTime)
                        }
                case <-splitQueueTimer.C:
                        t.splitQueue()
                        splitQueueTimer.Reset(backoff.JitDuration(
                                t.options.SplitQueueInterval(),
                                t.options.SplitQueueIntervalJitterCoefficient(),
                        ))
                case notification := <-t.actionNotifyCh:
                        t.handleActionNotification(notification)
                }
        }
}

func (t *timerQueueProcessorBase) processQueueCollections(levels map[int]struct{}) {
        for _, queueCollection := range t.processingQueueCollections {
                level := queueCollection.Level()
                if _, ok := levels[level]; !ok {
                        continue
                }

                activeQueue := queueCollection.ActiveQueue()
                if activeQueue == nil {
                        // processing for this queue collection has finished.
                        // it's possible that a new queue will be added to this collection later though;
                        // pollTime will be updated after split/merge
                        continue
                }

                t.upsertPollTime(level, t.shard.GetCurrentTime(t.clusterName).Add(backoff.JitDuration(
                        t.options.MaxPollInterval(),
                        t.options.MaxPollIntervalJitterCoefficient(),
                )))

                var nextPageToken []byte
                readLevel := activeQueue.State().ReadLevel()
                maxReadLevel := minTaskKey(activeQueue.State().MaxLevel(), t.updateMaxReadLevel())
                domainFilter := activeQueue.State().DomainFilter()

                if progress, ok := t.processingQueueReadProgress[level]; ok {
                        if progress.currentQueue == activeQueue {
                                readLevel = progress.readLevel
                                maxReadLevel = progress.maxReadLevel
                                nextPageToken = progress.nextPageToken
                        }
                        delete(t.processingQueueReadProgress, level)
                }

                if !readLevel.Less(maxReadLevel) {
                        // notify timer gate about the min time
                        t.upsertPollTime(level, readLevel.(timerTaskKey).visibilityTimestamp)
                        continue
                }

                ctx, cancel := context.WithTimeout(context.Background(), loadQueueTaskThrottleRetryDelay)
                if err := t.rateLimiter.Wait(ctx); err != nil {
                        cancel()
                        if level == defaultProcessingQueueLevel {
                                t.upsertPollTime(level, time.Time{})
                        } else {
                                t.setupBackoffTimer(level)
                        }
                        continue
                }
                cancel()

                timerTaskInfos, lookAheadTask, nextPageToken, err := t.readAndFilterTasks(readLevel, maxReadLevel, nextPageToken)
                if err != nil {
                        t.logger.Error("Processor unable to retrieve tasks", tag.Error(err))
                        t.upsertPollTime(level, time.Time{}) // re-enqueue the event
                        continue
                }

                tasks := make(map[task.Key]task.Task)
                taskChFull := false
                for _, taskInfo := range timerTaskInfos {
                        if !domainFilter.Filter(taskInfo.GetDomainID()) {
                                continue
                        }

                        task := t.taskInitializer(taskInfo)
                        tasks[newTimerTaskKey(taskInfo.GetVisibilityTimestamp(), taskInfo.GetTaskID())] = task
                        submitted, err := t.submitTask(task)
                        if err != nil {
                                // the only error here means the processor has been shut down,
                                // so return instead of continue
                                return
                        }
                        taskChFull = taskChFull || !submitted
                }

                var newReadLevel task.Key
                if len(nextPageToken) == 0 {
                        newReadLevel = maxReadLevel
                        if lookAheadTask != nil {
                                // lookAheadTask may exist only when nextPageToken is empty.
                                // notice that lookAheadTask.VisibilityTimestamp may be larger than the shard max read level,
                                // which means new tasks can be generated before that timestamp. This issue is solved by
                                // upsertPollTime whenever there are new tasks
                                t.upsertPollTime(level, lookAheadTask.VisibilityTimestamp)
                                newReadLevel = minTaskKey(newReadLevel, newTimerTaskKey(lookAheadTask.GetVisibilityTimestamp(), 0))
                        }
                        // else we have no idea when the next poll should happen;
                        // rely on notifyNewTask to trigger the next poll even for non-default queues.
                        // another option for non-default queues is to set up a backoff timer to check back later
                } else {
                        // more tasks should be loaded for this processing queue;
                        // record the current progress and update the poll time
                        if level == defaultProcessingQueueLevel || !taskChFull {
                                t.upsertPollTime(level, time.Time{})
                        } else {
                                t.setupBackoffTimer(level)
                        }
                        t.processingQueueReadProgress[level] = timeTaskReadProgress{
                                currentQueue:  activeQueue,
                                readLevel:     readLevel,
                                maxReadLevel:  maxReadLevel,
                                nextPageToken: nextPageToken,
                        }
                        newReadLevel = newTimerTaskKey(timerTaskInfos[len(timerTaskInfos)-1].GetVisibilityTimestamp(), 0)
                }
                queueCollection.AddTasks(tasks, newReadLevel)
        }
}

func (t *timerQueueProcessorBase) splitQueue() {
        splitPolicy := t.initializeSplitPolicy(
                func(key task.Key, domainID string) task.Key {
                        return newTimerTaskKey(
                                key.(timerTaskKey).visibilityTimestamp.Add(
                                        t.options.SplitLookAheadDurationByDomainID(domainID),
                                ),
                                0,
                        )
                },
        )

        t.splitProcessingQueueCollection(splitPolicy, t.upsertPollTime)
}

func (t *timerQueueProcessorBase) handleActionNotification(notification actionNotification) {
        t.processorBase.handleActionNotification(notification, func() {
                switch notification.action.ActionType {
                case ActionTypeReset:
                        t.upsertPollTime(defaultProcessingQueueLevel, time.Time{})
                }
        })
}

func (t *timerQueueProcessorBase) readAndFilterTasks(
        readLevel task.Key,
        maxReadLevel task.Key,
        nextPageToken []byte,
) ([]*persistence.TimerTaskInfo, *persistence.TimerTaskInfo, []byte, error) {
        timerTasks, nextPageToken, err := t.getTimerTasks(readLevel, maxReadLevel, nextPageToken, t.options.BatchSize())
        if err != nil {
                return nil, nil, nil, err
        }

        var lookAheadTask *persistence.TimerTaskInfo
        filteredTasks := []*persistence.TimerTaskInfo{}

        for _, timerTask := range timerTasks {
                if !t.isProcessNow(timerTask.GetVisibilityTimestamp()) {
                        lookAheadTask = timerTask
                        nextPageToken = nil
                        break
                }
                filteredTasks = append(filteredTasks, timerTask)
        }

        if len(nextPageToken) == 0 && lookAheadTask == nil {
                // only look ahead within the processing queue boundary
                lookAheadTask, err = t.readLookAheadTask(maxReadLevel, maximumTimerTaskKey)
                if err != nil {
                        // we don't know whether the look-ahead task exists, but we know that if it exists,
                        // its visibility timestamp is larger than or equal to maxReadLevel.
                        // so, create a fake look-ahead task so another load can be triggered at that time.
                        lookAheadTask = &persistence.TimerTaskInfo{
                                VisibilityTimestamp: maxReadLevel.(timerTaskKey).visibilityTimestamp,
                        }
                        return filteredTasks, lookAheadTask, nil, nil
                }
        }

        return filteredTasks, lookAheadTask, nextPageToken, nil
}

func (t *timerQueueProcessorBase) readLookAheadTask(
        lookAheadStartLevel task.Key,
        lookAheadMaxLevel task.Key,
) (*persistence.TimerTaskInfo, error) {
        tasks, _, err := t.getTimerTasks(
                lookAheadStartLevel,
                lookAheadMaxLevel,
                nil,
                1,
        )
        if err != nil {
                return nil, err
        }

        if len(tasks) == 1 {
                return tasks[0], nil
        }
        return nil, nil
}

func (t *timerQueueProcessorBase) getTimerTasks(
        readLevel task.Key,
        maxReadLevel task.Key,
        nextPageToken []byte,
        batchSize int,
) ([]*persistence.TimerTaskInfo, []byte, error) {
        request := &persistence.GetTimerIndexTasksRequest{
                MinTimestamp:  readLevel.(timerTaskKey).visibilityTimestamp,
                MaxTimestamp:  maxReadLevel.(timerTaskKey).visibilityTimestamp,
                BatchSize:     batchSize,
                NextPageToken: nextPageToken,
        }

        var err error
        var response *persistence.GetTimerIndexTasksResponse
        retryCount := t.shard.GetConfig().TimerProcessorGetFailureRetryCount()
        for attempt := 0; attempt < retryCount; attempt++ {
                response, err = t.shard.GetExecutionManager().GetTimerIndexTasks(context.Background(), request)
                if err == nil {
                        return response.Timers, response.NextPageToken, nil
                }
                backoff := time.Duration(attempt * 100)
                time.Sleep(backoff * time.Millisecond)
        }
        return nil, nil, err
}

func (t *timerQueueProcessorBase) isProcessNow(
        expiryTime time.Time,
) bool {
        if expiryTime.IsZero() {
                // return true, but something probably has a bug that creates an empty timerTask.
                t.logger.Warn("Timer task has timestamp zero")
        }
        return expiryTime.UnixNano() <= t.shard.GetCurrentTime(t.clusterName).UnixNano()
}

func (t *timerQueueProcessorBase) notifyNewTimers(
        timerTasks []persistence.Task,
) {
        if len(timerTasks) == 0 {
                return
        }

        isActive := t.options.MetricScope == metrics.TimerActiveQueueProcessorScope

        minNewTime := timerTasks[0].GetVisibilityTimestamp()
        for _, timerTask := range timerTasks {
                ts := timerTask.GetVisibilityTimestamp()
                if ts.Before(minNewTime) {
                        minNewTime = ts
                }

                taskScopeIdx := task.GetTimerTaskMetricScope(
                        timerTask.GetType(),
                        isActive,
                )
                t.metricsClient.IncCounter(taskScopeIdx, metrics.NewTimerCounter)
        }

        t.notifyNewTimer(minNewTime)
}

func (t *timerQueueProcessorBase) notifyNewTimer(
        newTime time.Time,
) {
        t.newTimeLock.Lock()
        defer t.newTimeLock.Unlock()

        if t.newTime.IsZero() || newTime.Before(t.newTime) {
                t.newTime = newTime
                select {
                case t.newTimerCh <- struct{}{}:
                        // Notified about new time.
                default:
                        // Channel "full" -> drop and move on; this will happen only if the service is under high load.
                }
        }
}

func (t *timerQueueProcessorBase) upsertPollTime(level int, newPollTime time.Time) {
        t.pollTimeLock.Lock()
        defer t.pollTimeLock.Unlock()

        if _, ok := t.backoffTimer[level]; ok {
                // honor existing backoff timer
                return
        }

        if currentPollTime, ok := t.nextPollTime[level]; !ok || newPollTime.Before(currentPollTime) {
                t.nextPollTime[level] = newPollTime
                t.timerGate.Update(newPollTime)
        }
}

// setupBackoffTimer will trigger a poll for the specified processing queue collection
// after a certain period of (real) time. This means that for a standby timer, even if the cluster time
// has not been updated, the poll will still be triggered when the timer fires. Use this function
// to delay loading for a processing queue. If a poll should be triggered immediately,
// use upsertPollTime.
func (t *timerQueueProcessorBase) setupBackoffTimer(level int) {
        t.pollTimeLock.Lock()
        defer t.pollTimeLock.Unlock()

        if _, ok := t.backoffTimer[level]; ok {
                // honor existing backoff timer
                return
        }

        t.metricsScope.IncCounter(metrics.ProcessingQueueThrottledCounter)
        t.logger.Info("Throttled processing queue", tag.QueueLevel(level))
        backoffDuration := backoff.JitDuration(
                t.options.PollBackoffInterval(),
                t.options.PollBackoffIntervalJitterCoefficient(),
        )
        t.backoffTimer[level] = time.AfterFunc(backoffDuration, func() {
                select {
                case <-t.shutdownCh:
                        return
                default:
                }

                t.pollTimeLock.Lock()
                defer t.pollTimeLock.Unlock()

                t.nextPollTime[level] = time.Time{}
                t.timerGate.Update(time.Time{})
                delete(t.backoffTimer, level)
        })
}

func newTimerTaskKey(
        visibilityTimestamp time.Time,
        taskID int64,
) task.Key {
        return timerTaskKey{
                visibilityTimestamp: visibilityTimestamp,
                taskID:              taskID,
        }
}

func (k timerTaskKey) Less(
        key task.Key,
) bool {
        timerKey := key.(timerTaskKey)
        if k.visibilityTimestamp.Equal(timerKey.visibilityTimestamp) {
                return k.taskID < timerKey.taskID
        }
        return k.visibilityTimestamp.Before(timerKey.visibilityTimestamp)
}

func (k timerTaskKey) String() string {
        return fmt.Sprintf("{visibilityTimestamp: %v, taskID: %v}", k.visibilityTimestamp, k.taskID)
}

func newTimerQueueProcessorOptions(
        config *config.Config,
        isActive bool,
        isFailover bool,
) *queueProcessorOptions {
        options := &queueProcessorOptions{
                BatchSize:                            config.TimerTaskBatchSize,
                DeleteBatchSize:                      config.TimerTaskDeleteBatchSize,
                MaxPollRPS:                           config.TimerProcessorMaxPollRPS,
                MaxPollInterval:                      config.TimerProcessorMaxPollInterval,
                MaxPollIntervalJitterCoefficient:     config.TimerProcessorMaxPollIntervalJitterCoefficient,
                UpdateAckInterval:                    config.TimerProcessorUpdateAckInterval,
                UpdateAckIntervalJitterCoefficient:   config.TimerProcessorUpdateAckIntervalJitterCoefficient,
                RedispatchIntervalJitterCoefficient:  config.TaskRedispatchIntervalJitterCoefficient,
                MaxRedispatchQueueSize:               config.TimerProcessorMaxRedispatchQueueSize,
                SplitQueueInterval:                   config.TimerProcessorSplitQueueInterval,
                SplitQueueIntervalJitterCoefficient:  config.TimerProcessorSplitQueueIntervalJitterCoefficient,
                PollBackoffInterval:                  config.QueueProcessorPollBackoffInterval,
                PollBackoffIntervalJitterCoefficient: config.QueueProcessorPollBackoffIntervalJitterCoefficient,
        }

        if isFailover {
                // disable queue split for failover processor
                options.EnableSplit = dynamicconfig.GetBoolPropertyFn(false)

                // disable persist and load processing queue states for failover processor as it will never be split
                options.EnablePersistQueueStates = dynamicconfig.GetBoolPropertyFn(false)
                options.EnableLoadQueueStates = dynamicconfig.GetBoolPropertyFn(false)

                options.MaxStartJitterInterval = config.TimerProcessorFailoverMaxStartJitterInterval
        } else {
                options.EnableSplit = config.QueueProcessorEnableSplit
                options.SplitMaxLevel = config.QueueProcessorSplitMaxLevel
                options.EnableRandomSplitByDomainID = config.QueueProcessorEnableRandomSplitByDomainID
                options.RandomSplitProbability = config.QueueProcessorRandomSplitProbability
                options.EnablePendingTaskSplitByDomainID = config.QueueProcessorEnablePendingTaskSplitByDomainID
                options.PendingTaskSplitThreshold = config.QueueProcessorPendingTaskSplitThreshold
                options.EnableStuckTaskSplitByDomainID = config.QueueProcessorEnableStuckTaskSplitByDomainID
                options.StuckTaskSplitThreshold = config.QueueProcessorStuckTaskSplitThreshold
                options.SplitLookAheadDurationByDomainID = config.QueueProcessorSplitLookAheadDurationByDomainID

                options.EnablePersistQueueStates = config.QueueProcessorEnablePersistQueueStates
                options.EnableLoadQueueStates = config.QueueProcessorEnableLoadQueueStates

                options.MaxStartJitterInterval = dynamicconfig.GetDurationPropertyFn(0)
        }

        if isActive {
                options.MetricScope = metrics.TimerActiveQueueProcessorScope
                options.RedispatchInterval = config.ActiveTaskRedispatchInterval
        } else {
                options.MetricScope = metrics.TimerStandbyQueueProcessorScope
                options.RedispatchInterval = config.StandbyTaskRedispatchInterval
        }

        return options
}