
uber / cadence / 018f7a32-1b26-4980-857d-003eb52f072b

15 May 2024 03:00AM UTC coverage: 69.159% (-0.007%) from 69.166%

push · buildkite · web-flow

Stop history queue task processor after shard controller is stopped (#6022)

1 of 1 new or added line in 1 file covered. (100.0%)
62 existing lines in 15 files now uncovered.
101818 of 147224 relevant lines covered (69.16%)
2563.92 hits per line
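
For reference, the headline figures follow directly from the raw line counts above. A minimal Go sketch (illustrative only, not Coveralls' own calculation) that reproduces the coverage percentage and the reported change:

package main

import "fmt"

func main() {
        // Raw counts taken from the report header above.
        covered, relevant := 101818.0, 147224.0
        previous := 69.166 // coverage of the base build, per the header

        coverage := covered / relevant * 100
        fmt.Printf("coverage: %.3f%%\n", coverage)           // ≈ 69.159%
        fmt.Printf("change:   %+.3f%%\n", coverage-previous) // ≈ -0.007%
}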

Source File

/service/history/queue/transfer_queue_processor_base.go (76.46% covered)
// Copyright (c) 2017-2020 Uber Technologies Inc.

// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:

// The above copyright notice and this permission notice shall be included in all
// copies or substantial portions of the Software.

// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
// SOFTWARE.

package queue

import (
        "context"
        "errors"
        "math/rand"
        "sync"
        "sync/atomic"
        "time"

        "github.com/uber/cadence/common"
        "github.com/uber/cadence/common/backoff"
        "github.com/uber/cadence/common/dynamicconfig"
        "github.com/uber/cadence/common/log"
        "github.com/uber/cadence/common/log/tag"
        "github.com/uber/cadence/common/metrics"
        "github.com/uber/cadence/common/persistence"
        hcommon "github.com/uber/cadence/service/history/common"
        "github.com/uber/cadence/service/history/config"
        "github.com/uber/cadence/service/history/shard"
        "github.com/uber/cadence/service/history/task"
)

const (
        numTasksEstimationDecay = 0.6
)

var (
        loadQueueTaskThrottleRetryDelay = 5 * time.Second
        persistenceOperationRetryPolicy = common.CreatePersistenceRetryPolicy()
)

type (
        transferTaskKey struct {
                taskID int64
        }

        transferQueueProcessorBase struct {
                *processorBase

                taskInitializer task.Initializer

                notifyCh  chan struct{}
                processCh chan struct{}

                // for managing if a processing queue collection should be processed
                startJitterTimer *time.Timer
                processingLock   sync.Mutex
                backoffTimer     map[int]*time.Timer
                shouldProcess    map[int]bool

                // for estimating the look ahead taskID during split
                lastSplitTime           time.Time
                lastMaxReadLevel        int64
                estimatedTasksPerMinute int64

                // for validating if the queue failed to load any tasks
                validator *transferQueueValidator

                processQueueCollectionsFn func()
                updateAckLevelFn          func() (bool, task.Key, error)
        }
)

func newTransferQueueProcessorBase(
        shard shard.Context,
        processingQueueStates []ProcessingQueueState,
        taskProcessor task.Processor,
        options *queueProcessorOptions,
        updateMaxReadLevel updateMaxReadLevelFn,
        updateClusterAckLevel updateClusterAckLevelFn,
        updateProcessingQueueStates updateProcessingQueueStatesFn,
        queueShutdown queueShutdownFn,
        taskFilter task.Filter,
        taskExecutor task.Executor,
        logger log.Logger,
        metricsClient metrics.Client,
) *transferQueueProcessorBase {
        processorBase := newProcessorBase(
                shard,
                processingQueueStates,
                taskProcessor,
                options,
                updateMaxReadLevel,
                updateClusterAckLevel,
                updateProcessingQueueStates,
                queueShutdown,
                logger.WithTags(tag.ComponentTransferQueue),
                metricsClient,
        )

        queueType := task.QueueTypeActiveTransfer
        if options.MetricScope == metrics.TransferStandbyQueueProcessorScope {
                queueType = task.QueueTypeStandbyTransfer
        }

        transferQueueProcessorBase := &transferQueueProcessorBase{
                processorBase: processorBase,
                taskInitializer: func(taskInfo task.Info) task.Task {
                        return task.NewTransferTask(
                                shard,
                                taskInfo,
                                queueType,
                                task.InitializeLoggerForTask(shard.GetShardID(), taskInfo, logger),
                                taskFilter,
                                taskExecutor,
                                taskProcessor,
                                processorBase.redispatcher.AddTask,
                                shard.GetConfig().TaskCriticalRetryCount,
                        )
                },
                notifyCh:         make(chan struct{}, 1),
                processCh:        make(chan struct{}, 1),
                backoffTimer:     make(map[int]*time.Timer),
                shouldProcess:    make(map[int]bool),
                lastSplitTime:    time.Time{},
                lastMaxReadLevel: 0,
        }

        transferQueueProcessorBase.processQueueCollectionsFn = transferQueueProcessorBase.processQueueCollections
        transferQueueProcessorBase.updateAckLevelFn = transferQueueProcessorBase.updateAckLevel

        if shard.GetConfig().EnableDebugMode && options.EnableValidator() {
                transferQueueProcessorBase.validator = newTransferQueueValidator(
                        transferQueueProcessorBase,
                        options.ValidationInterval,
                        logger,
                        processorBase.metricsScope,
                )
        }

        return transferQueueProcessorBase
}

func (t *transferQueueProcessorBase) Start() {
        if !atomic.CompareAndSwapInt32(&t.status, common.DaemonStatusInitialized, common.DaemonStatusStarted) {
                return
        }

        t.logger.Info("Transfer queue processor state changed", tag.LifeCycleStarting)
        defer t.logger.Info("Transfer queue processor state changed", tag.LifeCycleStarted)

        t.redispatcher.Start()

        // trigger an initial (maybe delayed) load of tasks
        if startJitter := t.options.MaxStartJitterInterval(); startJitter > 0 {
                t.startJitterTimer = time.AfterFunc(
                        time.Duration(rand.Int63n(int64(startJitter))),
                        func() {
                                t.notifyAllQueueCollections()
                        },
                )
        } else {
                t.notifyAllQueueCollections()
        }

        t.shutdownWG.Add(1)
        go t.processorPump()
}

func (t *transferQueueProcessorBase) Stop() {
        if !atomic.CompareAndSwapInt32(&t.status, common.DaemonStatusStarted, common.DaemonStatusStopped) {
                return
        }

        t.logger.Info("Transfer queue processor state changed", tag.LifeCycleStopping)
        defer t.logger.Info("Transfer queue processor state changed", tag.LifeCycleStopped)

        close(t.shutdownCh)
        if t.startJitterTimer != nil {
                t.startJitterTimer.Stop()
        }
        t.processingLock.Lock()
        for _, timer := range t.backoffTimer {
                timer.Stop()
        }
        for level := range t.shouldProcess {
                t.shouldProcess[level] = false
        }
        t.processingLock.Unlock()

        if success := common.AwaitWaitGroup(&t.shutdownWG, gracefulShutdownTimeout); !success {
                t.logger.Warn("transferQueueProcessorBase timed out on shut down", tag.LifeCycleStopTimedout)
        }

        t.redispatcher.Stop()
}

func (t *transferQueueProcessorBase) notifyNewTask(info *hcommon.NotifyTaskInfo) {
        select {
        case t.notifyCh <- struct{}{}:
        default:
        }

        if info.ExecutionInfo != nil && t.validator != nil {
                // executionInfo will be nil when notifyNewTask is called to trigger a scan, for example during domain failover or sync shard.
                t.validator.addTasks(info)
        }
}

func (t *transferQueueProcessorBase) readyForProcess(level int) {
        t.processingLock.Lock()
        defer t.processingLock.Unlock()

        if _, ok := t.backoffTimer[level]; ok {
                // current level is being throttled
                return
        }

        t.shouldProcess[level] = true

        // trigger the actual processing
        select {
        case t.processCh <- struct{}{}:
        default:
        }
}

func (t *transferQueueProcessorBase) setupBackoffTimer(level int) {
        t.processingLock.Lock()
        defer t.processingLock.Unlock()

        if _, ok := t.backoffTimer[level]; ok {
                // there's an existing backoff timer, no-op
                // this case should not happen
                return
        }

        t.metricsScope.IncCounter(metrics.ProcessingQueueThrottledCounter)
        t.logger.Info("Throttled processing queue", tag.QueueLevel(level))

        backoffDuration := backoff.JitDuration(
                t.options.PollBackoffInterval(),
                t.options.PollBackoffIntervalJitterCoefficient(),
        )
        t.backoffTimer[level] = time.AfterFunc(backoffDuration, func() {
                select {
                case <-t.shutdownCh:
                        return
                default:
                }

                t.processingLock.Lock()
                defer t.processingLock.Unlock()

                t.shouldProcess[level] = true
                delete(t.backoffTimer, level)

                // trigger the actual processing
                select {
                case t.processCh <- struct{}{}:
                default:
                }
        })
}

func (t *transferQueueProcessorBase) processorPump() {
        defer t.shutdownWG.Done()

        updateAckTimer := time.NewTimer(backoff.JitDuration(
                t.options.UpdateAckInterval(),
                t.options.UpdateAckIntervalJitterCoefficient(),
        ))
        defer updateAckTimer.Stop()

        splitQueueTimer := time.NewTimer(backoff.JitDuration(
                t.options.SplitQueueInterval(),
                t.options.SplitQueueIntervalJitterCoefficient(),
        ))
        defer splitQueueTimer.Stop()

        maxPollTimer := time.NewTimer(backoff.JitDuration(
                t.options.MaxPollInterval(),
                t.options.MaxPollIntervalJitterCoefficient(),
        ))
        defer maxPollTimer.Stop()

        for {
                select {
                case <-t.shutdownCh:
                        return
                case <-t.notifyCh:
                        // notify all queue collections as they are waiting for the notification when there's
                        // no more task to process. For non-default queue, if we choose to do periodic polling
                        // in the future, then we don't need to notify them.
                        t.notifyAllQueueCollections()
                case <-maxPollTimer.C:
                        t.notifyAllQueueCollections()
                        maxPollTimer.Reset(backoff.JitDuration(
                                t.options.MaxPollInterval(),
                                t.options.MaxPollIntervalJitterCoefficient(),
                        ))
                case <-t.processCh:
                        maxRedispatchQueueSize := t.options.MaxRedispatchQueueSize()
                        if redispathSize := t.redispatcher.Size(); redispathSize > maxRedispatchQueueSize {
                                t.logger.Debugf("Transfer queue has too many pending tasks in re-dispatch queue: %v > maxRedispatchQueueSize: %v, block loading tasks from persistence", redispathSize, maxRedispatchQueueSize)
                                t.redispatcher.Redispatch(maxRedispatchQueueSize)
                                if redispathSize := t.redispatcher.Size(); redispathSize > maxRedispatchQueueSize {
                                        // if redispatcher still has a large number of tasks
                                        // this only happens when system is under very high load
                                        // we should backoff here instead of keeping submitting tasks to task processor
                                        t.logger.Debugf("Transfer queue still has too many pending tasks in re-dispatch queue: %v > maxRedispatchQueueSize: %v, backing off for %v", redispathSize, maxRedispatchQueueSize, t.options.PollBackoffInterval())
                                        time.Sleep(backoff.JitDuration(
                                                t.options.PollBackoffInterval(),
                                                t.options.PollBackoffIntervalJitterCoefficient(),
                                        ))
                                }
                                // re-enqueue the event to see if we need keep re-dispatching or load new tasks from persistence
                                select {
                                case t.processCh <- struct{}{}:
                                default:
                                }
                        } else {
                                t.processQueueCollectionsFn()
                        }
                case <-updateAckTimer.C:
                        processFinished, _, err := t.updateAckLevelFn()
                        var errShardClosed *shard.ErrShardClosed
                        if errors.As(err, &errShardClosed) || (err == nil && processFinished) {
                                if !t.options.EnableGracefulSyncShutdown() {
                                        go t.Stop()
                                        return
                                }

                                t.Stop()
                                return
                        }
                        updateAckTimer.Reset(backoff.JitDuration(
                                t.options.UpdateAckInterval(),
                                t.options.UpdateAckIntervalJitterCoefficient(),
                        ))
                case <-splitQueueTimer.C:
                        t.splitQueue()
                        splitQueueTimer.Reset(backoff.JitDuration(
                                t.options.SplitQueueInterval(),
                                t.options.SplitQueueIntervalJitterCoefficient(),
                        ))
                case notification := <-t.actionNotifyCh:
                        t.handleActionNotification(notification)
                }
        }
}

func (t *transferQueueProcessorBase) notifyAllQueueCollections() {
        for _, queueCollection := range t.processingQueueCollections {
                t.readyForProcess(queueCollection.Level())
        }
}

func (t *transferQueueProcessorBase) processQueueCollections() {
        for _, queueCollection := range t.processingQueueCollections {
                level := queueCollection.Level()
                t.processingLock.Lock()
                if shouldProcess, ok := t.shouldProcess[level]; !ok || !shouldProcess {
                        t.processingLock.Unlock()
                        continue
                }
                t.shouldProcess[level] = false
                t.processingLock.Unlock()

                activeQueue := queueCollection.ActiveQueue()
                if activeQueue == nil {
                        // process for this queue collection has finished
                        // it's possible that new queue will be added to this collection later though,
                        // pollTime will be updated after split/merge
                        continue
                }

                readLevel := activeQueue.State().ReadLevel()
                maxReadLevel := minTaskKey(activeQueue.State().MaxLevel(), t.updateMaxReadLevel())
                domainFilter := activeQueue.State().DomainFilter()

                if !readLevel.Less(maxReadLevel) {
                        // no task need to be processed for now, wait for new task notification
                        // note that if taskID for new task is still less than readLevel, the notification
                        // will just be a no-op and there's no DB requests.
                        continue
                }

                ctx, cancel := context.WithTimeout(context.Background(), loadQueueTaskThrottleRetryDelay)
                if err := t.rateLimiter.Wait(ctx); err != nil {
                        cancel()
                        if level != defaultProcessingQueueLevel {
                                t.setupBackoffTimer(level)
                        } else {
                                t.readyForProcess(level)
                        }
                        continue
                }
                cancel()

                transferTaskInfos, more, err := t.readTasks(readLevel, maxReadLevel)
                if err != nil {
                        t.logger.Error("Processor unable to retrieve tasks", tag.Error(err))
                        t.readyForProcess(level) // re-enqueue the event
                        continue
                }
                t.logger.Debug("load transfer tasks from database",
                        tag.ReadLevel(readLevel.(transferTaskKey).taskID),
                        tag.MaxLevel(maxReadLevel.(transferTaskKey).taskID),
                        tag.Counter(len(transferTaskInfos)))

                tasks := make(map[task.Key]task.Task)
                taskChFull := false
                for _, taskInfo := range transferTaskInfos {
                        if !domainFilter.Filter(taskInfo.GetDomainID()) {
                                t.logger.Debug("transfer task filtered", tag.TaskID(taskInfo.GetTaskID()))
                                continue
                        }

                        task := t.taskInitializer(taskInfo)
                        tasks[newTransferTaskKey(taskInfo.GetTaskID())] = task
                        submitted, err := t.submitTask(task)
                        if err != nil {
                                // only err here is due to the fact that processor has been shutdown
                                // return instead of continue
                                return
                        }
                        taskChFull = taskChFull || !submitted
                }

                var newReadLevel task.Key
                if !more {
                        newReadLevel = maxReadLevel
                } else {
                        newReadLevel = newTransferTaskKey(transferTaskInfos[len(transferTaskInfos)-1].GetTaskID())
                }
                queueCollection.AddTasks(tasks, newReadLevel)
                if t.validator != nil {
                        t.logger.Debug("ack transfer tasks",
                                tag.ReadLevel(readLevel.(transferTaskKey).taskID),
                                tag.MaxLevel(newReadLevel.(transferTaskKey).taskID),
                                tag.Counter(len(tasks)))
                        t.validator.ackTasks(level, readLevel, newReadLevel, tasks)
                }

                newActiveQueue := queueCollection.ActiveQueue()
                if more || (newActiveQueue != nil && newActiveQueue != activeQueue) {
                        // more tasks for the current active queue or the active queue has changed
                        if level != defaultProcessingQueueLevel && taskChFull {
                                t.setupBackoffTimer(level)
                        } else {
                                t.readyForProcess(level)
                        }
                }

                // else it means we don't have tasks to process for now
                // wait for new task notification
                // another option for non-default queue is that we can setup a backoff timer to check back later
        }
}

func (t *transferQueueProcessorBase) splitQueue() {
        currentTime := t.shard.GetTimeSource().Now()
        currentMaxReadLevel := t.updateMaxReadLevel().(transferTaskKey).taskID
        defer func() {
                t.lastSplitTime = currentTime
                t.lastMaxReadLevel = currentMaxReadLevel
        }()

        if currentMaxReadLevel-t.lastMaxReadLevel < 2<<(t.shard.GetConfig().RangeSizeBits-1) {
                // only update the estimation when rangeID is not renewed
                // note the threshold here is only an estimation. If the read level increased too much
                // we will drop that data point.
                numTasksPerMinute := (currentMaxReadLevel - t.lastMaxReadLevel) / int64(currentTime.Sub(t.lastSplitTime).Seconds()) * int64(time.Minute.Seconds())

                if t.estimatedTasksPerMinute == 0 {
                        // set the initial value for the estimation
                        t.estimatedTasksPerMinute = numTasksPerMinute
                } else {
                        t.estimatedTasksPerMinute = int64(numTasksEstimationDecay*float64(t.estimatedTasksPerMinute) + (1-numTasksEstimationDecay)*float64(numTasksPerMinute))
                }
        }

        if t.lastSplitTime.IsZero() || t.estimatedTasksPerMinute == 0 {
                // skip the split as we can't estimate the look ahead taskID
                return
        }

        splitPolicy := t.initializeSplitPolicy(
                func(key task.Key, domainID string) task.Key {
                        totalLookAhead := t.estimatedTasksPerMinute * int64(t.options.SplitLookAheadDurationByDomainID(domainID).Minutes())
                        // ensure the above calculation doesn't overflow and cap the maximum look ahead interval
                        totalLookAhead = common.MaxInt64(common.MinInt64(totalLookAhead, 2<<t.shard.GetConfig().RangeSizeBits), 0)
                        return newTransferTaskKey(key.(transferTaskKey).taskID + totalLookAhead)
                },
        )

        t.splitProcessingQueueCollection(splitPolicy, func(level int, _ time.Time) {
                t.readyForProcess(level)
        })
}

func (t *transferQueueProcessorBase) handleActionNotification(notification actionNotification) {
        t.processorBase.handleActionNotification(notification, func() {
                switch notification.action.ActionType {
                case ActionTypeReset:
                        t.readyForProcess(defaultProcessingQueueLevel)
                }
        })
}

func (t *transferQueueProcessorBase) readTasks(
        readLevel task.Key,
        maxReadLevel task.Key,
) ([]*persistence.TransferTaskInfo, bool, error) {

        var response *persistence.GetTransferTasksResponse
        op := func() error {
                var err error
                response, err = t.shard.GetExecutionManager().GetTransferTasks(context.Background(), &persistence.GetTransferTasksRequest{
                        ReadLevel:    readLevel.(transferTaskKey).taskID,
                        MaxReadLevel: maxReadLevel.(transferTaskKey).taskID,
                        BatchSize:    t.options.BatchSize(),
                })
                return err
        }

        throttleRetry := backoff.NewThrottleRetry(
                backoff.WithRetryPolicy(persistenceOperationRetryPolicy),
                backoff.WithRetryableError(persistence.IsBackgroundTransientError),
        )
        err := throttleRetry.Do(context.Background(), op)
        if err != nil {
                return nil, false, err
        }

        return response.Tasks, len(response.NextPageToken) != 0, nil
}

func newTransferTaskKey(taskID int64) task.Key {
        return transferTaskKey{
                taskID: taskID,
        }
}

func (k transferTaskKey) Less(
        key task.Key,
) bool {
        return k.taskID < key.(transferTaskKey).taskID
}

func newTransferQueueProcessorOptions(
        config *config.Config,
        isActive bool,
        isFailover bool,
) *queueProcessorOptions {
        options := &queueProcessorOptions{
                BatchSize:                            config.TransferTaskBatchSize,
                DeleteBatchSize:                      config.TransferTaskDeleteBatchSize,
                MaxPollRPS:                           config.TransferProcessorMaxPollRPS,
                MaxPollInterval:                      config.TransferProcessorMaxPollInterval,
                MaxPollIntervalJitterCoefficient:     config.TransferProcessorMaxPollIntervalJitterCoefficient,
                UpdateAckInterval:                    config.TransferProcessorUpdateAckInterval,
                UpdateAckIntervalJitterCoefficient:   config.TransferProcessorUpdateAckIntervalJitterCoefficient,
                RedispatchIntervalJitterCoefficient:  config.TaskRedispatchIntervalJitterCoefficient,
                MaxRedispatchQueueSize:               config.TransferProcessorMaxRedispatchQueueSize,
                SplitQueueInterval:                   config.TransferProcessorSplitQueueInterval,
                SplitQueueIntervalJitterCoefficient:  config.TransferProcessorSplitQueueIntervalJitterCoefficient,
                PollBackoffInterval:                  config.QueueProcessorPollBackoffInterval,
                PollBackoffIntervalJitterCoefficient: config.QueueProcessorPollBackoffIntervalJitterCoefficient,
                EnableValidator:                      config.TransferProcessorEnableValidator,
                ValidationInterval:                   config.TransferProcessorValidationInterval,
                EnableGracefulSyncShutdown:           config.QueueProcessorEnableGracefulSyncShutdown,
        }

        if isFailover {
                // disable queue split for failover processor
                options.EnableSplit = dynamicconfig.GetBoolPropertyFn(false)

                // disable persist and load processing queue states for failover processor as it will never be split
                options.EnablePersistQueueStates = dynamicconfig.GetBoolPropertyFn(false)
                options.EnableLoadQueueStates = dynamicconfig.GetBoolPropertyFn(false)

                options.MaxStartJitterInterval = config.TransferProcessorFailoverMaxStartJitterInterval
        } else {
                options.EnableSplit = config.QueueProcessorEnableSplit
                options.SplitMaxLevel = config.QueueProcessorSplitMaxLevel
                options.EnableRandomSplitByDomainID = config.QueueProcessorEnableRandomSplitByDomainID
                options.RandomSplitProbability = config.QueueProcessorRandomSplitProbability
                options.EnablePendingTaskSplitByDomainID = config.QueueProcessorEnablePendingTaskSplitByDomainID
                options.PendingTaskSplitThreshold = config.QueueProcessorPendingTaskSplitThreshold
                options.EnableStuckTaskSplitByDomainID = config.QueueProcessorEnableStuckTaskSplitByDomainID
                options.StuckTaskSplitThreshold = config.QueueProcessorStuckTaskSplitThreshold
                options.SplitLookAheadDurationByDomainID = config.QueueProcessorSplitLookAheadDurationByDomainID

                options.EnablePersistQueueStates = config.QueueProcessorEnablePersistQueueStates
                options.EnableLoadQueueStates = config.QueueProcessorEnableLoadQueueStates

                options.MaxStartJitterInterval = dynamicconfig.GetDurationPropertyFn(0)
        }

        if isActive {
                options.MetricScope = metrics.TransferActiveQueueProcessorScope
                options.RedispatchInterval = config.ActiveTaskRedispatchInterval
        } else {
                options.MetricScope = metrics.TransferStandbyQueueProcessorScope
                options.RedispatchInterval = config.StandbyTaskRedispatchInterval
        }

        return options
}