uber / cadence — build 0186c82d-bf3f-4ccd-97bb-e4ae87e88b45 (push, via Buildkite / GitHub)

09 Mar 2023 09:21PM UTC — coverage: 57.121% (+0.07%) from 57.053%
[history] add domain status check in taskfilter (#5140)

36 of 36 new or added lines in 3 files covered (100.0%)
85302 of 149335 relevant lines covered (57.12%)
2297.89 hits per line

Source file: /service/history/queue/timer_queue_processor.go — 57.57% covered
// Copyright (c) 2017-2020 Uber Technologies Inc.

// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:

// The above copyright notice and this permission notice shall be included in all
// copies or substantial portions of the Software.

// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
// SOFTWARE.

package queue

import (
	"context"
	"fmt"
	"sync"
	"sync/atomic"
	"time"

	"github.com/pborman/uuid"

	"github.com/uber/cadence/common"
	"github.com/uber/cadence/common/log"
	"github.com/uber/cadence/common/log/tag"
	"github.com/uber/cadence/common/metrics"
	"github.com/uber/cadence/common/ndc"
	"github.com/uber/cadence/common/persistence"
	"github.com/uber/cadence/common/reconciliation/invariant"
	"github.com/uber/cadence/common/types"
	hcommon "github.com/uber/cadence/service/history/common"
	"github.com/uber/cadence/service/history/config"
	"github.com/uber/cadence/service/history/engine"
	"github.com/uber/cadence/service/history/execution"
	"github.com/uber/cadence/service/history/shard"
	"github.com/uber/cadence/service/history/task"
	"github.com/uber/cadence/service/worker/archiver"
)

type (
	timerQueueProcessor struct {
		shard         shard.Context
		historyEngine engine.Engine
		taskProcessor task.Processor

		config             *config.Config
		currentClusterName string

		metricsClient metrics.Client
		logger        log.Logger

		status       int32
		shutdownChan chan struct{}
		shutdownWG   sync.WaitGroup

		ackLevel               time.Time
		taskAllocator          TaskAllocator
		activeTaskExecutor     task.Executor
		activeQueueProcessor   *timerQueueProcessorBase
		standbyQueueProcessors map[string]*timerQueueProcessorBase
		standbyQueueTimerGates map[string]RemoteTimerGate
	}
)

// NewTimerQueueProcessor creates a new timer QueueProcessor
func NewTimerQueueProcessor(
	shard shard.Context,
	historyEngine engine.Engine,
	taskProcessor task.Processor,
	executionCache *execution.Cache,
	archivalClient archiver.Client,
	executionCheck invariant.Invariant,
) Processor {
	logger := shard.GetLogger().WithTags(tag.ComponentTimerQueue)
	currentClusterName := shard.GetClusterMetadata().GetCurrentClusterName()
	config := shard.GetConfig()
	taskAllocator := NewTaskAllocator(shard)

	activeTaskExecutor := task.NewTimerActiveTaskExecutor(
		shard,
		archivalClient,
		executionCache,
		logger,
		shard.GetMetricsClient(),
		config,
	)

	activeQueueProcessor := newTimerQueueActiveProcessor(
		currentClusterName,
		shard,
		historyEngine,
		taskProcessor,
		taskAllocator,
		activeTaskExecutor,
		logger,
	)

	standbyQueueProcessors := make(map[string]*timerQueueProcessorBase)
	standbyQueueTimerGates := make(map[string]RemoteTimerGate)
	for clusterName := range shard.GetClusterMetadata().GetRemoteClusterInfo() {
		historyResender := ndc.NewHistoryResender(
			shard.GetDomainCache(),
			shard.GetService().GetClientBean().GetRemoteAdminClient(clusterName),
			func(ctx context.Context, request *types.ReplicateEventsV2Request) error {
				return historyEngine.ReplicateEventsV2(ctx, request)
			},
			config.StandbyTaskReReplicationContextTimeout,
			executionCheck,
			shard.GetLogger(),
		)
		standbyTaskExecutor := task.NewTimerStandbyTaskExecutor(
			shard,
			archivalClient,
			executionCache,
			historyResender,
			logger,
			shard.GetMetricsClient(),
			clusterName,
			config,
		)
		standbyQueueProcessors[clusterName], standbyQueueTimerGates[clusterName] = newTimerQueueStandbyProcessor(
			clusterName,
			shard,
			historyEngine,
			taskProcessor,
			taskAllocator,
			standbyTaskExecutor,
			logger,
		)
	}

	return &timerQueueProcessor{
		shard:         shard,
		historyEngine: historyEngine,
		taskProcessor: taskProcessor,

		config:             config,
		currentClusterName: currentClusterName,

		metricsClient: shard.GetMetricsClient(),
		logger:        logger,

		status:       common.DaemonStatusInitialized,
		shutdownChan: make(chan struct{}),

		ackLevel:               shard.GetTimerAckLevel(),
		taskAllocator:          taskAllocator,
		activeTaskExecutor:     activeTaskExecutor,
		activeQueueProcessor:   activeQueueProcessor,
		standbyQueueProcessors: standbyQueueProcessors,
		standbyQueueTimerGates: standbyQueueTimerGates,
	}
}

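// Start transitions the processor from initialized to started exactly once
// (guarded by an atomic CAS on the daemon status), starts the active queue
// processor and every standby queue processor, and launches the background
// loop that periodically completes acked timer tasks.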
func (t *timerQueueProcessor) Start() {
	if !atomic.CompareAndSwapInt32(&t.status, common.DaemonStatusInitialized, common.DaemonStatusStarted) {
		return
	}

	t.activeQueueProcessor.Start()
	for _, standbyQueueProcessor := range t.standbyQueueProcessors {
		standbyQueueProcessor.Start()
	}

	t.shutdownWG.Add(1)
	go t.completeTimerLoop()
}

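// Stop transitions the processor from started to stopped exactly once,
// stops the active and all standby queue processors, closes the shutdown
// channel, and waits up to a minute for the completion loop to drain.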
func (t *timerQueueProcessor) Stop() {
	if !atomic.CompareAndSwapInt32(&t.status, common.DaemonStatusStarted, common.DaemonStatusStopped) {
		return
	}

	t.activeQueueProcessor.Stop()
	for _, standbyQueueProcessor := range t.standbyQueueProcessors {
		standbyQueueProcessor.Stop()
	}

	close(t.shutdownChan)
	common.AwaitWaitGroup(&t.shutdownWG, time.Minute)
}

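// NotifyNewTask routes newly generated timer tasks to the queue processor
// for the given cluster: tasks for the current cluster go to the active
// queue, while tasks for a remote cluster go to that cluster's standby
// queue after its remote timer gate is advanced to the cluster's current
// time. It panics if the cluster has no registered processor or gate.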
func (t *timerQueueProcessor) NotifyNewTask(
	clusterName string,
	info *hcommon.NotifyTaskInfo,
) {
	if clusterName == t.currentClusterName {
		t.activeQueueProcessor.notifyNewTimers(info.Tasks)
		return
	}

	standbyQueueProcessor, ok := t.standbyQueueProcessors[clusterName]
	if !ok {
		panic(fmt.Sprintf("Cannot find timer processor for %s.", clusterName))
	}

	standbyQueueTimerGate, ok := t.standbyQueueTimerGates[clusterName]
	if !ok {
		panic(fmt.Sprintf("Cannot find timer gate for %s.", clusterName))
	}

	standbyQueueTimerGate.SetCurrentTime(t.shard.GetCurrentTime(clusterName))
	standbyQueueProcessor.notifyNewTimers(info.Tasks)
}

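// FailoverDomain spins up a one-off failover queue processor that rescans
// inflight timer tasks for the given domains, from the lowest cluster ack
// level up to just past the highest read level of the active queues.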
func (t *timerQueueProcessor) FailoverDomain(
	domainIDs map[string]struct{},
) {
	// The failover queue is used to scan all inflight tasks; if the queue processor is not
	// started, there are no inflight tasks and we don't need to create a failover processor.
	// Also, HandleAction will block if the queue processor's processing loop is not running.
	if atomic.LoadInt32(&t.status) != common.DaemonStatusStarted {
		return
	}

	minLevel := t.shard.GetTimerClusterAckLevel(t.currentClusterName)
	standbyClusterName := t.currentClusterName
	for clusterName := range t.shard.GetClusterMetadata().GetEnabledClusterInfo() {
		ackLevel := t.shard.GetTimerClusterAckLevel(clusterName)
		if ackLevel.Before(minLevel) {
			minLevel = ackLevel
			standbyClusterName = clusterName
		}
	}

	maxReadLevel := time.Time{}
	actionResult, err := t.HandleAction(context.Background(), t.currentClusterName, NewGetStateAction())
	if err != nil {
		t.logger.Error("Timer Failover Failed", tag.WorkflowDomainIDs(domainIDs), tag.Error(err))
		if err == errProcessorShutdown {
			// processor/shard already shutdown, we don't need to create failover queue processor
			return
		}
		// other errors should never be returned for GetStateAction
		panic(fmt.Sprintf("unknown error for GetStateAction: %v", err))
	}
	for _, queueState := range actionResult.GetStateActionResult.States {
		queueReadLevel := queueState.ReadLevel().(timerTaskKey).visibilityTimestamp
		if maxReadLevel.Before(queueReadLevel) {
			maxReadLevel = queueReadLevel
		}
	}
	// time.Time.Add returns a new value, so the result must be reassigned
	// for the extra millisecond to actually extend the window
	maxReadLevel = maxReadLevel.Add(1 * time.Millisecond)

	t.logger.Info("Timer Failover Triggered",
		tag.WorkflowDomainIDs(domainIDs),
		tag.MinLevel(minLevel.UnixNano()),
		tag.MaxLevel(maxReadLevel.UnixNano()),
	)

	updateClusterAckLevelFn, failoverQueueProcessor := newTimerQueueFailoverProcessor(
		standbyClusterName,
		t.shard,
		t.historyEngine,
		t.taskProcessor,
		t.taskAllocator,
		t.activeTaskExecutor,
		t.logger,
		minLevel,
		maxReadLevel,
		domainIDs,
	)

	// NOTE: READ REF BEFORE MODIFICATION
	// ref: historyEngine.go registerDomainFailoverCallback function
	err = updateClusterAckLevelFn(newTimerTaskKey(minLevel, 0))
	if err != nil {
		t.logger.Error("Error update shard ack level", tag.Error(err))
	}
	failoverQueueProcessor.Start()
}

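// HandleAction submits an action (such as the GetState action used by
// FailoverDomain and completeTimer) to the queue processor for the given
// cluster and blocks until the result arrives, the context is done, or
// the processor shuts down.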
func (t *timerQueueProcessor) HandleAction(
	ctx context.Context,
	clusterName string,
	action *Action,
) (*ActionResult, error) {
	var resultNotificationCh chan actionResultNotification
	var added bool
	if clusterName == t.currentClusterName {
		resultNotificationCh, added = t.activeQueueProcessor.addAction(ctx, action)
	} else {
		found := false
		for standbyClusterName, standbyProcessor := range t.standbyQueueProcessors {
			if clusterName == standbyClusterName {
				resultNotificationCh, added = standbyProcessor.addAction(ctx, action)
				found = true
				break
			}
		}

		if !found {
			return nil, fmt.Errorf("unknown cluster name: %v", clusterName)
		}
	}

	if !added {
		if ctxErr := ctx.Err(); ctxErr != nil {
			return nil, ctxErr
		}
		return nil, errProcessorShutdown
	}

	select {
	case resultNotification := <-resultNotificationCh:
		return resultNotification.result, resultNotification.err
	case <-t.shutdownChan:
		return nil, errProcessorShutdown
	case <-ctx.Done():
		return nil, ctx.Err()
	}
}

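// LockTaskProcessing and UnlockTaskProcessing delegate to the task
// allocator, letting callers temporarily block task allocation.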
func (t *timerQueueProcessor) LockTaskProcessing() {
	t.taskAllocator.Lock()
}

func (t *timerQueueProcessor) UnlockTaskProcessing() {
	t.taskAllocator.Unlock()
}

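// completeTimerLoop periodically completes (deletes) timer tasks below the
// collective ack level, retrying with linear backoff on failure; on
// shutdown it makes one final completion attempt before exiting, and it
// stops the whole processor if the shard has been closed.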
func (t *timerQueueProcessor) completeTimerLoop() {
	defer t.shutdownWG.Done()

	completeTimer := time.NewTimer(t.config.TimerProcessorCompleteTimerInterval())
	defer completeTimer.Stop()

	for {
		select {
		case <-t.shutdownChan:
			if err := t.completeTimer(); err != nil {
				t.logger.Error("Error complete timer task", tag.Error(err))
			}
			return
		case <-completeTimer.C:
		RetryLoop:
			for attempt := 0; attempt < t.config.TimerProcessorCompleteTimerFailureRetryCount(); attempt++ {
				err := t.completeTimer()
				if err == nil {
					break
				}

				t.logger.Error("Error complete timer task", tag.Error(err))
				if err == shard.ErrShardClosed {
					go t.Stop()
					return
				}
				backoff := time.Duration(attempt * 100)
				time.Sleep(backoff * time.Millisecond)

				select {
				case <-t.shutdownChan:
					// break the retry loop if shutdown chan is closed;
					// the labeled break is needed because a bare break
					// would only exit the select statement
					break RetryLoop
				default:
				}
			}

			completeTimer.Reset(t.config.TimerProcessorCompleteTimerInterval())
		}
	}
}

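// completeTimer computes the new shard-wide ack level as the minimum ack
// level across the active queue, all standby queues, and any inflight
// failover levels, then range-deletes timer tasks below it in batches and
// persists the advanced ack level.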
func (t *timerQueueProcessor) completeTimer() error {
	newAckLevel := maximumTimerTaskKey
	actionResult, err := t.HandleAction(context.Background(), t.currentClusterName, NewGetStateAction())
	if err != nil {
		return err
	}
	for _, queueState := range actionResult.GetStateActionResult.States {
		newAckLevel = minTaskKey(newAckLevel, queueState.AckLevel())
	}

	for standbyClusterName := range t.standbyQueueProcessors {
		actionResult, err := t.HandleAction(context.Background(), standbyClusterName, NewGetStateAction())
		if err != nil {
			return err
		}
		for _, queueState := range actionResult.GetStateActionResult.States {
			newAckLevel = minTaskKey(newAckLevel, queueState.AckLevel())
		}
	}

	for _, failoverInfo := range t.shard.GetAllTimerFailoverLevels() {
		failoverLevel := newTimerTaskKey(failoverInfo.MinLevel, 0)
		newAckLevel = minTaskKey(newAckLevel, failoverLevel)
	}

	if newAckLevel == maximumTimerTaskKey {
		panic("Unable to get timer queue processor ack level")
	}

	newAckLevelTimestamp := newAckLevel.(timerTaskKey).visibilityTimestamp
	t.logger.Debug(fmt.Sprintf("Start completing timer task from: %v, to %v", t.ackLevel, newAckLevelTimestamp))
	if !t.ackLevel.Before(newAckLevelTimestamp) {
		return nil
	}

	t.metricsClient.IncCounter(metrics.TimerQueueProcessorScope, metrics.TaskBatchCompleteCounter)

	for {
		pageSize := t.config.TimerTaskDeleteBatchSize()
		resp, err := t.shard.GetExecutionManager().RangeCompleteTimerTask(context.Background(), &persistence.RangeCompleteTimerTaskRequest{
			InclusiveBeginTimestamp: t.ackLevel,
			ExclusiveEndTimestamp:   newAckLevelTimestamp,
			PageSize:                pageSize, // pageSize may or may not be honored
		})
		if err != nil {
			return err
		}
		if !persistence.HasMoreRowsToDelete(resp.TasksCompleted, pageSize) {
			break
		}
	}

	t.ackLevel = newAckLevelTimestamp

	return t.shard.UpdateTimerAckLevel(t.ackLevel)
}

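// newTimerQueueActiveProcessor builds the queue processor base for the
// current cluster: a local timer gate drives task firing, and the task
// filter skips tasks for unregistered domains before verifying active
// ownership with the task allocator.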
func newTimerQueueActiveProcessor(
	clusterName string,
	shard shard.Context,
	historyEngine engine.Engine,
	taskProcessor task.Processor,
	taskAllocator TaskAllocator,
	taskExecutor task.Executor,
	logger log.Logger,
) *timerQueueProcessorBase {
	config := shard.GetConfig()
	options := newTimerQueueProcessorOptions(config, true, false)

	logger = logger.WithTags(tag.ClusterName(clusterName))

	taskFilter := func(taskInfo task.Info) (bool, error) {
		timer, ok := taskInfo.(*persistence.TimerTaskInfo)
		if !ok {
			return false, errUnexpectedQueueTask
		}
		if notRegistered, err := isDomainNotRegistered(shard, timer.DomainID); notRegistered && err == nil {
			logger.Info("Domain is not in registered status, skip task in active timer queue.", tag.WorkflowDomainID(timer.DomainID), tag.Value(taskInfo))
			return false, nil
		}

		return taskAllocator.VerifyActiveTask(timer.DomainID, timer)
	}

	updateMaxReadLevel := func() task.Key {
		return newTimerTaskKey(shard.UpdateTimerMaxReadLevel(clusterName), 0)
	}

	updateClusterAckLevel := func(ackLevel task.Key) error {
		return shard.UpdateTimerClusterAckLevel(clusterName, ackLevel.(timerTaskKey).visibilityTimestamp)
	}

	updateProcessingQueueStates := func(states []ProcessingQueueState) error {
		pStates := convertToPersistenceTimerProcessingQueueStates(states)
		return shard.UpdateTimerProcessingQueueStates(clusterName, pStates)
	}

	queueShutdown := func() error {
		return nil
	}

	return newTimerQueueProcessorBase(
		clusterName,
		shard,
		loadTimerProcessingQueueStates(clusterName, shard, options, logger),
		taskProcessor,
		NewLocalTimerGate(shard.GetTimeSource()),
		options,
		updateMaxReadLevel,
		updateClusterAckLevel,
		updateProcessingQueueStates,
		queueShutdown,
		taskFilter,
		taskExecutor,
		logger,
		shard.GetMetricsClient(),
	)
}

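// newTimerQueueStandbyProcessor builds the queue processor base for a
// remote cluster. Firing is driven by a RemoteTimerGate, which is also
// returned so NotifyNewTask can advance it. The task filter additionally
// guarantees processing of workflow timeout and history deletion tasks
// when the domain replicates to this cluster.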
func newTimerQueueStandbyProcessor(
	clusterName string,
	shard shard.Context,
	historyEngine engine.Engine,
	taskProcessor task.Processor,
	taskAllocator TaskAllocator,
	taskExecutor task.Executor,
	logger log.Logger,
) (*timerQueueProcessorBase, RemoteTimerGate) {
	config := shard.GetConfig()
	options := newTimerQueueProcessorOptions(config, false, false)

	logger = logger.WithTags(tag.ClusterName(clusterName))

	taskFilter := func(taskInfo task.Info) (bool, error) {
		timer, ok := taskInfo.(*persistence.TimerTaskInfo)
		if !ok {
			return false, errUnexpectedQueueTask
		}
		if notRegistered, err := isDomainNotRegistered(shard, timer.DomainID); notRegistered && err == nil {
			logger.Info("Domain is not in registered status, skip task in standby timer queue.", tag.WorkflowDomainID(timer.DomainID), tag.Value(taskInfo))
			return false, nil
		}
		if timer.TaskType == persistence.TaskTypeWorkflowTimeout ||
			timer.TaskType == persistence.TaskTypeDeleteHistoryEvent {
			domainEntry, err := shard.GetDomainCache().GetDomainByID(timer.DomainID)
			if err == nil {
				if domainEntry.HasReplicationCluster(clusterName) {
					// guarantee the processing of workflow execution history deletion
					return true, nil
				}
			} else {
				if _, ok := err.(*types.EntityNotExistsError); !ok {
					// retry the task if failed to find the domain
					logger.Warn("Cannot find domain", tag.WorkflowDomainID(timer.DomainID))
					return false, err
				}
				logger.Warn("Cannot find domain, default to not process task.", tag.WorkflowDomainID(timer.DomainID), tag.Value(timer))
				return false, nil
			}
		}
		return taskAllocator.VerifyStandbyTask(clusterName, timer.DomainID, timer)
	}

	updateMaxReadLevel := func() task.Key {
		return newTimerTaskKey(shard.UpdateTimerMaxReadLevel(clusterName), 0)
	}

	updateClusterAckLevel := func(ackLevel task.Key) error {
		return shard.UpdateTimerClusterAckLevel(clusterName, ackLevel.(timerTaskKey).visibilityTimestamp)
	}

	updateProcessingQueueStates := func(states []ProcessingQueueState) error {
		pStates := convertToPersistenceTimerProcessingQueueStates(states)
		return shard.UpdateTimerProcessingQueueStates(clusterName, pStates)
	}

	queueShutdown := func() error {
		return nil
	}

	remoteTimerGate := NewRemoteTimerGate()
	remoteTimerGate.SetCurrentTime(shard.GetCurrentTime(clusterName))

	return newTimerQueueProcessorBase(
		clusterName,
		shard,
		loadTimerProcessingQueueStates(clusterName, shard, options, logger),
		taskProcessor,
		remoteTimerGate,
		options,
		updateMaxReadLevel,
		updateClusterAckLevel,
		updateProcessingQueueStates,
		queueShutdown,
		taskFilter,
		taskExecutor,
		logger,
		shard.GetMetricsClient(),
	), remoteTimerGate
}

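// newTimerQueueFailoverProcessor builds a transient queue processor that
// scans a fixed [minLevel, maxLevel) window of timer tasks for the failed
// over domains. It returns the callback used to persist failover progress
// (a shard.TimerFailoverLevel keyed by a fresh UUID) alongside the
// processor itself; queue shutdown deletes the failover level.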
func newTimerQueueFailoverProcessor(
	standbyClusterName string,
	shardContext shard.Context,
	historyEngine engine.Engine,
	taskProcessor task.Processor,
	taskAllocator TaskAllocator,
	taskExecutor task.Executor,
	logger log.Logger,
	minLevel, maxLevel time.Time,
	domainIDs map[string]struct{},
) (updateClusterAckLevelFn, *timerQueueProcessorBase) {
	config := shardContext.GetConfig()
	options := newTimerQueueProcessorOptions(config, true, true)

	currentClusterName := shardContext.GetService().GetClusterMetadata().GetCurrentClusterName()
	failoverStartTime := shardContext.GetTimeSource().Now()
	failoverUUID := uuid.New()
	logger = logger.WithTags(
		tag.ClusterName(currentClusterName),
		tag.WorkflowDomainIDs(domainIDs),
		tag.FailoverMsg("from: "+standbyClusterName),
	)

	taskFilter := func(taskInfo task.Info) (bool, error) {
		timer, ok := taskInfo.(*persistence.TimerTaskInfo)
		if !ok {
			return false, errUnexpectedQueueTask
		}
		if notRegistered, err := isDomainNotRegistered(shardContext, timer.DomainID); notRegistered && err == nil {
			logger.Info("Domain is not in registered status, skip task in failover timer queue.", tag.WorkflowDomainID(timer.DomainID), tag.Value(taskInfo))
			return false, nil
		}
		return taskAllocator.VerifyFailoverActiveTask(domainIDs, timer.DomainID, timer)
	}

	maxReadLevelTaskKey := newTimerTaskKey(maxLevel, 0)
	updateMaxReadLevel := func() task.Key {
		return maxReadLevelTaskKey // this is a const
	}

	updateClusterAckLevel := func(ackLevel task.Key) error {
		return shardContext.UpdateTimerFailoverLevel(
			failoverUUID,
			shard.TimerFailoverLevel{
				StartTime:    failoverStartTime,
				MinLevel:     minLevel,
				CurrentLevel: ackLevel.(timerTaskKey).visibilityTimestamp,
				MaxLevel:     maxLevel,
				DomainIDs:    domainIDs,
			},
		)
	}

	queueShutdown := func() error {
		return shardContext.DeleteTimerFailoverLevel(failoverUUID)
	}

	processingQueueStates := []ProcessingQueueState{
		NewProcessingQueueState(
			defaultProcessingQueueLevel,
			newTimerTaskKey(minLevel, 0),
			maxReadLevelTaskKey,
			NewDomainFilter(domainIDs, false),
		),
	}

	return updateClusterAckLevel, newTimerQueueProcessorBase(
		currentClusterName, // should use current cluster's time when doing domain failover
		shardContext,
		processingQueueStates,
		taskProcessor,
		NewLocalTimerGate(shardContext.GetTimeSource()),
		options,
		updateMaxReadLevel,
		updateClusterAckLevel,
		nil,
		queueShutdown,
		taskFilter,
		taskExecutor,
		logger,
		shardContext.GetMetricsClient(),
	)
}

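// loadTimerProcessingQueueStates restores the persisted processing queue
// states for a cluster when that feature is enabled and the states are
// consistent with the shard's timer ack level; otherwise it falls back to
// a single default-level queue starting at the ack level.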
func loadTimerProcessingQueueStates(
	clusterName string,
	shard shard.Context,
	options *queueProcessorOptions,
	logger log.Logger,
) []ProcessingQueueState {
	ackLevel := shard.GetTimerClusterAckLevel(clusterName)
	if options.EnableLoadQueueStates() {
		pStates := shard.GetTimerProcessingQueueStates(clusterName)
		if validateProcessingQueueStates(pStates, ackLevel) {
			return convertFromPersistenceTimerProcessingQueueStates(pStates)
		}

		logger.Error("Incompatible processing queue states and ackLevel",
			tag.Value(pStates),
			tag.ShardTimerAcks(ackLevel),
		)
	}

	return []ProcessingQueueState{
		NewProcessingQueueState(
			defaultProcessingQueueLevel,
			newTimerTaskKey(ackLevel, 0),
			maximumTimerTaskKey,
			NewDomainFilter(nil, true),
		),
	}
}
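
For orientation, a minimal sketch of how this processor's lifecycle is driven. The shard, engine, cache, archiver, and invariant values are stand-ins for the real wiring in the history service's startup path; only the Processor methods shown are taken from this file:

	// hypothetical wiring — the concrete dependencies come from the
	// history service, not from this file
	processor := NewTimerQueueProcessor(
		shardCtx,       // shard.Context
		historyEngine,  // engine.Engine
		taskProcessor,  // task.Processor
		executionCache, // *execution.Cache
		archivalClient, // archiver.Client
		executionCheck, // invariant.Invariant
	)

	processor.Start()
	defer processor.Stop()

	// timer tasks are fanned out per cluster: the current cluster's tasks
	// go to the active queue, a remote cluster's to its standby queue
	processor.NotifyNewTask(clusterName, &hcommon.NotifyTaskInfo{Tasks: tasks})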