uber / cadence · build 018f64f4-c8b4-4dac-8ed7-996896dd65a4

11 May 2024 12:01AM UTC · coverage: 69.148% (-0.01%) from 69.162%
Commit: Write tests for replication task processor main loop (#6010) · push · buildkite · web-flow

1 of 1 new or added line in 1 file covered. (100.0%)
229 existing lines in 18 files now uncovered.
101597 of 146926 relevant lines covered (69.15%)
2644.77 hits per line

Source File: /service/history/queue/timer_queue_processor.go (coverage: 59.89%)
// Copyright (c) 2017-2020 Uber Technologies Inc.

// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:

// The above copyright notice and this permission notice shall be included in all
// copies or substantial portions of the Software.

// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
// SOFTWARE.

package queue

import (
        "context"
        "errors"
        "fmt"
        "sync"
        "sync/atomic"
        "time"

        "github.com/uber/cadence/common"
        "github.com/uber/cadence/common/log"
        "github.com/uber/cadence/common/log/tag"
        "github.com/uber/cadence/common/metrics"
        "github.com/uber/cadence/common/ndc"
        "github.com/uber/cadence/common/persistence"
        "github.com/uber/cadence/common/reconciliation/invariant"
        "github.com/uber/cadence/common/types"
        hcommon "github.com/uber/cadence/service/history/common"
        "github.com/uber/cadence/service/history/config"
        "github.com/uber/cadence/service/history/engine"
        "github.com/uber/cadence/service/history/execution"
        "github.com/uber/cadence/service/history/shard"
        "github.com/uber/cadence/service/history/task"
        "github.com/uber/cadence/service/worker/archiver"
)

type timerQueueProcessor struct {
        shard         shard.Context
        historyEngine engine.Engine
        taskProcessor task.Processor

        config             *config.Config
        currentClusterName string

        metricsClient metrics.Client
        logger        log.Logger

        status       int32
        shutdownChan chan struct{}
        shutdownWG   sync.WaitGroup

        ackLevel                time.Time
        taskAllocator           TaskAllocator
        activeTaskExecutor      task.Executor
        activeQueueProcessor    *timerQueueProcessorBase
        standbyQueueProcessors  map[string]*timerQueueProcessorBase
        standbyTaskExecutors    []task.Executor
        standbyQueueTimerGates  map[string]RemoteTimerGate
        failoverQueueProcessors []*timerQueueProcessorBase
}

// NewTimerQueueProcessor creates a new timer QueueProcessor
func NewTimerQueueProcessor(
        shard shard.Context,
        historyEngine engine.Engine,
        taskProcessor task.Processor,
        executionCache *execution.Cache,
        archivalClient archiver.Client,
        executionCheck invariant.Invariant,
) Processor {
        logger := shard.GetLogger().WithTags(tag.ComponentTimerQueue)
        currentClusterName := shard.GetClusterMetadata().GetCurrentClusterName()
        config := shard.GetConfig()
        taskAllocator := NewTaskAllocator(shard)

        activeTaskExecutor := task.NewTimerActiveTaskExecutor(
                shard,
                archivalClient,
                executionCache,
                logger,
                shard.GetMetricsClient(),
                config,
        )

        activeQueueProcessor := newTimerQueueActiveProcessor(
                currentClusterName,
                shard,
                historyEngine,
                taskProcessor,
                taskAllocator,
                activeTaskExecutor,
                logger,
        )

        standbyTaskExecutors := make([]task.Executor, 0, len(shard.GetClusterMetadata().GetRemoteClusterInfo()))
        standbyQueueProcessors := make(map[string]*timerQueueProcessorBase)
        standbyQueueTimerGates := make(map[string]RemoteTimerGate)
        for clusterName := range shard.GetClusterMetadata().GetRemoteClusterInfo() {
                historyResender := ndc.NewHistoryResender(
                        shard.GetDomainCache(),
                        shard.GetService().GetClientBean().GetRemoteAdminClient(clusterName),
                        func(ctx context.Context, request *types.ReplicateEventsV2Request) error {
                                return historyEngine.ReplicateEventsV2(ctx, request)
                        },
                        config.StandbyTaskReReplicationContextTimeout,
                        executionCheck,
                        shard.GetLogger(),
                )
                standbyTaskExecutor := task.NewTimerStandbyTaskExecutor(
                        shard,
                        archivalClient,
                        executionCache,
                        historyResender,
                        logger,
                        shard.GetMetricsClient(),
                        clusterName,
                        config,
                )
                standbyTaskExecutors = append(standbyTaskExecutors, standbyTaskExecutor)
                standbyQueueProcessors[clusterName], standbyQueueTimerGates[clusterName] = newTimerQueueStandbyProcessor(
                        clusterName,
                        shard,
                        historyEngine,
                        taskProcessor,
                        taskAllocator,
                        standbyTaskExecutor,
                        logger,
                )
        }

        return &timerQueueProcessor{
                shard:         shard,
                historyEngine: historyEngine,
                taskProcessor: taskProcessor,

                config:             config,
                currentClusterName: currentClusterName,

                metricsClient: shard.GetMetricsClient(),
                logger:        logger,

                status:       common.DaemonStatusInitialized,
                shutdownChan: make(chan struct{}),

                ackLevel:               shard.GetTimerAckLevel(),
                taskAllocator:          taskAllocator,
                activeTaskExecutor:     activeTaskExecutor,
                activeQueueProcessor:   activeQueueProcessor,
                standbyQueueProcessors: standbyQueueProcessors,
                standbyQueueTimerGates: standbyQueueTimerGates,
                standbyTaskExecutors:   standbyTaskExecutors,
        }
}

func (t *timerQueueProcessor) Start() {
        if !atomic.CompareAndSwapInt32(&t.status, common.DaemonStatusInitialized, common.DaemonStatusStarted) {
                return
        }

        t.activeQueueProcessor.Start()
        for _, standbyQueueProcessor := range t.standbyQueueProcessors {
                standbyQueueProcessor.Start()
        }

        t.shutdownWG.Add(1)
        go t.completeTimerLoop()
}

func (t *timerQueueProcessor) Stop() {
        if !atomic.CompareAndSwapInt32(&t.status, common.DaemonStatusStarted, common.DaemonStatusStopped) {
                return
        }

        if !t.shard.GetConfig().QueueProcessorEnableGracefulSyncShutdown() {
                t.activeQueueProcessor.Stop()
                // stop active executor after queue processor
                t.activeTaskExecutor.Stop()
                for _, standbyQueueProcessor := range t.standbyQueueProcessors {
                        standbyQueueProcessor.Stop()
                }

                // stop standby executors after queue processors
                for _, standbyTaskExecutor := range t.standbyTaskExecutors {
                        standbyTaskExecutor.Stop()
                }

                close(t.shutdownChan)
                common.AwaitWaitGroup(&t.shutdownWG, time.Minute)
                return
        }

        // close the shutdown channel first so processor pumps drains tasks
        // and then stop the processors
        close(t.shutdownChan)
        if !common.AwaitWaitGroup(&t.shutdownWG, gracefulShutdownTimeout) {
                t.logger.Warn("transferQueueProcessor timed out on shut down", tag.LifeCycleStopTimedout)
        }
        t.activeQueueProcessor.Stop()

        for _, standbyQueueProcessor := range t.standbyQueueProcessors {
                standbyQueueProcessor.Stop()
        }

        // stop standby executors after queue processors
        for _, standbyTaskExecutor := range t.standbyTaskExecutors {
                standbyTaskExecutor.Stop()
        }

        if len(t.failoverQueueProcessors) > 0 {
                t.logger.Info("Shutting down failover timer queues", tag.Counter(len(t.failoverQueueProcessors)))
                for _, failoverQueueProcessor := range t.failoverQueueProcessors {
                        failoverQueueProcessor.Stop()
                }
        }

        t.activeTaskExecutor.Stop()
}

func (t *timerQueueProcessor) NotifyNewTask(clusterName string, info *hcommon.NotifyTaskInfo) {
        if clusterName == t.currentClusterName {
                t.activeQueueProcessor.notifyNewTimers(info.Tasks)
                return
        }

        standbyQueueProcessor, ok := t.standbyQueueProcessors[clusterName]
        if !ok {
                panic(fmt.Sprintf("Cannot find standby timer processor for %s.", clusterName))
        }

        standbyQueueTimerGate, ok := t.standbyQueueTimerGates[clusterName]
        if !ok {
                panic(fmt.Sprintf("Cannot find standby timer gate for %s.", clusterName))
        }

        curTime := t.shard.GetCurrentTime(clusterName)
        standbyQueueTimerGate.SetCurrentTime(curTime)
        t.logger.Debug("Current time for standby queue timergate is updated", tag.ClusterName(clusterName), tag.Timestamp(curTime))
        standbyQueueProcessor.notifyNewTimers(info.Tasks)
}

func (t *timerQueueProcessor) FailoverDomain(domainIDs map[string]struct{}) {
        // Failover queue is used to scan all inflight tasks, if queue processor is not
        // started, there's no inflight task and we don't need to create a failover processor.
        // Also the HandleAction will be blocked if queue processor processing loop is not running.
        if atomic.LoadInt32(&t.status) != common.DaemonStatusStarted {
                return
        }

        minLevel := t.shard.GetTimerClusterAckLevel(t.currentClusterName)
        standbyClusterName := t.currentClusterName
        for clusterName := range t.shard.GetClusterMetadata().GetEnabledClusterInfo() {
                ackLevel := t.shard.GetTimerClusterAckLevel(clusterName)
                if ackLevel.Before(minLevel) {
                        minLevel = ackLevel
                        standbyClusterName = clusterName
                }
        }

        if standbyClusterName != t.currentClusterName {
                t.logger.Debugf("Timer queue failover will use minLevel: %v from standbyClusterName: %s", minLevel, standbyClusterName)
        } else {
                t.logger.Debugf("Timer queue failover will use minLevel: %v from current cluster: %s", minLevel, t.currentClusterName)
        }

        maxReadLevel := time.Time{}
        actionResult, err := t.HandleAction(context.Background(), t.currentClusterName, NewGetStateAction())
        if err != nil {
                t.logger.Error("Timer failover failed while getting queue states", tag.WorkflowDomainIDs(domainIDs), tag.Error(err))
                if err == errProcessorShutdown {
                        // processor/shard already shutdown, we don't need to create failover queue processor
                        return
                }
                // other errors should never be returned for GetStateAction
                panic(fmt.Sprintf("unknown error for GetStateAction: %v", err))
        }

        var maxReadLevelQueueLevel int
        for _, queueState := range actionResult.GetStateActionResult.States {
                queueReadLevel := queueState.ReadLevel().(timerTaskKey).visibilityTimestamp
                if maxReadLevel.Before(queueReadLevel) {
                        maxReadLevel = queueReadLevel
                        maxReadLevelQueueLevel = queueState.Level()
                }
        }

        if !maxReadLevel.IsZero() {
                t.logger.Debugf("Timer queue failover will use maxReadLevel: %v from queue at level: %v", maxReadLevel, maxReadLevelQueueLevel)
        }

        // TODO: Below Add call has no effect, understand the underlying intent and fix it.
        maxReadLevel.Add(1 * time.Millisecond)

        t.logger.Info("Timer Failover Triggered",
                tag.WorkflowDomainIDs(domainIDs),
                tag.MinLevel(minLevel.UnixNano()),
                tag.MaxLevel(maxReadLevel.UnixNano()),
        )

        updateClusterAckLevelFn, failoverQueueProcessor := newTimerQueueFailoverProcessor(
                standbyClusterName,
                t.shard,
                t.taskProcessor,
                t.taskAllocator,
                t.activeTaskExecutor,
                t.logger,
                minLevel,
                maxReadLevel,
                domainIDs,
        )

        // NOTE: READ REF BEFORE MODIFICATION
        // ref: historyEngine.go registerDomainFailoverCallback function
        err = updateClusterAckLevelFn(newTimerTaskKey(minLevel, 0))
        if err != nil {
                t.logger.Error("Error update shard ack level", tag.Error(err))
        }

        // Failover queue processors are started on the fly when domains are failed over.
        // Failover queue processors will be stopped when the timer queue instance is stopped (due to restart or shard movement).
        // This means the failover queue processor might not finish its job.
        // There is no mechanism to re-start ongoing failover queue processors in the new shard owner.
        t.failoverQueueProcessors = append(t.failoverQueueProcessors, failoverQueueProcessor)
        failoverQueueProcessor.Start()
}

func (t *timerQueueProcessor) HandleAction(ctx context.Context, clusterName string, action *Action) (*ActionResult, error) {
        var resultNotificationCh chan actionResultNotification
        var added bool
        if clusterName == t.currentClusterName {
                resultNotificationCh, added = t.activeQueueProcessor.addAction(ctx, action)
        } else {
                found := false
                for standbyClusterName, standbyProcessor := range t.standbyQueueProcessors {
                        if clusterName == standbyClusterName {
                                resultNotificationCh, added = standbyProcessor.addAction(ctx, action)
                                found = true
                                break
                        }
                }

                if !found {
                        return nil, fmt.Errorf("unknown cluster name: %v", clusterName)
                }
        }

        if !added {
                if ctxErr := ctx.Err(); ctxErr != nil {
                        return nil, ctxErr
                }
                return nil, errProcessorShutdown
        }

        select {
        case resultNotification := <-resultNotificationCh:
                return resultNotification.result, resultNotification.err
        case <-t.shutdownChan:
                return nil, errProcessorShutdown
        case <-ctx.Done():
                return nil, ctx.Err()
        }
}

func (t *timerQueueProcessor) LockTaskProcessing() {
        t.taskAllocator.Lock()
}

func (t *timerQueueProcessor) UnlockTaskProcessing() {
        t.taskAllocator.Unlock()
}

func (t *timerQueueProcessor) drain() {
        if !t.shard.GetConfig().QueueProcessorEnableGracefulSyncShutdown() {
                if err := t.completeTimer(context.Background()); err != nil {
                        t.logger.Error("Failed to complete timer task during drain", tag.Error(err))
                }
                return
        }

        // when graceful shutdown is enabled for queue processor, use a context with timeout
        ctx, cancel := context.WithTimeout(context.Background(), gracefulShutdownTimeout)
        defer cancel()
        if err := t.completeTimer(ctx); err != nil {
                t.logger.Error("Failed to complete timer task during drain", tag.Error(err))
        }
}

func (t *timerQueueProcessor) completeTimerLoop() {
        defer t.shutdownWG.Done()

        completeTimer := time.NewTimer(t.config.TimerProcessorCompleteTimerInterval())
        defer completeTimer.Stop()

        for {
                select {
                case <-t.shutdownChan:
                        t.drain()
                        return
                case <-completeTimer.C:
                        for attempt := 0; attempt < t.config.TimerProcessorCompleteTimerFailureRetryCount(); attempt++ {
                                err := t.completeTimer(context.Background())
                                if err == nil {
                                        break
                                }

                                t.logger.Error("Failed to complete timer task", tag.Error(err))
                                var errShardClosed *shard.ErrShardClosed
                                if errors.As(err, &errShardClosed) {
                                        if !t.shard.GetConfig().QueueProcessorEnableGracefulSyncShutdown() {
                                                go t.Stop()
                                                return
                                        }

                                        t.Stop()
                                        return
                                }

                                select {
                                case <-t.shutdownChan:
                                        t.drain()
                                        return
                                case <-time.After(time.Duration(attempt*100) * time.Millisecond):
                                        // do nothing. retry loop will continue
                                }
                        }

                        completeTimer.Reset(t.config.TimerProcessorCompleteTimerInterval())
                }
        }
}

func (t *timerQueueProcessor) completeTimer(ctx context.Context) error {
        newAckLevel := maximumTimerTaskKey
        actionResult, err := t.HandleAction(ctx, t.currentClusterName, NewGetStateAction())
        if err != nil {
                return err
        }
        for _, queueState := range actionResult.GetStateActionResult.States {
                newAckLevel = minTaskKey(newAckLevel, queueState.AckLevel())
        }

        for standbyClusterName := range t.standbyQueueProcessors {
                actionResult, err := t.HandleAction(ctx, standbyClusterName, NewGetStateAction())
                if err != nil {
                        return err
                }
                for _, queueState := range actionResult.GetStateActionResult.States {
                        newAckLevel = minTaskKey(newAckLevel, queueState.AckLevel())
                }
        }

        for _, failoverInfo := range t.shard.GetAllTimerFailoverLevels() {
                failoverLevel := newTimerTaskKey(failoverInfo.MinLevel, 0)
                newAckLevel = minTaskKey(newAckLevel, failoverLevel)
        }

        if newAckLevel == maximumTimerTaskKey {
                panic("Unable to get timer queue processor ack level")
        }

        newAckLevelTimestamp := newAckLevel.(timerTaskKey).visibilityTimestamp
        if !t.ackLevel.Before(newAckLevelTimestamp) {
                t.logger.Debugf("Skipping timer task completion because new ack level %v is not before ack level %v", newAckLevelTimestamp, t.ackLevel)
                return nil
        }

        t.logger.Debugf("Start completing timer task from: %v, to %v", t.ackLevel, newAckLevelTimestamp)
        t.metricsClient.Scope(metrics.TimerQueueProcessorScope).
                Tagged(metrics.ShardIDTag(t.shard.GetShardID())).
                IncCounter(metrics.TaskBatchCompleteCounter)

        totalDeleted := 0
        for {
                pageSize := t.config.TimerTaskDeleteBatchSize()
                resp, err := t.shard.GetExecutionManager().RangeCompleteTimerTask(ctx, &persistence.RangeCompleteTimerTaskRequest{
                        InclusiveBeginTimestamp: t.ackLevel,
                        ExclusiveEndTimestamp:   newAckLevelTimestamp,
                        PageSize:                pageSize, // pageSize may or may not be honored
                })
                if err != nil {
                        return err
                }

                totalDeleted += resp.TasksCompleted
                t.logger.Debug("Timer task batch deletion", tag.Dynamic("page-size", pageSize), tag.Dynamic("total-deleted", totalDeleted))
                if !persistence.HasMoreRowsToDelete(resp.TasksCompleted, pageSize) {
                        break
                }
        }

        t.ackLevel = newAckLevelTimestamp

        return t.shard.UpdateTimerAckLevel(t.ackLevel)
}

func loadTimerProcessingQueueStates(
        clusterName string,
        shard shard.Context,
        options *queueProcessorOptions,
        logger log.Logger,
) []ProcessingQueueState {
        ackLevel := shard.GetTimerClusterAckLevel(clusterName)
        if options.EnableLoadQueueStates() {
                pStates := shard.GetTimerProcessingQueueStates(clusterName)
                if validateProcessingQueueStates(pStates, ackLevel) {
                        return convertFromPersistenceTimerProcessingQueueStates(pStates)
                }

                logger.Error("Incompatible processing queue states and ackLevel",
                        tag.Value(pStates),
                        tag.ShardTimerAcks(ackLevel),
                )
        }

        return []ProcessingQueueState{
                NewProcessingQueueState(
                        defaultProcessingQueueLevel,
                        newTimerTaskKey(ackLevel, 0),
                        maximumTimerTaskKey,
                        NewDomainFilter(nil, true),
                ),
        }
}
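
For orientation, the sketch below illustrates the lifecycle implied by this file: construct the processor, start it, route new timer tasks by cluster name, and stop it. It is a minimal sketch, not part of the source above, and it assumes the history-service dependencies have already been built elsewhere; shardCtx, historyEngine, taskProcessor, executionCache, archivalClient, executionCheck, timerTasks, and the cluster name are placeholders, not values from this file.

        // Illustrative sketch only; all inputs are assumed to be provided by the history service host.
        processor := NewTimerQueueProcessor(
                shardCtx,       // shard.Context
                historyEngine,  // engine.Engine
                taskProcessor,  // task.Processor
                executionCache, // *execution.Cache
                archivalClient, // archiver.Client
                executionCheck, // invariant.Invariant
        )
        processor.Start()

        // Tasks for the current cluster go to the active queue processor; tasks for a
        // remote cluster go to that cluster's standby processor and advance its timer gate.
        processor.NotifyNewTask("standby-cluster", &hcommon.NotifyTaskInfo{Tasks: timerTasks})

        // Stop drains the complete-timer loop; the ordering of shutdown steps depends on
        // QueueProcessorEnableGracefulSyncShutdown.
        processor.Stop()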