uber / cadence / 01875e2f-959c-4c4d-87af-1d7805759bcc

08 Apr 2023 12:26AM UTC coverage: 57.178% (+0.1%) from 57.072%

Pull Request #5197: Demonstrate a way to get rid of the cadence-idl repo
Build: buildkite
Commit: Steven L, "bad cleanup -> good cleanup"

85396 of 149351 relevant lines covered (57.18%)
2283.28 hits per line
Source File (77.62% of lines covered):
/service/history/queue/transfer_queue_processor_base.go
// Copyright (c) 2017-2020 Uber Technologies Inc.

// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:

// The above copyright notice and this permission notice shall be included in all
// copies or substantial portions of the Software.

// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
// SOFTWARE.

package queue

import (
        "context"
        "math/rand"
        "sync"
        "sync/atomic"
        "time"

        "github.com/uber/cadence/common"
        "github.com/uber/cadence/common/backoff"
        "github.com/uber/cadence/common/dynamicconfig"
        "github.com/uber/cadence/common/log"
        "github.com/uber/cadence/common/log/tag"
        "github.com/uber/cadence/common/metrics"
        "github.com/uber/cadence/common/persistence"
        hcommon "github.com/uber/cadence/service/history/common"
        "github.com/uber/cadence/service/history/config"
        "github.com/uber/cadence/service/history/shard"
        "github.com/uber/cadence/service/history/task"
)

const (
        numTasksEstimationDecay = 0.6
)

var (
        loadQueueTaskThrottleRetryDelay = 5 * time.Second

        persistenceOperationRetryPolicy = common.CreatePersistenceRetryPolicy()
)

type (
        transferTaskKey struct {
                taskID int64
        }

        transferQueueProcessorBase struct {
                *processorBase

                taskInitializer task.Initializer

                notifyCh  chan struct{}
                processCh chan struct{}

                // for managing whether a processing queue collection should be processed
                startJitterTimer *time.Timer
                processingLock   sync.Mutex
                backoffTimer     map[int]*time.Timer
                shouldProcess    map[int]bool

                // for estimating the look ahead taskID during split
                lastSplitTime           time.Time
                lastMaxReadLevel        int64
                estimatedTasksPerMinute int64

                // for validating if the queue failed to load any tasks
                validator *transferQueueValidator
        }
)

func newTransferQueueProcessorBase(
        shard shard.Context,
        processingQueueStates []ProcessingQueueState,
        taskProcessor task.Processor,
        options *queueProcessorOptions,
        updateMaxReadLevel updateMaxReadLevelFn,
        updateClusterAckLevel updateClusterAckLevelFn,
        updateProcessingQueueStates updateProcessingQueueStatesFn,
        queueShutdown queueShutdownFn,
        taskFilter task.Filter,
        taskExecutor task.Executor,
        logger log.Logger,
        metricsClient metrics.Client,
) *transferQueueProcessorBase {
        processorBase := newProcessorBase(
                shard,
                processingQueueStates,
                taskProcessor,
                options,
                updateMaxReadLevel,
                updateClusterAckLevel,
                updateProcessingQueueStates,
                queueShutdown,
                logger.WithTags(tag.ComponentTransferQueue),
                metricsClient,
        )

        queueType := task.QueueTypeActiveTransfer
        if options.MetricScope == metrics.TransferStandbyQueueProcessorScope {
                queueType = task.QueueTypeStandbyTransfer
        }

        transferQueueProcessorBase := &transferQueueProcessorBase{
                processorBase: processorBase,

                taskInitializer: func(taskInfo task.Info) task.Task {
                        return task.NewTransferTask(
                                shard,
                                taskInfo,
                                queueType,
                                task.InitializeLoggerForTask(shard.GetShardID(), taskInfo, logger),
                                taskFilter,
                                taskExecutor,
                                taskProcessor,
                                processorBase.redispatcher.AddTask,
                                shard.GetConfig().TaskCriticalRetryCount,
                        )
                },

                notifyCh:  make(chan struct{}, 1),
                processCh: make(chan struct{}, 1),

                backoffTimer:  make(map[int]*time.Timer),
                shouldProcess: make(map[int]bool),

                lastSplitTime:    time.Time{},
                lastMaxReadLevel: 0,
        }

        if shard.GetConfig().EnableDebugMode && options.EnableValidator() {
                transferQueueProcessorBase.validator = newTransferQueueValidator(
                        transferQueueProcessorBase,
                        options.ValidationInterval,
                        logger,
                        metricsClient.Scope(options.MetricScope),
                )
        }

        return transferQueueProcessorBase
}

func (t *transferQueueProcessorBase) Start() {
        if !atomic.CompareAndSwapInt32(&t.status, common.DaemonStatusInitialized, common.DaemonStatusStarted) {
                return
        }

        t.logger.Info("Transfer queue processor state changed", tag.LifeCycleStarting)
        defer t.logger.Info("Transfer queue processor state changed", tag.LifeCycleStarted)

        t.redispatcher.Start()

        // trigger an initial (maybe delayed) load of tasks
        if startJitter := t.options.MaxStartJitterInterval(); startJitter > 0 {
                t.startJitterTimer = time.AfterFunc(
                        time.Duration(rand.Int63n(int64(startJitter))),
                        func() {
                                t.notifyAllQueueCollections()
                        },
                )
        } else {
                t.notifyAllQueueCollections()
        }

        t.shutdownWG.Add(1)
        go t.processorPump()
}

func (t *transferQueueProcessorBase) Stop() {
        if !atomic.CompareAndSwapInt32(&t.status, common.DaemonStatusStarted, common.DaemonStatusStopped) {
                return
        }

        t.logger.Info("Transfer queue processor state changed", tag.LifeCycleStopping)
        defer t.logger.Info("Transfer queue processor state changed", tag.LifeCycleStopped)

        close(t.shutdownCh)
        if t.startJitterTimer != nil {
                t.startJitterTimer.Stop()
        }
        t.processingLock.Lock()
        for _, timer := range t.backoffTimer {
                timer.Stop()
        }
        for level := range t.shouldProcess {
                t.shouldProcess[level] = false
        }
        t.processingLock.Unlock()

        if success := common.AwaitWaitGroup(&t.shutdownWG, time.Minute); !success {
                t.logger.Warn("", tag.LifeCycleStopTimedout)
        }

        t.redispatcher.Stop()
}

func (t *transferQueueProcessorBase) notifyNewTask(
        info *hcommon.NotifyTaskInfo,
) {
        select {
        case t.notifyCh <- struct{}{}:
        default:
        }

        if info.ExecutionInfo != nil && t.validator != nil {
                // executionInfo will be nil when notifyNewTask is called to trigger a scan, for example during domain failover or sync shard.
                t.validator.addTasks(info)
        }
}

func (t *transferQueueProcessorBase) readyForProcess(level int) {
        t.processingLock.Lock()
        defer t.processingLock.Unlock()

        if _, ok := t.backoffTimer[level]; ok {
                // current level is being throttled
                return
        }

        t.shouldProcess[level] = true

        // trigger the actual processing
        select {
        case t.processCh <- struct{}{}:
        default:
        }
}

func (t *transferQueueProcessorBase) setupBackoffTimer(level int) {
        t.processingLock.Lock()
        defer t.processingLock.Unlock()

        if _, ok := t.backoffTimer[level]; ok {
                // there's an existing backoff timer, no-op
                // this case should not happen
                return
        }

        t.metricsScope.IncCounter(metrics.ProcessingQueueThrottledCounter)
        t.logger.Info("Throttled processing queue", tag.QueueLevel(level))

        backoffDuration := backoff.JitDuration(
                t.options.PollBackoffInterval(),
                t.options.PollBackoffIntervalJitterCoefficient(),
        )
        t.backoffTimer[level] = time.AfterFunc(backoffDuration, func() {
                select {
                case <-t.shutdownCh:
                        return
                default:
                }

                t.processingLock.Lock()
                defer t.processingLock.Unlock()

                t.shouldProcess[level] = true
                delete(t.backoffTimer, level)

                // trigger the actual processing
                select {
                case t.processCh <- struct{}{}:
                default:
                }
        })
}

func (t *transferQueueProcessorBase) processorPump() {
        defer t.shutdownWG.Done()

        updateAckTimer := time.NewTimer(backoff.JitDuration(
                t.options.UpdateAckInterval(),
                t.options.UpdateAckIntervalJitterCoefficient(),
        ))
        defer updateAckTimer.Stop()

        splitQueueTimer := time.NewTimer(backoff.JitDuration(
                t.options.SplitQueueInterval(),
                t.options.SplitQueueIntervalJitterCoefficient(),
        ))
        defer splitQueueTimer.Stop()

        maxPollTimer := time.NewTimer(backoff.JitDuration(
                t.options.MaxPollInterval(),
                t.options.MaxPollIntervalJitterCoefficient(),
        ))
        defer maxPollTimer.Stop()

processorPumpLoop:
        for {
                select {
                case <-t.shutdownCh:
                        break processorPumpLoop
                case <-t.notifyCh:
                        // notify all queue collections as they are waiting for the notification when there are
                        // no more tasks to process. For non-default queues, if we choose to do periodic polling
                        // in the future, then we won't need to notify them.
                        t.notifyAllQueueCollections()
                case <-maxPollTimer.C:
                        t.notifyAllQueueCollections()
                        maxPollTimer.Reset(backoff.JitDuration(
                                t.options.MaxPollInterval(),
                                t.options.MaxPollIntervalJitterCoefficient(),
                        ))
                case <-t.processCh:
                        maxRedispatchQueueSize := t.options.MaxRedispatchQueueSize()
                        if t.redispatcher.Size() > maxRedispatchQueueSize {
                                // too many pending tasks in the re-dispatch queue, block loading tasks from persistence
                                t.redispatcher.Redispatch(maxRedispatchQueueSize)
                                if t.redispatcher.Size() > maxRedispatchQueueSize {
                                        // if the redispatcher still has a large number of tasks,
                                        // which only happens when the system is under very high load,
                                        // back off here instead of continuing to submit tasks to the task processor
                                        time.Sleep(backoff.JitDuration(
                                                t.options.PollBackoffInterval(),
                                                t.options.PollBackoffIntervalJitterCoefficient(),
                                        ))
                                }
                                // re-enqueue the event to see if we need to keep re-dispatching or can load new tasks from persistence
                                select {
                                case t.processCh <- struct{}{}:
                                default:
                                }
                                continue processorPumpLoop
                        }

                        t.processQueueCollections()
                case <-updateAckTimer.C:
                        processFinished, _, err := t.updateAckLevel()
                        if err == shard.ErrShardClosed || (err == nil && processFinished) {
                                go t.Stop()
                                break processorPumpLoop
                        }
                        updateAckTimer.Reset(backoff.JitDuration(
                                t.options.UpdateAckInterval(),
                                t.options.UpdateAckIntervalJitterCoefficient(),
                        ))
                case <-splitQueueTimer.C:
                        t.splitQueue()
                        splitQueueTimer.Reset(backoff.JitDuration(
                                t.options.SplitQueueInterval(),
                                t.options.SplitQueueIntervalJitterCoefficient(),
                        ))
                case notification := <-t.actionNotifyCh:
                        t.handleActionNotification(notification)
                }
        }
}

func (t *transferQueueProcessorBase) notifyAllQueueCollections() {
        for _, queueCollection := range t.processingQueueCollections {
                t.readyForProcess(queueCollection.Level())
        }
}

func (t *transferQueueProcessorBase) processQueueCollections() {
        for _, queueCollection := range t.processingQueueCollections {
                level := queueCollection.Level()
                t.processingLock.Lock()
                if shouldProcess, ok := t.shouldProcess[level]; !ok || !shouldProcess {
                        t.processingLock.Unlock()
                        continue
                }
                t.shouldProcess[level] = false
                t.processingLock.Unlock()

                activeQueue := queueCollection.ActiveQueue()
                if activeQueue == nil {
                        // processing for this queue collection has finished
                        // it's possible that a new queue will be added to this collection later though,
                        // pollTime will be updated after split/merge
                        continue
                }

                readLevel := activeQueue.State().ReadLevel()
                maxReadLevel := minTaskKey(activeQueue.State().MaxLevel(), t.updateMaxReadLevel())
                domainFilter := activeQueue.State().DomainFilter()

                if !readLevel.Less(maxReadLevel) {
                        // no tasks need to be processed for now, wait for new task notification
                        // note that if the taskID for the new task is still less than readLevel, the notification
                        // will just be a no-op and there are no DB requests.
                        continue
                }

                ctx, cancel := context.WithTimeout(context.Background(), loadQueueTaskThrottleRetryDelay)
                if err := t.rateLimiter.Wait(ctx); err != nil {
                        cancel()
                        if level != defaultProcessingQueueLevel {
                                t.setupBackoffTimer(level)
                        } else {
                                t.readyForProcess(level)
                        }
                        continue
                }
                cancel()

                transferTaskInfos, more, err := t.readTasks(readLevel, maxReadLevel)
                if err != nil {
                        t.logger.Error("Processor unable to retrieve tasks", tag.Error(err))
                        t.readyForProcess(level) // re-enqueue the event
                        continue
                }
                t.logger.Debug("load transfer tasks from database",
                        tag.ReadLevel(readLevel.(transferTaskKey).taskID),
                        tag.MaxLevel(maxReadLevel.(transferTaskKey).taskID),
                        tag.Counter(len(transferTaskInfos)))

                tasks := make(map[task.Key]task.Task)
                taskChFull := false
                for _, taskInfo := range transferTaskInfos {
                        if !domainFilter.Filter(taskInfo.GetDomainID()) {
                                t.logger.Debug("transfer task filtered", tag.TaskID(taskInfo.GetTaskID()))
                                continue
                        }

                        task := t.taskInitializer(taskInfo)
                        tasks[newTransferTaskKey(taskInfo.GetTaskID())] = task
                        submitted, err := t.submitTask(task)
                        if err != nil {
                                // the only error here is due to the processor having been shut down,
                                // so return instead of continue
                                return
                        }
                        taskChFull = taskChFull || !submitted
                }

                var newReadLevel task.Key
                if !more {
                        newReadLevel = maxReadLevel
                } else {
                        newReadLevel = newTransferTaskKey(transferTaskInfos[len(transferTaskInfos)-1].GetTaskID())
                }
                queueCollection.AddTasks(tasks, newReadLevel)
                if t.validator != nil {
                        t.logger.Debug("ack transfer tasks",
                                tag.ReadLevel(readLevel.(transferTaskKey).taskID),
                                tag.MaxLevel(newReadLevel.(transferTaskKey).taskID),
                                tag.Counter(len(tasks)))
                        t.validator.ackTasks(level, readLevel, newReadLevel, tasks)
                }

                newActiveQueue := queueCollection.ActiveQueue()
                if more || (newActiveQueue != nil && newActiveQueue != activeQueue) {
                        // more tasks for the current active queue or the active queue has changed
                        if level != defaultProcessingQueueLevel && taskChFull {
                                t.setupBackoffTimer(level)
                        } else {
                                t.readyForProcess(level)
                        }
                }

                // else it means we don't have tasks to process for now;
                // wait for new task notification.
                // another option for non-default queues is to set up a backoff timer to check back later
        }
}

func (t *transferQueueProcessorBase) splitQueue() {
        currentTime := t.shard.GetTimeSource().Now()
        currentMaxReadLevel := t.updateMaxReadLevel().(transferTaskKey).taskID
        defer func() {
                t.lastSplitTime = currentTime
                t.lastMaxReadLevel = currentMaxReadLevel
        }()

        if currentMaxReadLevel-t.lastMaxReadLevel < 2<<(t.shard.GetConfig().RangeSizeBits-1) {
                // only update the estimation when rangeID is not renewed
                // note the threshold here is only an estimation. If the read level increased too much,
                // we will drop that data point.
                numTasksPerMinute := (currentMaxReadLevel - t.lastMaxReadLevel) / int64(currentTime.Sub(t.lastSplitTime).Seconds()) * int64(time.Minute.Seconds())

                if t.estimatedTasksPerMinute == 0 {
                        // set the initial value for the estimation
                        t.estimatedTasksPerMinute = numTasksPerMinute
                } else {
                        t.estimatedTasksPerMinute = int64(numTasksEstimationDecay*float64(t.estimatedTasksPerMinute) + (1-numTasksEstimationDecay)*float64(numTasksPerMinute))
                }
        }

        if t.lastSplitTime.IsZero() || t.estimatedTasksPerMinute == 0 {
                // skip the split as we can't estimate the look ahead taskID
                return
        }

        splitPolicy := t.initializeSplitPolicy(
                func(key task.Key, domainID string) task.Key {
                        totalLookAhead := t.estimatedTasksPerMinute * int64(t.options.SplitLookAheadDurationByDomainID(domainID).Minutes())
                        // ensure the above calculation doesn't overflow and cap the maximum look ahead interval
                        totalLookAhead = common.MaxInt64(common.MinInt64(totalLookAhead, 2<<t.shard.GetConfig().RangeSizeBits), 0)
                        return newTransferTaskKey(key.(transferTaskKey).taskID + totalLookAhead)
                },
        )

        t.splitProcessingQueueCollection(splitPolicy, func(level int, _ time.Time) {
                t.readyForProcess(level)
        })
}

func (t *transferQueueProcessorBase) handleActionNotification(notification actionNotification) {
        t.processorBase.handleActionNotification(notification, func() {
                switch notification.action.ActionType {
                case ActionTypeReset:
                        t.readyForProcess(defaultProcessingQueueLevel)
                }
        })
}

func (t *transferQueueProcessorBase) readTasks(
        readLevel task.Key,
        maxReadLevel task.Key,
) ([]*persistence.TransferTaskInfo, bool, error) {

        var response *persistence.GetTransferTasksResponse
        op := func() error {
                var err error
                response, err = t.shard.GetExecutionManager().GetTransferTasks(context.Background(), &persistence.GetTransferTasksRequest{
                        ReadLevel:    readLevel.(transferTaskKey).taskID,
                        MaxReadLevel: maxReadLevel.(transferTaskKey).taskID,
                        BatchSize:    t.options.BatchSize(),
                })
                return err
        }

        throttleRetry := backoff.NewThrottleRetry(
                backoff.WithRetryPolicy(persistenceOperationRetryPolicy),
                backoff.WithRetryableError(persistence.IsBackgroundTransientError),
        )
        err := throttleRetry.Do(context.Background(), op)
        if err != nil {
                return nil, false, err
        }

        return response.Tasks, len(response.NextPageToken) != 0, nil
}

func newTransferTaskKey(
        taskID int64,
) task.Key {
        return transferTaskKey{
                taskID: taskID,
        }
}

func (k transferTaskKey) Less(
        key task.Key,
) bool {
        return k.taskID < key.(transferTaskKey).taskID
}

func newTransferQueueProcessorOptions(
        config *config.Config,
        isActive bool,
        isFailover bool,
) *queueProcessorOptions {
        options := &queueProcessorOptions{
                BatchSize:                            config.TransferTaskBatchSize,
                DeleteBatchSize:                      config.TransferTaskDeleteBatchSize,
                MaxPollRPS:                           config.TransferProcessorMaxPollRPS,
                MaxPollInterval:                      config.TransferProcessorMaxPollInterval,
                MaxPollIntervalJitterCoefficient:     config.TransferProcessorMaxPollIntervalJitterCoefficient,
                UpdateAckInterval:                    config.TransferProcessorUpdateAckInterval,
                UpdateAckIntervalJitterCoefficient:   config.TransferProcessorUpdateAckIntervalJitterCoefficient,
                RedispatchIntervalJitterCoefficient:  config.TaskRedispatchIntervalJitterCoefficient,
                MaxRedispatchQueueSize:               config.TransferProcessorMaxRedispatchQueueSize,
                SplitQueueInterval:                   config.TransferProcessorSplitQueueInterval,
                SplitQueueIntervalJitterCoefficient:  config.TransferProcessorSplitQueueIntervalJitterCoefficient,
                PollBackoffInterval:                  config.QueueProcessorPollBackoffInterval,
                PollBackoffIntervalJitterCoefficient: config.QueueProcessorPollBackoffIntervalJitterCoefficient,
                EnableValidator:                      config.TransferProcessorEnableValidator,
                ValidationInterval:                   config.TransferProcessorValidationInterval,
        }

        if isFailover {
                // disable queue split for failover processor
                options.EnableSplit = dynamicconfig.GetBoolPropertyFn(false)

                // disable persist and load processing queue states for failover processor as it will never be split
                options.EnablePersistQueueStates = dynamicconfig.GetBoolPropertyFn(false)
                options.EnableLoadQueueStates = dynamicconfig.GetBoolPropertyFn(false)

                options.MaxStartJitterInterval = config.TransferProcessorFailoverMaxStartJitterInterval
        } else {
                options.EnableSplit = config.QueueProcessorEnableSplit
                options.SplitMaxLevel = config.QueueProcessorSplitMaxLevel
                options.EnableRandomSplitByDomainID = config.QueueProcessorEnableRandomSplitByDomainID
                options.RandomSplitProbability = config.QueueProcessorRandomSplitProbability
                options.EnablePendingTaskSplitByDomainID = config.QueueProcessorEnablePendingTaskSplitByDomainID
                options.PendingTaskSplitThreshold = config.QueueProcessorPendingTaskSplitThreshold
                options.EnableStuckTaskSplitByDomainID = config.QueueProcessorEnableStuckTaskSplitByDomainID
                options.StuckTaskSplitThreshold = config.QueueProcessorStuckTaskSplitThreshold
                options.SplitLookAheadDurationByDomainID = config.QueueProcessorSplitLookAheadDurationByDomainID

                options.EnablePersistQueueStates = config.QueueProcessorEnablePersistQueueStates
                options.EnableLoadQueueStates = config.QueueProcessorEnableLoadQueueStates

                options.MaxStartJitterInterval = dynamicconfig.GetDurationPropertyFn(0)
        }

        if isActive {
                options.MetricScope = metrics.TransferActiveQueueProcessorScope
                options.RedispatchInterval = config.ActiveTaskRedispatchInterval
        } else {
                options.MetricScope = metrics.TransferStandbyQueueProcessorScope
                options.RedispatchInterval = config.StandbyTaskRedispatchInterval
        }

        return options
}