• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

uber / cadence / 0186dd68-6020-4844-b02a-bb5e79dca3e8

14 Mar 2023 12:16AM UTC coverage: 57.084% (+0.001%) from 57.083%
0186dd68-6020-4844-b02a-bb5e79dca3e8

push

buildkite

David Porter
Merge branch 'master' into feature/zonal-partitioning

335 of 335 new or added lines in 21 files covered. (100.0%)

85415 of 149631 relevant lines covered (57.08%)

2276.13 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

56.06
/service/history/queue/transfer_queue_processor.go
1
// Copyright (c) 2017-2020 Uber Technologies Inc.
2

3
// Permission is hereby granted, free of charge, to any person obtaining a copy
4
// of this software and associated documentation files (the "Software"), to deal
5
// in the Software without restriction, including without limitation the rights
6
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
7
// copies of the Software, and to permit persons to whom the Software is
8
// furnished to do so, subject to the following conditions:
9

10
// The above copyright notice and this permission notice shall be included in all
11
// copies or substantial portions of the Software.
12

13
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
14
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
18
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
19
// SOFTWARE.
20

21
package queue
22

23
import (
24
        "context"
25
        "errors"
26
        "fmt"
27
        "math"
28
        "sync"
29
        "sync/atomic"
30
        "time"
31

32
        "github.com/pborman/uuid"
33

34
        "github.com/uber/cadence/common"
35
        "github.com/uber/cadence/common/log"
36
        "github.com/uber/cadence/common/log/tag"
37
        "github.com/uber/cadence/common/metrics"
38
        "github.com/uber/cadence/common/ndc"
39
        "github.com/uber/cadence/common/persistence"
40
        "github.com/uber/cadence/common/reconciliation/invariant"
41
        "github.com/uber/cadence/common/types"
42
        hcommon "github.com/uber/cadence/service/history/common"
43
        "github.com/uber/cadence/service/history/config"
44
        "github.com/uber/cadence/service/history/engine"
45
        "github.com/uber/cadence/service/history/execution"
46
        "github.com/uber/cadence/service/history/reset"
47
        "github.com/uber/cadence/service/history/shard"
48
        "github.com/uber/cadence/service/history/task"
49
        "github.com/uber/cadence/service/worker/archiver"
50
)
51

52
// defaultProcessingQueueLevel is the queue level given to processing queue
// states that this file builds from scratch (the failover queue and the
// ack-level fallback used when persisted queue states cannot be loaded).
const defaultProcessingQueueLevel = 0
55

56
var (
57
        errUnexpectedQueueTask = errors.New("unexpected queue task")
58
        errProcessorShutdown   = errors.New("queue processor has been shutdown")
59

60
        maximumTransferTaskKey = newTransferTaskKey(math.MaxInt64)
61
)
62

63
type (
64
        transferQueueProcessor struct {
65
                shard         shard.Context
66
                historyEngine engine.Engine
67
                taskProcessor task.Processor
68

69
                config             *config.Config
70
                currentClusterName string
71

72
                metricsClient metrics.Client
73
                logger        log.Logger
74

75
                status       int32
76
                shutdownChan chan struct{}
77
                shutdownWG   sync.WaitGroup
78

79
                ackLevel               int64
80
                taskAllocator          TaskAllocator
81
                activeTaskExecutor     task.Executor
82
                activeQueueProcessor   *transferQueueProcessorBase
83
                standbyQueueProcessors map[string]*transferQueueProcessorBase
84
        }
85
)
86

87
// NewTransferQueueProcessor creates a new transfer QueueProcessor
88
func NewTransferQueueProcessor(
89
        shard shard.Context,
90
        historyEngine engine.Engine,
91
        taskProcessor task.Processor,
92
        executionCache *execution.Cache,
93
        workflowResetter reset.WorkflowResetter,
94
        archivalClient archiver.Client,
95
        executionCheck invariant.Invariant,
96
) Processor {
51✔
97
        logger := shard.GetLogger().WithTags(tag.ComponentTransferQueue)
51✔
98
        currentClusterName := shard.GetClusterMetadata().GetCurrentClusterName()
51✔
99
        config := shard.GetConfig()
51✔
100
        taskAllocator := NewTaskAllocator(shard)
51✔
101

51✔
102
        activeTaskExecutor := task.NewTransferActiveTaskExecutor(
51✔
103
                shard,
51✔
104
                archivalClient,
51✔
105
                executionCache,
51✔
106
                workflowResetter,
51✔
107
                logger,
51✔
108
                config,
51✔
109
        )
51✔
110

51✔
111
        activeQueueProcessor := newTransferQueueActiveProcessor(
51✔
112
                shard,
51✔
113
                historyEngine,
51✔
114
                taskProcessor,
51✔
115
                taskAllocator,
51✔
116
                activeTaskExecutor,
51✔
117
                logger,
51✔
118
        )
51✔
119

51✔
120
        standbyQueueProcessors := make(map[string]*transferQueueProcessorBase)
51✔
121
        for clusterName := range shard.GetClusterMetadata().GetRemoteClusterInfo() {
102✔
122
                historyResender := ndc.NewHistoryResender(
51✔
123
                        shard.GetDomainCache(),
51✔
124
                        shard.GetService().GetClientBean().GetRemoteAdminClient(clusterName),
51✔
125
                        func(ctx context.Context, request *types.ReplicateEventsV2Request) error {
51✔
126
                                return historyEngine.ReplicateEventsV2(ctx, request)
×
127
                        },
×
128
                        config.StandbyTaskReReplicationContextTimeout,
129
                        executionCheck,
130
                        shard.GetLogger(),
131
                )
132
                standbyTaskExecutor := task.NewTransferStandbyTaskExecutor(
51✔
133
                        shard,
51✔
134
                        archivalClient,
51✔
135
                        executionCache,
51✔
136
                        historyResender,
51✔
137
                        logger,
51✔
138
                        clusterName,
51✔
139
                        config,
51✔
140
                )
51✔
141
                standbyQueueProcessors[clusterName] = newTransferQueueStandbyProcessor(
51✔
142
                        clusterName,
51✔
143
                        shard,
51✔
144
                        historyEngine,
51✔
145
                        taskProcessor,
51✔
146
                        taskAllocator,
51✔
147
                        standbyTaskExecutor,
51✔
148
                        logger,
51✔
149
                )
51✔
150
        }
151

152
        return &transferQueueProcessor{
51✔
153
                shard:         shard,
51✔
154
                historyEngine: historyEngine,
51✔
155
                taskProcessor: taskProcessor,
51✔
156

51✔
157
                config:             config,
51✔
158
                currentClusterName: currentClusterName,
51✔
159

51✔
160
                metricsClient: shard.GetMetricsClient(),
51✔
161
                logger:        logger,
51✔
162

51✔
163
                status:       common.DaemonStatusInitialized,
51✔
164
                shutdownChan: make(chan struct{}),
51✔
165

51✔
166
                ackLevel:               shard.GetTransferAckLevel(),
51✔
167
                taskAllocator:          taskAllocator,
51✔
168
                activeTaskExecutor:     activeTaskExecutor,
51✔
169
                activeQueueProcessor:   activeQueueProcessor,
51✔
170
                standbyQueueProcessors: standbyQueueProcessors,
51✔
171
        }
51✔
172
}
173

174
func (t *transferQueueProcessor) Start() {
51✔
175
        if !atomic.CompareAndSwapInt32(&t.status, common.DaemonStatusInitialized, common.DaemonStatusStarted) {
51✔
176
                return
×
177
        }
×
178

179
        t.activeQueueProcessor.Start()
51✔
180
        for _, standbyQueueProcessor := range t.standbyQueueProcessors {
102✔
181
                standbyQueueProcessor.Start()
51✔
182
        }
51✔
183

184
        t.shutdownWG.Add(1)
51✔
185
        go t.completeTransferLoop()
51✔
186
}
187

188
func (t *transferQueueProcessor) Stop() {
51✔
189
        if !atomic.CompareAndSwapInt32(&t.status, common.DaemonStatusStarted, common.DaemonStatusStopped) {
51✔
190
                return
×
191
        }
×
192

193
        t.activeQueueProcessor.Stop()
51✔
194
        for _, standbyQueueProcessor := range t.standbyQueueProcessors {
102✔
195
                standbyQueueProcessor.Stop()
51✔
196
        }
51✔
197

198
        close(t.shutdownChan)
51✔
199
        common.AwaitWaitGroup(&t.shutdownWG, time.Minute)
51✔
200
}
201

202
func (t *transferQueueProcessor) NotifyNewTask(
203
        clusterName string,
204
        info *hcommon.NotifyTaskInfo,
205
) {
2,271✔
206
        if len(info.Tasks) == 0 {
2,271✔
207
                return
×
208
        }
×
209

210
        if clusterName == t.currentClusterName {
4,542✔
211
                t.activeQueueProcessor.notifyNewTask(info)
2,271✔
212
                return
2,271✔
213
        }
2,271✔
214

215
        standbyQueueProcessor, ok := t.standbyQueueProcessors[clusterName]
3✔
216
        if !ok {
3✔
217
                panic(fmt.Sprintf("Cannot find transfer processor for %s.", clusterName))
×
218
        }
219
        standbyQueueProcessor.notifyNewTask(info)
3✔
220
}
221

222
func (t *transferQueueProcessor) FailoverDomain(
223
        domainIDs map[string]struct{},
224
) {
×
225
        // Failover queue is used to scan all inflight tasks, if queue processor is not
×
226
        // started, there's no inflight task and we don't need to create a failover processor.
×
227
        // Also the HandleAction will be blocked if queue processor processing loop is not running.
×
228
        if atomic.LoadInt32(&t.status) != common.DaemonStatusStarted {
×
229
                return
×
230
        }
×
231

232
        minLevel := t.shard.GetTransferClusterAckLevel(t.currentClusterName)
×
233
        standbyClusterName := t.currentClusterName
×
234
        for clusterName := range t.shard.GetClusterMetadata().GetEnabledClusterInfo() {
×
235
                ackLevel := t.shard.GetTransferClusterAckLevel(clusterName)
×
236
                if ackLevel < minLevel {
×
237
                        minLevel = ackLevel
×
238
                        standbyClusterName = clusterName
×
239
                }
×
240
        }
241

242
        maxReadLevel := int64(0)
×
243
        actionResult, err := t.HandleAction(context.Background(), t.currentClusterName, NewGetStateAction())
×
244
        if err != nil {
×
245
                t.logger.Error("Transfer Failover Failed", tag.WorkflowDomainIDs(domainIDs), tag.Error(err))
×
246
                if err == errProcessorShutdown {
×
247
                        // processor/shard already shutdown, we don't need to create failover queue processor
×
248
                        return
×
249
                }
×
250
                // other errors should never be returned for GetStateAction
251
                panic(fmt.Sprintf("unknown error for GetStateAction: %v", err))
×
252
        }
253
        for _, queueState := range actionResult.GetStateActionResult.States {
×
254
                queueReadLevel := queueState.ReadLevel().(transferTaskKey).taskID
×
255
                if maxReadLevel < queueReadLevel {
×
256
                        maxReadLevel = queueReadLevel
×
257
                }
×
258
        }
259
        // maxReadLevel is exclusive, so add 1
260
        maxReadLevel++
×
261

×
262
        t.logger.Info("Transfer Failover Triggered",
×
263
                tag.WorkflowDomainIDs(domainIDs),
×
264
                tag.MinLevel(minLevel),
×
265
                tag.MaxLevel(maxReadLevel))
×
266

×
267
        updateShardAckLevel, failoverQueueProcessor := newTransferQueueFailoverProcessor(
×
268
                t.shard,
×
269
                t.historyEngine,
×
270
                t.taskProcessor,
×
271
                t.taskAllocator,
×
272
                t.activeTaskExecutor,
×
273
                t.logger,
×
274
                minLevel,
×
275
                maxReadLevel,
×
276
                domainIDs,
×
277
                standbyClusterName,
×
278
        )
×
279

×
280
        // NOTE: READ REF BEFORE MODIFICATION
×
281
        // ref: historyEngine.go registerDomainFailoverCallback function
×
282
        err = updateShardAckLevel(newTransferTaskKey(minLevel))
×
283
        if err != nil {
×
284
                t.logger.Error("Error update shard ack level", tag.Error(err))
×
285
        }
×
286
        failoverQueueProcessor.Start()
×
287
}
288

289
func (t *transferQueueProcessor) HandleAction(
290
        ctx context.Context,
291
        clusterName string,
292
        action *Action,
293
) (*ActionResult, error) {
283✔
294
        var resultNotificationCh chan actionResultNotification
283✔
295
        var added bool
283✔
296
        if clusterName == t.currentClusterName {
450✔
297
                resultNotificationCh, added = t.activeQueueProcessor.addAction(ctx, action)
167✔
298
        } else {
283✔
299
                found := false
116✔
300
                for standbyClusterName, standbyProcessor := range t.standbyQueueProcessors {
232✔
301
                        if clusterName == standbyClusterName {
232✔
302
                                resultNotificationCh, added = standbyProcessor.addAction(ctx, action)
116✔
303
                                found = true
116✔
304
                                break
116✔
305
                        }
306
                }
307

308
                if !found {
116✔
309
                        return nil, fmt.Errorf("unknown cluster name: %v", clusterName)
×
310
                }
×
311
        }
312

313
        if !added {
334✔
314
                if ctxErr := ctx.Err(); ctxErr != nil {
51✔
315
                        return nil, ctxErr
×
316
                }
×
317
                return nil, errProcessorShutdown
51✔
318
        }
319

320
        select {
232✔
321
        case resultNotification := <-resultNotificationCh:
232✔
322
                return resultNotification.result, resultNotification.err
232✔
323
        case <-t.shutdownChan:
×
324
                return nil, errProcessorShutdown
×
325
        case <-ctx.Done():
×
326
                return nil, ctx.Err()
×
327
        }
328
}
329

330
func (t *transferQueueProcessor) LockTaskProcessing() {
815✔
331
        t.taskAllocator.Lock()
815✔
332
}
815✔
333

334
func (t *transferQueueProcessor) UnlockTaskProcessing() {
815✔
335
        t.taskAllocator.Unlock()
815✔
336
}
815✔
337

338
func (t *transferQueueProcessor) completeTransferLoop() {
51✔
339
        defer t.shutdownWG.Done()
51✔
340

51✔
341
        completeTimer := time.NewTimer(t.config.TransferProcessorCompleteTransferInterval())
51✔
342
        defer completeTimer.Stop()
51✔
343

51✔
344
        for {
218✔
345
                select {
167✔
346
                case <-t.shutdownChan:
51✔
347
                        // before shutdown, make sure the ack level is up to date
51✔
348
                        if err := t.completeTransfer(); err != nil {
102✔
349
                                t.logger.Error("Error complete transfer task", tag.Error(err))
51✔
350
                        }
51✔
351
                        return
51✔
352
                case <-completeTimer.C:
116✔
353
                        for attempt := 0; attempt < t.config.TransferProcessorCompleteTransferFailureRetryCount(); attempt++ {
232✔
354
                                err := t.completeTransfer()
116✔
355
                                if err == nil {
232✔
356
                                        break
116✔
357
                                }
358

359
                                t.logger.Error("Failed to complete transfer task", tag.Error(err))
×
360
                                if err == shard.ErrShardClosed {
×
361
                                        // shard closed, trigger shutdown and bail out
×
362
                                        go t.Stop()
×
363
                                        return
×
364
                                }
×
365
                                backoff := time.Duration(attempt * 100)
×
366
                                time.Sleep(backoff * time.Millisecond)
×
367

×
368
                                select {
×
369
                                case <-t.shutdownChan:
×
370
                                        // break the retry loop if shutdown chan is closed
×
371
                                        break
×
372
                                default:
×
373
                                }
374
                        }
375

376
                        completeTimer.Reset(t.config.TransferProcessorCompleteTransferInterval())
116✔
377
                }
378
        }
379
}
380

381
func (t *transferQueueProcessor) completeTransfer() error {
167✔
382
        newAckLevel := maximumTransferTaskKey
167✔
383
        actionResult, err := t.HandleAction(context.Background(), t.currentClusterName, NewGetStateAction())
167✔
384
        if err != nil {
218✔
385
                return err
51✔
386
        }
51✔
387
        for _, queueState := range actionResult.GetStateActionResult.States {
232✔
388
                newAckLevel = minTaskKey(newAckLevel, queueState.AckLevel())
116✔
389
        }
116✔
390

391
        for standbyClusterName := range t.standbyQueueProcessors {
232✔
392
                actionResult, err := t.HandleAction(context.Background(), standbyClusterName, NewGetStateAction())
116✔
393
                if err != nil {
116✔
394
                        return err
×
395
                }
×
396
                for _, queueState := range actionResult.GetStateActionResult.States {
232✔
397
                        newAckLevel = minTaskKey(newAckLevel, queueState.AckLevel())
116✔
398
                }
116✔
399
        }
400

401
        for _, failoverInfo := range t.shard.GetAllTransferFailoverLevels() {
116✔
402
                failoverLevel := newTransferTaskKey(failoverInfo.MinLevel)
×
403
                if newAckLevel == nil {
×
404
                        newAckLevel = failoverLevel
×
405
                } else {
×
406
                        newAckLevel = minTaskKey(newAckLevel, failoverLevel)
×
407
                }
×
408
        }
409

410
        if newAckLevel == nil {
116✔
411
                panic("Unable to get transfer queue processor ack level")
×
412
        }
413

414
        newAckLevelTaskID := newAckLevel.(transferTaskKey).taskID
116✔
415
        t.logger.Debug(fmt.Sprintf("Start completing transfer task from: %v, to %v.", t.ackLevel, newAckLevelTaskID))
116✔
416
        if t.ackLevel >= newAckLevelTaskID {
123✔
417
                return nil
7✔
418
        }
7✔
419

420
        t.metricsClient.IncCounter(metrics.TransferQueueProcessorScope, metrics.TaskBatchCompleteCounter)
109✔
421

109✔
422
        for {
218✔
423
                pageSize := t.config.TransferTaskDeleteBatchSize()
109✔
424
                resp, err := t.shard.GetExecutionManager().RangeCompleteTransferTask(context.Background(), &persistence.RangeCompleteTransferTaskRequest{
109✔
425
                        ExclusiveBeginTaskID: t.ackLevel,
109✔
426
                        InclusiveEndTaskID:   newAckLevelTaskID,
109✔
427
                        PageSize:             pageSize, // pageSize may or may not be honored
109✔
428
                })
109✔
429
                if err != nil {
109✔
430
                        return err
×
431
                }
×
432
                if !persistence.HasMoreRowsToDelete(resp.TasksCompleted, pageSize) {
218✔
433
                        break
109✔
434
                }
435
        }
436

437
        t.ackLevel = newAckLevelTaskID
109✔
438

109✔
439
        return t.shard.UpdateTransferAckLevel(newAckLevelTaskID)
109✔
440
}
441

442
func newTransferQueueActiveProcessor(
443
        shard shard.Context,
444
        historyEngine engine.Engine,
445
        taskProcessor task.Processor,
446
        taskAllocator TaskAllocator,
447
        taskExecutor task.Executor,
448
        logger log.Logger,
449
) *transferQueueProcessorBase {
51✔
450
        config := shard.GetConfig()
51✔
451
        options := newTransferQueueProcessorOptions(config, true, false)
51✔
452

51✔
453
        currentClusterName := shard.GetClusterMetadata().GetCurrentClusterName()
51✔
454
        logger = logger.WithTags(tag.ClusterName(currentClusterName))
51✔
455

51✔
456
        taskFilter := func(taskInfo task.Info) (bool, error) {
3,057✔
457
                task, ok := taskInfo.(*persistence.TransferTaskInfo)
3,006✔
458
                if !ok {
3,006✔
459
                        return false, errUnexpectedQueueTask
×
460
                }
×
461
                if notRegistered, err := isDomainNotRegistered(shard, task.DomainID); notRegistered && err == nil {
3,006✔
462
                        logger.Info("Domain is not in registered status, skip task in active transfer queue.", tag.WorkflowDomainID(task.DomainID), tag.Value(taskInfo))
×
463
                        return false, nil
×
464
                }
×
465
                return taskAllocator.VerifyActiveTask(task.DomainID, task)
3,006✔
466
        }
467

468
        updateMaxReadLevel := func() task.Key {
2,605✔
469
                return newTransferTaskKey(shard.GetTransferMaxReadLevel())
2,554✔
470
        }
2,554✔
471

472
        updateClusterAckLevel := func(ackLevel task.Key) error {
51✔
473
                taskID := ackLevel.(transferTaskKey).taskID
×
474
                return shard.UpdateTransferClusterAckLevel(currentClusterName, taskID)
×
475
        }
×
476

477
        updateProcessingQueueStates := func(states []ProcessingQueueState) error {
297✔
478
                pStates := convertToPersistenceTransferProcessingQueueStates(states)
246✔
479
                return shard.UpdateTransferProcessingQueueStates(currentClusterName, pStates)
246✔
480
        }
246✔
481

482
        queueShutdown := func() error {
51✔
483
                return nil
×
484
        }
×
485

486
        return newTransferQueueProcessorBase(
51✔
487
                shard,
51✔
488
                loadTransferProcessingQueueStates(currentClusterName, shard, options, logger),
51✔
489
                taskProcessor,
51✔
490
                options,
51✔
491
                updateMaxReadLevel,
51✔
492
                updateClusterAckLevel,
51✔
493
                updateProcessingQueueStates,
51✔
494
                queueShutdown,
51✔
495
                taskFilter,
51✔
496
                taskExecutor,
51✔
497
                logger,
51✔
498
                shard.GetMetricsClient(),
51✔
499
        )
51✔
500
}
501

502
func newTransferQueueStandbyProcessor(
503
        clusterName string,
504
        shard shard.Context,
505
        historyEngine engine.Engine,
506
        taskProcessor task.Processor,
507
        taskAllocator TaskAllocator,
508
        taskExecutor task.Executor,
509
        logger log.Logger,
510
) *transferQueueProcessorBase {
51✔
511
        config := shard.GetConfig()
51✔
512
        options := newTransferQueueProcessorOptions(config, false, false)
51✔
513

51✔
514
        logger = logger.WithTags(tag.ClusterName(clusterName))
51✔
515

51✔
516
        taskFilter := func(taskInfo task.Info) (bool, error) {
2,713✔
517
                task, ok := taskInfo.(*persistence.TransferTaskInfo)
2,662✔
518
                if !ok {
2,662✔
519
                        return false, errUnexpectedQueueTask
×
520
                }
×
521
                if notRegistered, err := isDomainNotRegistered(shard, task.DomainID); notRegistered && err == nil {
2,662✔
522
                        logger.Info("Domain is not in registered status, skip task in standby transfer queue.", tag.WorkflowDomainID(task.DomainID), tag.Value(taskInfo))
×
523
                        return false, nil
×
524
                }
×
525
                if task.TaskType == persistence.TransferTaskTypeCloseExecution ||
2,662✔
526
                        task.TaskType == persistence.TransferTaskTypeRecordWorkflowClosed {
3,117✔
527
                        domainEntry, err := shard.GetDomainCache().GetDomainByID(task.DomainID)
455✔
528
                        if err == nil {
910✔
529
                                if domainEntry.HasReplicationCluster(clusterName) {
458✔
530
                                        // guarantee the processing of workflow execution close
3✔
531
                                        return true, nil
3✔
532
                                }
3✔
533
                        } else {
×
534
                                if _, ok := err.(*types.EntityNotExistsError); !ok {
×
535
                                        // retry the task if failed to find the domain
×
536
                                        logger.Warn("Cannot find domain", tag.WorkflowDomainID(task.DomainID))
×
537
                                        return false, err
×
538
                                }
×
539
                                logger.Warn("Cannot find domain, default to not process task.", tag.WorkflowDomainID(task.DomainID), tag.Value(task))
×
540
                                return false, nil
×
541
                        }
542
                }
543
                return taskAllocator.VerifyStandbyTask(clusterName, task.DomainID, task)
2,662✔
544
        }
545

546
        updateMaxReadLevel := func() task.Key {
338✔
547
                return newTransferTaskKey(shard.GetTransferMaxReadLevel())
287✔
548
        }
287✔
549

550
        updateClusterAckLevel := func(ackLevel task.Key) error {
51✔
551
                taskID := ackLevel.(transferTaskKey).taskID
×
552
                return shard.UpdateTransferClusterAckLevel(clusterName, taskID)
×
553
        }
×
554

555
        updateProcessingQueueStates := func(states []ProcessingQueueState) error {
299✔
556
                pStates := convertToPersistenceTransferProcessingQueueStates(states)
248✔
557
                return shard.UpdateTransferProcessingQueueStates(clusterName, pStates)
248✔
558
        }
248✔
559

560
        queueShutdown := func() error {
51✔
561
                return nil
×
562
        }
×
563

564
        return newTransferQueueProcessorBase(
51✔
565
                shard,
51✔
566
                loadTransferProcessingQueueStates(clusterName, shard, options, logger),
51✔
567
                taskProcessor,
51✔
568
                options,
51✔
569
                updateMaxReadLevel,
51✔
570
                updateClusterAckLevel,
51✔
571
                updateProcessingQueueStates,
51✔
572
                queueShutdown,
51✔
573
                taskFilter,
51✔
574
                taskExecutor,
51✔
575
                logger,
51✔
576
                shard.GetMetricsClient(),
51✔
577
        )
51✔
578
}
579

580
func newTransferQueueFailoverProcessor(
581
        shardContext shard.Context,
582
        historyEngine engine.Engine,
583
        taskProcessor task.Processor,
584
        taskAllocator TaskAllocator,
585
        taskExecutor task.Executor,
586
        logger log.Logger,
587
        minLevel, maxLevel int64,
588
        domainIDs map[string]struct{},
589
        standbyClusterName string,
590
) (updateClusterAckLevelFn, *transferQueueProcessorBase) {
×
591
        config := shardContext.GetConfig()
×
592
        options := newTransferQueueProcessorOptions(config, true, true)
×
593

×
594
        currentClusterName := shardContext.GetService().GetClusterMetadata().GetCurrentClusterName()
×
595
        failoverUUID := uuid.New()
×
596
        logger = logger.WithTags(
×
597
                tag.ClusterName(currentClusterName),
×
598
                tag.WorkflowDomainIDs(domainIDs),
×
599
                tag.FailoverMsg("from: "+standbyClusterName),
×
600
        )
×
601

×
602
        taskFilter := func(taskInfo task.Info) (bool, error) {
×
603
                task, ok := taskInfo.(*persistence.TransferTaskInfo)
×
604
                if !ok {
×
605
                        return false, errUnexpectedQueueTask
×
606
                }
×
607
                if notRegistered, err := isDomainNotRegistered(shardContext, task.DomainID); notRegistered && err == nil {
×
608
                        logger.Info("Domain is not in registered status, skip task in failover transfer queue.", tag.WorkflowDomainID(task.DomainID), tag.Value(taskInfo))
×
609
                        return false, nil
×
610
                }
×
611
                return taskAllocator.VerifyFailoverActiveTask(domainIDs, task.DomainID, task)
×
612
        }
613

614
        maxReadLevelTaskKey := newTransferTaskKey(maxLevel)
×
615
        updateMaxReadLevel := func() task.Key {
×
616
                return maxReadLevelTaskKey // this is a const
×
617
        }
×
618

619
        updateClusterAckLevel := func(ackLevel task.Key) error {
×
620
                taskID := ackLevel.(transferTaskKey).taskID
×
621
                return shardContext.UpdateTransferFailoverLevel(
×
622
                        failoverUUID,
×
623
                        shard.TransferFailoverLevel{
×
624
                                StartTime:    shardContext.GetTimeSource().Now(),
×
625
                                MinLevel:     minLevel,
×
626
                                CurrentLevel: taskID,
×
627
                                MaxLevel:     maxLevel,
×
628
                                DomainIDs:    domainIDs,
×
629
                        },
×
630
                )
×
631
        }
×
632

633
        queueShutdown := func() error {
×
634
                return shardContext.DeleteTransferFailoverLevel(failoverUUID)
×
635
        }
×
636

637
        processingQueueStates := []ProcessingQueueState{
×
638
                NewProcessingQueueState(
×
639
                        defaultProcessingQueueLevel,
×
640
                        newTransferTaskKey(minLevel),
×
641
                        maxReadLevelTaskKey,
×
642
                        NewDomainFilter(domainIDs, false),
×
643
                ),
×
644
        }
×
645

×
646
        return updateClusterAckLevel, newTransferQueueProcessorBase(
×
647
                shardContext,
×
648
                processingQueueStates,
×
649
                taskProcessor,
×
650
                options,
×
651
                updateMaxReadLevel,
×
652
                updateClusterAckLevel,
×
653
                nil,
×
654
                queueShutdown,
×
655
                taskFilter,
×
656
                taskExecutor,
×
657
                logger,
×
658
                shardContext.GetMetricsClient(),
×
659
        )
×
660
}
661

662
func loadTransferProcessingQueueStates(
663
        clusterName string,
664
        shard shard.Context,
665
        options *queueProcessorOptions,
666
        logger log.Logger,
667
) []ProcessingQueueState {
99✔
668
        ackLevel := shard.GetTransferClusterAckLevel(clusterName)
99✔
669
        if options.EnableLoadQueueStates() {
198✔
670
                pStates := shard.GetTransferProcessingQueueStates(clusterName)
99✔
671
                if validateProcessingQueueStates(pStates, ackLevel) {
198✔
672
                        return convertFromPersistenceTransferProcessingQueueStates(pStates)
99✔
673
                }
99✔
674

675
                logger.Error("Incompatible processing queue states and ackLevel",
×
676
                        tag.Value(pStates),
×
677
                        tag.ShardTransferAcks(ackLevel),
×
678
                )
×
679
        }
680

681
        // LoadQueueStates is disabled or sanity check failed
682
        // fallback to use ackLevel
683
        return []ProcessingQueueState{
×
684
                NewProcessingQueueState(
×
685
                        defaultProcessingQueueLevel,
×
686
                        newTransferTaskKey(ackLevel),
×
687
                        maximumTransferTaskKey,
×
688
                        NewDomainFilter(nil, true),
×
689
                ),
×
690
        }
×
691
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc