
uber / cadence / 01894bf6-a954-4eb3-a178-a38598dfe3b0

12 Jul 2023 09:16PM UTC coverage: 57.228% (-0.06%) from 57.284%
push · buildkite · web-flow

[CLI] add domain migration command with domain metadata checker (#5335)

Added a domain migration command. It currently checks the domain metadata and long-running workflows:

To ensure both domains exist before the domain migration happens.
To ensure the domain does not have long-running workflows that the migration cannot handle.

How did you test it?
Tested locally with docker compose.
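
For illustration only, here is a minimal sketch of the two pre-migration checks described above. It is hypothetical and is not the code added in #5335: the DomainChecker interface, its methods, and validateMigration are invented names, and the real command may structure these checks differently.

package migration

import (
        "context"
        "fmt"
        "time"
)

// DomainChecker is a hypothetical abstraction over the questions the
// migration command needs answered before a domain migration may start.
type DomainChecker interface {
        // DomainExists reports whether the named domain is registered.
        DomainExists(ctx context.Context, domain string) (bool, error)
        // OldestOpenWorkflowAge returns the age of the oldest open workflow
        // in the domain, or zero if there is none.
        OldestOpenWorkflowAge(ctx context.Context, domain string) (time.Duration, error)
}

// validateMigration mirrors the two checks from the description: both domains
// must exist, and the source domain must not contain long-running workflows
// that the migration cannot handle.
func validateMigration(ctx context.Context, c DomainChecker, src, dst string, maxAge time.Duration) error {
        for _, d := range []string{src, dst} {
                ok, err := c.DomainExists(ctx, d)
                if err != nil {
                        return fmt.Errorf("checking domain %q: %w", d, err)
                }
                if !ok {
                        return fmt.Errorf("domain %q does not exist", d)
                }
        }

        age, err := c.OldestOpenWorkflowAge(ctx, src)
        if err != nil {
                return fmt.Errorf("listing open workflows in %q: %w", src, err)
        }
        if age > maxAge {
                return fmt.Errorf("domain %q has long-running workflows that migration cannot handle", src)
        }
        return nil
}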

103 of 103 new or added lines in 3 files covered. (100.0%)

87149 of 152284 relevant lines covered (57.23%)

2494.52 hits per line

Source File

/service/history/queue/timer_queue_processor_base.go (77.48% covered)

// Copyright (c) 2017-2020 Uber Technologies Inc.

// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:

// The above copyright notice and this permission notice shall be included in all
// copies or substantial portions of the Software.

// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
// SOFTWARE.

package queue

import (
        "context"
        "fmt"
        "math"
        "math/rand"
        "sync"
        "sync/atomic"
        "time"

        "github.com/uber/cadence/common"
        "github.com/uber/cadence/common/backoff"
        "github.com/uber/cadence/common/dynamicconfig"
        "github.com/uber/cadence/common/log"
        "github.com/uber/cadence/common/log/tag"
        "github.com/uber/cadence/common/metrics"
        "github.com/uber/cadence/common/persistence"
        "github.com/uber/cadence/service/history/config"
        "github.com/uber/cadence/service/history/shard"
        "github.com/uber/cadence/service/history/task"
)

var (
        maximumTimerTaskKey = newTimerTaskKey(
                time.Unix(0, math.MaxInt64),
                0,
        )
)

type (
        timerTaskKey struct {
                visibilityTimestamp time.Time
                taskID              int64
        }

        timeTaskReadProgress struct {
                currentQueue  ProcessingQueue
                readLevel     task.Key
                maxReadLevel  task.Key
                nextPageToken []byte
        }

        timerQueueProcessorBase struct {
                *processorBase

                taskInitializer task.Initializer

                clusterName string

                pollTimeLock sync.Mutex
                backoffTimer map[int]*time.Timer
                nextPollTime map[int]time.Time
                timerGate    TimerGate

                // timer notification
                newTimerCh  chan struct{}
                newTimeLock sync.Mutex
                newTime     time.Time

                processingQueueReadProgress map[int]timeTaskReadProgress
        }
)

func newTimerQueueProcessorBase(
        clusterName string,
        shard shard.Context,
        processingQueueStates []ProcessingQueueState,
        taskProcessor task.Processor,
        timerGate TimerGate,
        options *queueProcessorOptions,
        updateMaxReadLevel updateMaxReadLevelFn,
        updateClusterAckLevel updateClusterAckLevelFn,
        updateProcessingQueueStates updateProcessingQueueStatesFn,
        queueShutdown queueShutdownFn,
        taskFilter task.Filter,
        taskExecutor task.Executor,
        logger log.Logger,
        metricsClient metrics.Client,
) *timerQueueProcessorBase {
        processorBase := newProcessorBase(
                shard,
                processingQueueStates,
                taskProcessor,
                options,
                updateMaxReadLevel,
                updateClusterAckLevel,
                updateProcessingQueueStates,
                queueShutdown,
                logger.WithTags(tag.ComponentTimerQueue),
                metricsClient,
        )

        queueType := task.QueueTypeActiveTimer
        if options.MetricScope == metrics.TimerStandbyQueueProcessorScope {
                queueType = task.QueueTypeStandbyTimer
        }

        return &timerQueueProcessorBase{
                processorBase: processorBase,

                taskInitializer: func(taskInfo task.Info) task.Task {
                        return task.NewTimerTask(
                                shard,
                                taskInfo,
                                queueType,
                                task.InitializeLoggerForTask(shard.GetShardID(), taskInfo, logger),
                                taskFilter,
                                taskExecutor,
                                taskProcessor,
                                processorBase.redispatcher.AddTask,
                                shard.GetConfig().TaskCriticalRetryCount,
                        )
                },

                clusterName: clusterName,

                backoffTimer: make(map[int]*time.Timer),
                nextPollTime: make(map[int]time.Time),
                timerGate:    timerGate,

                newTimerCh: make(chan struct{}, 1),

                processingQueueReadProgress: make(map[int]timeTaskReadProgress),
        }
}

func (t *timerQueueProcessorBase) Start() {
        if !atomic.CompareAndSwapInt32(&t.status, common.DaemonStatusInitialized, common.DaemonStatusStarted) {
                return
        }

        t.logger.Info("Timer queue processor state changed", tag.LifeCycleStarting)
        defer t.logger.Info("Timer queue processor state changed", tag.LifeCycleStarted)

        t.redispatcher.Start()

        newPollTime := time.Time{}
        if startJitter := t.options.MaxStartJitterInterval(); startJitter > 0 {
                now := t.shard.GetTimeSource().Now()
                newPollTime = now.Add(time.Duration(rand.Int63n(int64(startJitter))))
        }
        for _, queueCollections := range t.processingQueueCollections {
                t.upsertPollTime(queueCollections.Level(), newPollTime)
        }

        t.shutdownWG.Add(1)
        go t.processorPump()
}

func (t *timerQueueProcessorBase) Stop() {
        if !atomic.CompareAndSwapInt32(&t.status, common.DaemonStatusStarted, common.DaemonStatusStopped) {
                return
        }

        t.logger.Info("Timer queue processor state changed", tag.LifeCycleStopping)
        defer t.logger.Info("Timer queue processor state changed", tag.LifeCycleStopped)

        t.timerGate.Close()
        close(t.shutdownCh)
        t.pollTimeLock.Lock()
        for _, timer := range t.backoffTimer {
                timer.Stop()
        }
        t.pollTimeLock.Unlock()

        if success := common.AwaitWaitGroup(&t.shutdownWG, time.Minute); !success {
                t.logger.Warn("", tag.LifeCycleStopTimedout)
        }

        t.redispatcher.Stop()
}

func (t *timerQueueProcessorBase) processorPump() {
        defer t.shutdownWG.Done()

        updateAckTimer := time.NewTimer(backoff.JitDuration(
                t.options.UpdateAckInterval(),
                t.options.UpdateAckIntervalJitterCoefficient(),
        ))
        defer updateAckTimer.Stop()

        splitQueueTimer := time.NewTimer(backoff.JitDuration(
                t.options.SplitQueueInterval(),
                t.options.SplitQueueIntervalJitterCoefficient(),
        ))
        defer splitQueueTimer.Stop()

processorPumpLoop:
        for {
                select {
                case <-t.shutdownCh:
                        break processorPumpLoop
                case <-t.timerGate.FireChan():
                        maxRedispatchQueueSize := t.options.MaxRedispatchQueueSize()
                        if t.redispatcher.Size() > maxRedispatchQueueSize {
                                t.redispatcher.Redispatch(maxRedispatchQueueSize)
                                if t.redispatcher.Size() > maxRedispatchQueueSize {
                                        // if redispatcher still has a large number of tasks
                                        // this only happens when system is under very high load
                                        // we should backoff here instead of keeping submitting tasks to task processor
                                        // don't call t.timerGate.Update(time.Now() + loadQueueTaskThrottleRetryDelay) as the time in
                                        // standby timer processor is not real time and is managed separately
                                        time.Sleep(backoff.JitDuration(
                                                t.options.PollBackoffInterval(),
                                                t.options.PollBackoffIntervalJitterCoefficient(),
                                        ))
                                }
                                t.timerGate.Update(time.Time{})
                                continue processorPumpLoop
                        }

                        t.pollTimeLock.Lock()
                        levels := make(map[int]struct{})
                        now := t.shard.GetCurrentTime(t.clusterName)
                        for level, pollTime := range t.nextPollTime {
                                if !now.Before(pollTime) {
                                        levels[level] = struct{}{}
                                        delete(t.nextPollTime, level)
                                } else {
                                        t.timerGate.Update(pollTime)
                                }
                        }
                        t.pollTimeLock.Unlock()

                        t.processQueueCollections(levels)
                case <-updateAckTimer.C:
                        processFinished, _, err := t.updateAckLevel()
                        if err == shard.ErrShardClosed || (err == nil && processFinished) {
                                go t.Stop()
                                break processorPumpLoop
                        }
                        updateAckTimer.Reset(backoff.JitDuration(
                                t.options.UpdateAckInterval(),
                                t.options.UpdateAckIntervalJitterCoefficient(),
                        ))
                case <-t.newTimerCh:
                        t.newTimeLock.Lock()
                        newTime := t.newTime
                        t.newTime = time.Time{}
                        t.newTimeLock.Unlock()

                        // New Timer has arrived.
                        t.metricsScope.IncCounter(metrics.NewTimerNotifyCounter)
                        // notify all queue collections as they are waiting for the notification when there's
                        // no more task to process. For non-default queue, we choose to do periodic polling
                        // in the future, then we don't need to notify them.
                        for _, queueCollection := range t.processingQueueCollections {
                                t.upsertPollTime(queueCollection.Level(), newTime)
                        }
                case <-splitQueueTimer.C:
                        t.splitQueue()
                        splitQueueTimer.Reset(backoff.JitDuration(
                                t.options.SplitQueueInterval(),
                                t.options.SplitQueueIntervalJitterCoefficient(),
                        ))
                case notification := <-t.actionNotifyCh:
                        t.handleActionNotification(notification)
                }
        }
}

func (t *timerQueueProcessorBase) processQueueCollections(levels map[int]struct{}) {
        for _, queueCollection := range t.processingQueueCollections {
                level := queueCollection.Level()
                if _, ok := levels[level]; !ok {
                        continue
                }

                activeQueue := queueCollection.ActiveQueue()
                if activeQueue == nil {
                        // process for this queue collection has finished
                        // it's possible that new queue will be added to this collection later though,
                        // pollTime will be updated after split/merge
                        continue
                }

                t.upsertPollTime(level, t.shard.GetCurrentTime(t.clusterName).Add(backoff.JitDuration(
                        t.options.MaxPollInterval(),
                        t.options.MaxPollIntervalJitterCoefficient(),
                )))

                var nextPageToken []byte
                readLevel := activeQueue.State().ReadLevel()
                maxReadLevel := minTaskKey(activeQueue.State().MaxLevel(), t.updateMaxReadLevel())
                domainFilter := activeQueue.State().DomainFilter()

                if progress, ok := t.processingQueueReadProgress[level]; ok {
                        if progress.currentQueue == activeQueue {
                                readLevel = progress.readLevel
                                maxReadLevel = progress.maxReadLevel
                                nextPageToken = progress.nextPageToken
                        }
                        delete(t.processingQueueReadProgress, level)
                }

                if !readLevel.Less(maxReadLevel) {
                        // notify timer gate about the min time
                        t.upsertPollTime(level, readLevel.(timerTaskKey).visibilityTimestamp)
                        continue
                }

                ctx, cancel := context.WithTimeout(context.Background(), loadQueueTaskThrottleRetryDelay)
                if err := t.rateLimiter.Wait(ctx); err != nil {
                        cancel()
                        if level == defaultProcessingQueueLevel {
                                t.upsertPollTime(level, time.Time{})
                        } else {
                                t.setupBackoffTimer(level)
                        }
                        continue
                }
                cancel()

                timerTaskInfos, lookAheadTask, nextPageToken, err := t.readAndFilterTasks(readLevel, maxReadLevel, nextPageToken)
                if err != nil {
                        t.logger.Error("Processor unable to retrieve tasks", tag.Error(err))
                        t.upsertPollTime(level, time.Time{}) // re-enqueue the event
                        continue
                }

                tasks := make(map[task.Key]task.Task)
                taskChFull := false
                for _, taskInfo := range timerTaskInfos {
                        if !domainFilter.Filter(taskInfo.GetDomainID()) {
                                continue
                        }

                        task := t.taskInitializer(taskInfo)
                        tasks[newTimerTaskKey(taskInfo.GetVisibilityTimestamp(), taskInfo.GetTaskID())] = task
                        submitted, err := t.submitTask(task)
                        if err != nil {
                                // only err here is due to the fact that processor has been shutdown
                                // return instead of continue
                                return
                        }
                        taskChFull = taskChFull || !submitted
                }

                var newReadLevel task.Key
                if len(nextPageToken) == 0 {
                        newReadLevel = maxReadLevel
                        if lookAheadTask != nil {
                                // lookAheadTask may exist only when nextPageToken is empty
                                // notice that lookAheadTask.VisibilityTimestamp may be larger than shard max read level,
                                // which means new tasks can be generated before that timestamp. This issue is solved by
                                // upsertPollTime whenever there are new tasks
                                t.upsertPollTime(level, lookAheadTask.VisibilityTimestamp)
                                newReadLevel = minTaskKey(newReadLevel, newTimerTaskKey(lookAheadTask.GetVisibilityTimestamp(), 0))
                        }
                        // else we have no idea when the next poll should happen
                        // rely on notifyNewTask to trigger the next poll even for non-default queue.
                        // another option for non-default queue is that we can setup a backoff timer to check back later
                } else {
                        // more tasks should be loaded for this processing queue
                        // record the current progress and update the poll time
                        if level == defaultProcessingQueueLevel || !taskChFull {
                                t.upsertPollTime(level, time.Time{})
                        } else {
                                t.setupBackoffTimer(level)
                        }
                        t.processingQueueReadProgress[level] = timeTaskReadProgress{
                                currentQueue:  activeQueue,
                                readLevel:     readLevel,
                                maxReadLevel:  maxReadLevel,
                                nextPageToken: nextPageToken,
                        }
                        newReadLevel = newTimerTaskKey(timerTaskInfos[len(timerTaskInfos)-1].GetVisibilityTimestamp(), 0)
                }
                queueCollection.AddTasks(tasks, newReadLevel)
        }
}

func (t *timerQueueProcessorBase) splitQueue() {
        splitPolicy := t.initializeSplitPolicy(
                func(key task.Key, domainID string) task.Key {
                        return newTimerTaskKey(
                                key.(timerTaskKey).visibilityTimestamp.Add(
                                        t.options.SplitLookAheadDurationByDomainID(domainID),
                                ),
                                0,
                        )
                },
        )

        t.splitProcessingQueueCollection(splitPolicy, t.upsertPollTime)
}

func (t *timerQueueProcessorBase) handleActionNotification(notification actionNotification) {
        t.processorBase.handleActionNotification(notification, func() {
                switch notification.action.ActionType {
                case ActionTypeReset:
                        t.upsertPollTime(defaultProcessingQueueLevel, time.Time{})
                }
        })
}

func (t *timerQueueProcessorBase) readAndFilterTasks(
        readLevel task.Key,
        maxReadLevel task.Key,
        nextPageToken []byte,
) ([]*persistence.TimerTaskInfo, *persistence.TimerTaskInfo, []byte, error) {
        timerTasks, nextPageToken, err := t.getTimerTasks(readLevel, maxReadLevel, nextPageToken, t.options.BatchSize())
        if err != nil {
                return nil, nil, nil, err
        }

        var lookAheadTask *persistence.TimerTaskInfo
        filteredTasks := []*persistence.TimerTaskInfo{}

        for _, timerTask := range timerTasks {
                if !t.isProcessNow(timerTask.GetVisibilityTimestamp()) {
                        lookAheadTask = timerTask
                        nextPageToken = nil
                        break
                }
                filteredTasks = append(filteredTasks, timerTask)
        }

        if len(nextPageToken) == 0 && lookAheadTask == nil {
                // only look ahead within the processing queue boundary
                lookAheadTask, err = t.readLookAheadTask(maxReadLevel, maximumTimerTaskKey)
                if err != nil {
                        // we don't know if look ahead task exists or not, but we know if it exists,
                        // it's visibility timestamp is larger than or equal to maxReadLevel.
                        // so, create a fake look ahead task so another load can be triggered at that time.
                        lookAheadTask = &persistence.TimerTaskInfo{
                                VisibilityTimestamp: maxReadLevel.(timerTaskKey).visibilityTimestamp,
                        }
                        return filteredTasks, lookAheadTask, nil, nil
                }
        }

        return filteredTasks, lookAheadTask, nextPageToken, nil
}

func (t *timerQueueProcessorBase) readLookAheadTask(
        lookAheadStartLevel task.Key,
        lookAheadMaxLevel task.Key,
) (*persistence.TimerTaskInfo, error) {
        tasks, _, err := t.getTimerTasks(
                lookAheadStartLevel,
                lookAheadMaxLevel,
                nil,
                1,
        )
        if err != nil {
                return nil, err
        }

        if len(tasks) == 1 {
                return tasks[0], nil
        }
        return nil, nil
}

func (t *timerQueueProcessorBase) getTimerTasks(
        readLevel task.Key,
        maxReadLevel task.Key,
        nextPageToken []byte,
        batchSize int,
) ([]*persistence.TimerTaskInfo, []byte, error) {
        request := &persistence.GetTimerIndexTasksRequest{
                MinTimestamp:  readLevel.(timerTaskKey).visibilityTimestamp,
                MaxTimestamp:  maxReadLevel.(timerTaskKey).visibilityTimestamp,
                BatchSize:     batchSize,
                NextPageToken: nextPageToken,
        }

        var err error
        var response *persistence.GetTimerIndexTasksResponse
        retryCount := t.shard.GetConfig().TimerProcessorGetFailureRetryCount()
        for attempt := 0; attempt < retryCount; attempt++ {
                response, err = t.shard.GetExecutionManager().GetTimerIndexTasks(context.Background(), request)
                if err == nil {
                        return response.Timers, response.NextPageToken, nil
                }
                backoff := time.Duration(attempt * 100)
                time.Sleep(backoff * time.Millisecond)
        }
        return nil, nil, err
}

func (t *timerQueueProcessorBase) isProcessNow(
        expiryTime time.Time,
) bool {
        if expiryTime.IsZero() {
                // return true, but somewhere probably have bug creating empty timerTask.
                t.logger.Warn("Timer task has timestamp zero")
        }
        return expiryTime.UnixNano() <= t.shard.GetCurrentTime(t.clusterName).UnixNano()
}

func (t *timerQueueProcessorBase) notifyNewTimers(
        timerTasks []persistence.Task,
) {
        if len(timerTasks) == 0 {
                return
        }

        isActive := t.options.MetricScope == metrics.TimerActiveQueueProcessorScope

        minNewTime := timerTasks[0].GetVisibilityTimestamp()
        for _, timerTask := range timerTasks {
                ts := timerTask.GetVisibilityTimestamp()
                if ts.Before(minNewTime) {
                        minNewTime = ts
                }

                taskScopeIdx := task.GetTimerTaskMetricScope(
                        timerTask.GetType(),
                        isActive,
                )
                t.metricsClient.IncCounter(taskScopeIdx, metrics.NewTimerCounter)
        }

        t.notifyNewTimer(minNewTime)
}

func (t *timerQueueProcessorBase) notifyNewTimer(
        newTime time.Time,
) {
        t.newTimeLock.Lock()
        defer t.newTimeLock.Unlock()

        if t.newTime.IsZero() || newTime.Before(t.newTime) {
                t.newTime = newTime
                select {
                case t.newTimerCh <- struct{}{}:
                        // Notified about new time.
                default:
                        // Channel "full" -> drop and move on, this will happen only if service is in high load.
                }
        }
}

func (t *timerQueueProcessorBase) upsertPollTime(level int, newPollTime time.Time) {
        t.pollTimeLock.Lock()
        defer t.pollTimeLock.Unlock()

        if _, ok := t.backoffTimer[level]; ok {
                // honor existing backoff timer
                return
        }

        if currentPollTime, ok := t.nextPollTime[level]; !ok || newPollTime.Before(currentPollTime) {
                t.nextPollTime[level] = newPollTime
                t.timerGate.Update(newPollTime)
        }
}

// setupBackoffTimer will trigger a poll for the specified processing queue collection
// after a certain period of (real) time. This means for standby timer, even if the cluster time
// has not been updated, the poll will still be triggered when the timer fired. Use this function
// for delaying the load for processing queue. If a poll should be triggered immediately
// use upsertPollTime.
func (t *timerQueueProcessorBase) setupBackoffTimer(level int) {
        t.pollTimeLock.Lock()
        defer t.pollTimeLock.Unlock()

        if _, ok := t.backoffTimer[level]; ok {
                // honor existing backoff timer
                return
        }

        t.metricsScope.IncCounter(metrics.ProcessingQueueThrottledCounter)
        t.logger.Info("Throttled processing queue", tag.QueueLevel(level))
        backoffDuration := backoff.JitDuration(
                t.options.PollBackoffInterval(),
                t.options.PollBackoffIntervalJitterCoefficient(),
        )
        t.backoffTimer[level] = time.AfterFunc(backoffDuration, func() {
                select {
                case <-t.shutdownCh:
                        return
                default:
                }

                t.pollTimeLock.Lock()
                defer t.pollTimeLock.Unlock()

                t.nextPollTime[level] = time.Time{}
                t.timerGate.Update(time.Time{})
                delete(t.backoffTimer, level)
        })
}

func newTimerTaskKey(
        visibilityTimestamp time.Time,
        taskID int64,
) task.Key {
        return timerTaskKey{
                visibilityTimestamp: visibilityTimestamp,
                taskID:              taskID,
        }
}

func (k timerTaskKey) Less(
        key task.Key,
) bool {
        timerKey := key.(timerTaskKey)
        if k.visibilityTimestamp.Equal(timerKey.visibilityTimestamp) {
                return k.taskID < timerKey.taskID
        }
        return k.visibilityTimestamp.Before(timerKey.visibilityTimestamp)
}

func (k timerTaskKey) String() string {
        return fmt.Sprintf("{visibilityTimestamp: %v, taskID: %v}", k.visibilityTimestamp, k.taskID)
}

func newTimerQueueProcessorOptions(
        config *config.Config,
        isActive bool,
        isFailover bool,
) *queueProcessorOptions {
        options := &queueProcessorOptions{
                BatchSize:                            config.TimerTaskBatchSize,
                DeleteBatchSize:                      config.TimerTaskDeleteBatchSize,
                MaxPollRPS:                           config.TimerProcessorMaxPollRPS,
                MaxPollInterval:                      config.TimerProcessorMaxPollInterval,
                MaxPollIntervalJitterCoefficient:     config.TimerProcessorMaxPollIntervalJitterCoefficient,
                UpdateAckInterval:                    config.TimerProcessorUpdateAckInterval,
                UpdateAckIntervalJitterCoefficient:   config.TimerProcessorUpdateAckIntervalJitterCoefficient,
                RedispatchIntervalJitterCoefficient:  config.TaskRedispatchIntervalJitterCoefficient,
                MaxRedispatchQueueSize:               config.TimerProcessorMaxRedispatchQueueSize,
                SplitQueueInterval:                   config.TimerProcessorSplitQueueInterval,
                SplitQueueIntervalJitterCoefficient:  config.TimerProcessorSplitQueueIntervalJitterCoefficient,
                PollBackoffInterval:                  config.QueueProcessorPollBackoffInterval,
                PollBackoffIntervalJitterCoefficient: config.QueueProcessorPollBackoffIntervalJitterCoefficient,
        }

        if isFailover {
                // disable queue split for failover processor
                options.EnableSplit = dynamicconfig.GetBoolPropertyFn(false)

                // disable persist and load processing queue states for failover processor as it will never be split
                options.EnablePersistQueueStates = dynamicconfig.GetBoolPropertyFn(false)
                options.EnableLoadQueueStates = dynamicconfig.GetBoolPropertyFn(false)

                options.MaxStartJitterInterval = config.TimerProcessorFailoverMaxStartJitterInterval
        } else {
                options.EnableSplit = config.QueueProcessorEnableSplit
                options.SplitMaxLevel = config.QueueProcessorSplitMaxLevel
                options.EnableRandomSplitByDomainID = config.QueueProcessorEnableRandomSplitByDomainID
                options.RandomSplitProbability = config.QueueProcessorRandomSplitProbability
                options.EnablePendingTaskSplitByDomainID = config.QueueProcessorEnablePendingTaskSplitByDomainID
                options.PendingTaskSplitThreshold = config.QueueProcessorPendingTaskSplitThreshold
                options.EnableStuckTaskSplitByDomainID = config.QueueProcessorEnableStuckTaskSplitByDomainID
                options.StuckTaskSplitThreshold = config.QueueProcessorStuckTaskSplitThreshold
                options.SplitLookAheadDurationByDomainID = config.QueueProcessorSplitLookAheadDurationByDomainID

                options.EnablePersistQueueStates = config.QueueProcessorEnablePersistQueueStates
                options.EnableLoadQueueStates = config.QueueProcessorEnableLoadQueueStates

                options.MaxStartJitterInterval = dynamicconfig.GetDurationPropertyFn(0)
        }

        if isActive {
                options.MetricScope = metrics.TimerActiveQueueProcessorScope
                options.RedispatchInterval = config.ActiveTaskRedispatchInterval
        } else {
                options.MetricScope = metrics.TimerStandbyQueueProcessorScope
                options.RedispatchInterval = config.StandbyTaskRedispatchInterval
        }

        return options
}
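
As a small standalone illustration of the key ordering implemented by timerTaskKey.Less above (visibility timestamp first, task ID as a tie-breaker), the following self-contained program re-creates the comparison outside package queue. The key type and its less method are stand-ins for the unexported timerTaskKey, not the real implementation.

package main

import (
        "fmt"
        "time"
)

// key mirrors the shape of timerTaskKey: ordered by visibility timestamp,
// with taskID breaking ties between tasks scheduled for the same instant.
type key struct {
        visibilityTimestamp time.Time
        taskID              int64
}

func (k key) less(o key) bool {
        if k.visibilityTimestamp.Equal(o.visibilityTimestamp) {
                return k.taskID < o.taskID
        }
        return k.visibilityTimestamp.Before(o.visibilityTimestamp)
}

func main() {
        now := time.Now()
        a := key{visibilityTimestamp: now, taskID: 1}
        b := key{visibilityTimestamp: now, taskID: 2}
        c := key{visibilityTimestamp: now.Add(time.Second), taskID: 0}

        fmt.Println(a.less(b)) // true: same timestamp, lower taskID comes first
        fmt.Println(b.less(c)) // true: earlier timestamp wins regardless of taskID
        fmt.Println(c.less(a)) // false: c fires a second later
}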