uber / cadence / 018f64f4-c8b4-4dac-8ed7-996896dd65a4

11 May 2024 12:01AM UTC coverage: 69.148% (-0.01%) from 69.162%
push | buildkite | web-flow
Write tests for replication task processor main loop (#6010)

1 of 1 new or added line in 1 file covered. (100.0%)
229 existing lines in 18 files now uncovered.
101597 of 146926 relevant lines covered (69.15%)
2644.77 hits per line

Source File: /service/history/queue/transfer_queue_processor.go (56.49% covered)

// Copyright (c) 2017-2020 Uber Technologies Inc.

// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:

// The above copyright notice and this permission notice shall be included in all
// copies or substantial portions of the Software.

// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
// SOFTWARE.

package queue

import (
        "context"
        "errors"
        "fmt"
        "math"
        "sync"
        "sync/atomic"
        "time"

        "github.com/pborman/uuid"

        "github.com/uber/cadence/common"
        "github.com/uber/cadence/common/dynamicconfig"
        "github.com/uber/cadence/common/log"
        "github.com/uber/cadence/common/log/tag"
        "github.com/uber/cadence/common/metrics"
        "github.com/uber/cadence/common/ndc"
        "github.com/uber/cadence/common/persistence"
        "github.com/uber/cadence/common/reconciliation/invariant"
        "github.com/uber/cadence/common/types"
        hcommon "github.com/uber/cadence/service/history/common"
        "github.com/uber/cadence/service/history/config"
        "github.com/uber/cadence/service/history/engine"
        "github.com/uber/cadence/service/history/execution"
        "github.com/uber/cadence/service/history/reset"
        "github.com/uber/cadence/service/history/shard"
        "github.com/uber/cadence/service/history/task"
        "github.com/uber/cadence/service/history/workflowcache"
        "github.com/uber/cadence/service/worker/archiver"
)

var (
        errUnexpectedQueueTask = errors.New("unexpected queue task")
        errProcessorShutdown   = errors.New("queue processor has been shutdown")

        maximumTransferTaskKey = newTransferTaskKey(math.MaxInt64)
)

type transferQueueProcessor struct {
        shard         shard.Context
        historyEngine engine.Engine
        taskProcessor task.Processor

        config             *config.Config
        currentClusterName string

        metricsClient metrics.Client
        logger        log.Logger

        status       int32
        shutdownChan chan struct{}
        shutdownWG   sync.WaitGroup

        ackLevel                int64
        taskAllocator           TaskAllocator
        activeTaskExecutor      task.Executor
        activeQueueProcessor    *transferQueueProcessorBase
        standbyQueueProcessors  map[string]*transferQueueProcessorBase
        failoverQueueProcessors []*transferQueueProcessorBase
}

// NewTransferQueueProcessor creates a new transfer QueueProcessor
func NewTransferQueueProcessor(
        shard shard.Context,
        historyEngine engine.Engine,
        taskProcessor task.Processor,
        executionCache *execution.Cache,
        workflowResetter reset.WorkflowResetter,
        archivalClient archiver.Client,
        executionCheck invariant.Invariant,
        wfIDCache workflowcache.WFCache,
        ratelimitInternalPerWorkflowID dynamicconfig.BoolPropertyFnWithDomainFilter,
) Processor {
        logger := shard.GetLogger().WithTags(tag.ComponentTransferQueue)
        currentClusterName := shard.GetClusterMetadata().GetCurrentClusterName()
        config := shard.GetConfig()
        taskAllocator := NewTaskAllocator(shard)

        activeTaskExecutor := task.NewTransferActiveTaskExecutor(
                shard,
                archivalClient,
                executionCache,
                workflowResetter,
                logger,
                config,
                wfIDCache,
                ratelimitInternalPerWorkflowID,
        )

        activeQueueProcessor := newTransferQueueActiveProcessor(
                shard,
                taskProcessor,
                taskAllocator,
                activeTaskExecutor,
                logger,
        )

        standbyQueueProcessors := make(map[string]*transferQueueProcessorBase)
        for clusterName := range shard.GetClusterMetadata().GetRemoteClusterInfo() {
                historyResender := ndc.NewHistoryResender(
                        shard.GetDomainCache(),
                        shard.GetService().GetClientBean().GetRemoteAdminClient(clusterName),
                        func(ctx context.Context, request *types.ReplicateEventsV2Request) error {
                                return historyEngine.ReplicateEventsV2(ctx, request)
                        },
                        config.StandbyTaskReReplicationContextTimeout,
                        executionCheck,
                        shard.GetLogger(),
                )
                standbyTaskExecutor := task.NewTransferStandbyTaskExecutor(
                        shard,
                        archivalClient,
                        executionCache,
                        historyResender,
                        logger,
                        clusterName,
                        config,
                )
                standbyQueueProcessors[clusterName] = newTransferQueueStandbyProcessor(
                        clusterName,
                        shard,
                        taskProcessor,
                        taskAllocator,
                        standbyTaskExecutor,
                        logger,
                )
        }

        return &transferQueueProcessor{
                shard:                  shard,
                historyEngine:          historyEngine,
                taskProcessor:          taskProcessor,
                config:                 config,
                currentClusterName:     currentClusterName,
                metricsClient:          shard.GetMetricsClient(),
                logger:                 logger,
                status:                 common.DaemonStatusInitialized,
                shutdownChan:           make(chan struct{}),
                ackLevel:               shard.GetTransferAckLevel(),
                taskAllocator:          taskAllocator,
                activeTaskExecutor:     activeTaskExecutor,
                activeQueueProcessor:   activeQueueProcessor,
                standbyQueueProcessors: standbyQueueProcessors,
        }
}

func (t *transferQueueProcessor) Start() {
        if !atomic.CompareAndSwapInt32(&t.status, common.DaemonStatusInitialized, common.DaemonStatusStarted) {
                return
        }

        t.activeQueueProcessor.Start()
        for _, standbyQueueProcessor := range t.standbyQueueProcessors {
                standbyQueueProcessor.Start()
        }

        t.shutdownWG.Add(1)
        go t.completeTransferLoop()
}

func (t *transferQueueProcessor) Stop() {
        if !atomic.CompareAndSwapInt32(&t.status, common.DaemonStatusStarted, common.DaemonStatusStopped) {
                return
        }

        if !t.shard.GetConfig().QueueProcessorEnableGracefulSyncShutdown() {
                t.activeQueueProcessor.Stop()
                for _, standbyQueueProcessor := range t.standbyQueueProcessors {
                        standbyQueueProcessor.Stop()
                }

                close(t.shutdownChan)
                common.AwaitWaitGroup(&t.shutdownWG, time.Minute)
                return
        }

        // close the shutdown channel so processor pump goroutine drains tasks and then stop the processors
        close(t.shutdownChan)
        if !common.AwaitWaitGroup(&t.shutdownWG, gracefulShutdownTimeout) {
                t.logger.Warn("transferQueueProcessor timed out on shut down", tag.LifeCycleStopTimedout)
        }
        t.activeQueueProcessor.Stop()
        for _, standbyQueueProcessor := range t.standbyQueueProcessors {
                standbyQueueProcessor.Stop()
        }

        if len(t.failoverQueueProcessors) > 0 {
                t.logger.Info("Shutting down failover transfer queues", tag.Counter(len(t.failoverQueueProcessors)))
                for _, failoverQueueProcessor := range t.failoverQueueProcessors {
                        failoverQueueProcessor.Stop()
                }
        }
}

func (t *transferQueueProcessor) NotifyNewTask(clusterName string, info *hcommon.NotifyTaskInfo) {
        if len(info.Tasks) == 0 {
                return
        }

        if clusterName == t.currentClusterName {
                t.activeQueueProcessor.notifyNewTask(info)
                return
        }

        standbyQueueProcessor, ok := t.standbyQueueProcessors[clusterName]
        if !ok {
                panic(fmt.Sprintf("Cannot find transfer processor for %s.", clusterName))
        }
        standbyQueueProcessor.notifyNewTask(info)
}

func (t *transferQueueProcessor) FailoverDomain(domainIDs map[string]struct{}) {
        // Failover queue is used to scan all inflight tasks, if queue processor is not
        // started, there's no inflight task and we don't need to create a failover processor.
        // Also the HandleAction will be blocked if queue processor processing loop is not running.
        if atomic.LoadInt32(&t.status) != common.DaemonStatusStarted {
                return
        }

        minLevel := t.shard.GetTransferClusterAckLevel(t.currentClusterName)
        standbyClusterName := t.currentClusterName
        for clusterName := range t.shard.GetClusterMetadata().GetEnabledClusterInfo() {
                ackLevel := t.shard.GetTransferClusterAckLevel(clusterName)
                if ackLevel < minLevel {
                        minLevel = ackLevel
                        standbyClusterName = clusterName
                }
        }

        maxReadLevel := int64(0)
        actionResult, err := t.HandleAction(context.Background(), t.currentClusterName, NewGetStateAction())
        if err != nil {
                t.logger.Error("Transfer Failover Failed", tag.WorkflowDomainIDs(domainIDs), tag.Error(err))
                if err == errProcessorShutdown {
                        // processor/shard already shutdown, we don't need to create failover queue processor
                        return
                }
                // other errors should never be returned for GetStateAction
                panic(fmt.Sprintf("unknown error for GetStateAction: %v", err))
        }
        for _, queueState := range actionResult.GetStateActionResult.States {
                queueReadLevel := queueState.ReadLevel().(transferTaskKey).taskID
                if maxReadLevel < queueReadLevel {
                        maxReadLevel = queueReadLevel
                }
        }
        // maxReadLevel is exclusive, so add 1
        maxReadLevel++

        t.logger.Info("Transfer Failover Triggered",
                tag.WorkflowDomainIDs(domainIDs),
                tag.MinLevel(minLevel),
                tag.MaxLevel(maxReadLevel))

        updateShardAckLevel, failoverQueueProcessor := newTransferQueueFailoverProcessor(
                t.shard,
                t.taskProcessor,
                t.taskAllocator,
                t.activeTaskExecutor,
                t.logger,
                minLevel,
                maxReadLevel,
                domainIDs,
                standbyClusterName,
        )

        // NOTE: READ REF BEFORE MODIFICATION
        // ref: historyEngine.go registerDomainFailoverCallback function
        err = updateShardAckLevel(newTransferTaskKey(minLevel))
        if err != nil {
                t.logger.Error("Error update shard ack level", tag.Error(err))
        }

        // Failover queue processors are started on the fly when domains are failed over.
        // Failover queue processors will be stopped when the transfer queue instance is stopped (due to restart or shard movement).
        // This means the failover queue processor might not finish its job.
        // There is no mechanism to re-start ongoing failover queue processors in the new shard owner.
        t.failoverQueueProcessors = append(t.failoverQueueProcessors, failoverQueueProcessor)
        failoverQueueProcessor.Start()
}

func (t *transferQueueProcessor) HandleAction(
        ctx context.Context,
        clusterName string,
        action *Action,
) (*ActionResult, error) {
        var resultNotificationCh chan actionResultNotification
        var added bool
        if clusterName == t.currentClusterName {
                resultNotificationCh, added = t.activeQueueProcessor.addAction(ctx, action)
        } else {
                found := false
                for standbyClusterName, standbyProcessor := range t.standbyQueueProcessors {
                        if clusterName == standbyClusterName {
                                resultNotificationCh, added = standbyProcessor.addAction(ctx, action)
                                found = true
                                break
                        }
                }

                if !found {
                        return nil, fmt.Errorf("unknown cluster name: %v", clusterName)
                }
        }

        if !added {
                if ctxErr := ctx.Err(); ctxErr != nil {
                        return nil, ctxErr
                }
                return nil, errProcessorShutdown
        }

        select {
        case resultNotification := <-resultNotificationCh:
                return resultNotification.result, resultNotification.err
        case <-t.shutdownChan:
                return nil, errProcessorShutdown
        case <-ctx.Done():
                return nil, ctx.Err()
        }
}

func (t *transferQueueProcessor) LockTaskProcessing() {
        t.taskAllocator.Lock()
}

func (t *transferQueueProcessor) UnlockTaskProcessing() {
        t.taskAllocator.Unlock()
}

func (t *transferQueueProcessor) drain() {
        // before shutdown, make sure the ack level is up to date
        if err := t.completeTransfer(); err != nil {
                t.logger.Error("Failed to complete transfer task during shutdown", tag.Error(err))
        }
}

func (t *transferQueueProcessor) completeTransferLoop() {
        defer t.shutdownWG.Done()

        completeTimer := time.NewTimer(t.config.TransferProcessorCompleteTransferInterval())
        defer completeTimer.Stop()

        for {
                select {
                case <-t.shutdownChan:
                        t.drain()
                        return
                case <-completeTimer.C:
                        for attempt := 0; attempt < t.config.TransferProcessorCompleteTransferFailureRetryCount(); attempt++ {
                                err := t.completeTransfer()
                                if err == nil {
                                        break
                                }

                                t.logger.Error("Failed to complete transfer task", tag.Error(err))
                                var errShardClosed *shard.ErrShardClosed
                                if errors.As(err, &errShardClosed) {
                                        // shard closed, trigger shutdown and bail out
                                        if !t.shard.GetConfig().QueueProcessorEnableGracefulSyncShutdown() {
                                                go t.Stop()
                                                return
                                        }

                                        t.Stop()
                                        return
                                }

                                select {
                                case <-t.shutdownChan:
                                        t.drain()
                                        return
                                case <-time.After(time.Duration(attempt*100) * time.Millisecond):
                                        // do nothing. retry loop will continue
                                }
                        }

                        completeTimer.Reset(t.config.TransferProcessorCompleteTransferInterval())
                }
        }
}

func (t *transferQueueProcessor) completeTransfer() error {
        newAckLevel := maximumTransferTaskKey
        actionResult, err := t.HandleAction(context.Background(), t.currentClusterName, NewGetStateAction())
        if err != nil {
                return err
        }
        for _, queueState := range actionResult.GetStateActionResult.States {
                newAckLevel = minTaskKey(newAckLevel, queueState.AckLevel())
        }

        for standbyClusterName := range t.standbyQueueProcessors {
                actionResult, err := t.HandleAction(context.Background(), standbyClusterName, NewGetStateAction())
                if err != nil {
                        return err
                }
                for _, queueState := range actionResult.GetStateActionResult.States {
                        newAckLevel = minTaskKey(newAckLevel, queueState.AckLevel())
                }
        }

        for _, failoverInfo := range t.shard.GetAllTransferFailoverLevels() {
                failoverLevel := newTransferTaskKey(failoverInfo.MinLevel)
                if newAckLevel == nil {
                        newAckLevel = failoverLevel
                } else {
                        newAckLevel = minTaskKey(newAckLevel, failoverLevel)
                }
        }

        if newAckLevel == nil {
                panic("Unable to get transfer queue processor ack level")
        }

        newAckLevelTaskID := newAckLevel.(transferTaskKey).taskID
        t.logger.Debug(fmt.Sprintf("Start completing transfer task from: %v, to %v.", t.ackLevel, newAckLevelTaskID))
        if t.ackLevel >= newAckLevelTaskID {
                return nil
        }

        t.metricsClient.Scope(metrics.TransferQueueProcessorScope).
                Tagged(metrics.ShardIDTag(t.shard.GetShardID())).
                IncCounter(metrics.TaskBatchCompleteCounter)

        for {
                pageSize := t.config.TransferTaskDeleteBatchSize()
                resp, err := t.shard.GetExecutionManager().RangeCompleteTransferTask(context.Background(), &persistence.RangeCompleteTransferTaskRequest{
                        ExclusiveBeginTaskID: t.ackLevel,
                        InclusiveEndTaskID:   newAckLevelTaskID,
                        PageSize:             pageSize, // pageSize may or may not be honored
                })
                if err != nil {
                        return err
                }
                if !persistence.HasMoreRowsToDelete(resp.TasksCompleted, pageSize) {
                        break
                }
        }

        t.ackLevel = newAckLevelTaskID

        return t.shard.UpdateTransferAckLevel(newAckLevelTaskID)
}

func newTransferQueueActiveProcessor(
        shard shard.Context,
        taskProcessor task.Processor,
        taskAllocator TaskAllocator,
        taskExecutor task.Executor,
        logger log.Logger,
) *transferQueueProcessorBase {
        config := shard.GetConfig()
        options := newTransferQueueProcessorOptions(config, true, false)

        currentClusterName := shard.GetClusterMetadata().GetCurrentClusterName()
        logger = logger.WithTags(tag.ClusterName(currentClusterName))

        taskFilter := func(taskInfo task.Info) (bool, error) {
                task, ok := taskInfo.(*persistence.TransferTaskInfo)
                if !ok {
                        return false, errUnexpectedQueueTask
                }
                if notRegistered, err := isDomainNotRegistered(shard, task.DomainID); notRegistered && err == nil {
                        logger.Info("Domain is not in registered status, skip task in active transfer queue.", tag.WorkflowDomainID(task.DomainID), tag.Value(taskInfo))
                        return false, nil
                }
                return taskAllocator.VerifyActiveTask(task.DomainID, task)
        }

        updateMaxReadLevel := func() task.Key {
                return newTransferTaskKey(shard.GetTransferMaxReadLevel())
        }

        updateClusterAckLevel := func(ackLevel task.Key) error {
                taskID := ackLevel.(transferTaskKey).taskID
                return shard.UpdateTransferClusterAckLevel(currentClusterName, taskID)
        }

        updateProcessingQueueStates := func(states []ProcessingQueueState) error {
                pStates := convertToPersistenceTransferProcessingQueueStates(states)
                return shard.UpdateTransferProcessingQueueStates(currentClusterName, pStates)
        }

        queueShutdown := func() error {
                return nil
        }

        return newTransferQueueProcessorBase(
                shard,
                loadTransferProcessingQueueStates(currentClusterName, shard, options, logger),
                taskProcessor,
                options,
                updateMaxReadLevel,
                updateClusterAckLevel,
                updateProcessingQueueStates,
                queueShutdown,
                taskFilter,
                taskExecutor,
                logger,
                shard.GetMetricsClient(),
        )
}

func newTransferQueueStandbyProcessor(
        clusterName string,
        shard shard.Context,
        taskProcessor task.Processor,
        taskAllocator TaskAllocator,
        taskExecutor task.Executor,
        logger log.Logger,
) *transferQueueProcessorBase {
        config := shard.GetConfig()
        options := newTransferQueueProcessorOptions(config, false, false)

        logger = logger.WithTags(tag.ClusterName(clusterName))

        taskFilter := func(taskInfo task.Info) (bool, error) {
                task, ok := taskInfo.(*persistence.TransferTaskInfo)
                if !ok {
                        return false, errUnexpectedQueueTask
                }
                if notRegistered, err := isDomainNotRegistered(shard, task.DomainID); notRegistered && err == nil {
                        logger.Info("Domain is not in registered status, skip task in standby transfer queue.", tag.WorkflowDomainID(task.DomainID), tag.Value(taskInfo))
                        return false, nil
                }
                if task.TaskType == persistence.TransferTaskTypeCloseExecution ||
                        task.TaskType == persistence.TransferTaskTypeRecordWorkflowClosed {
                        domainEntry, err := shard.GetDomainCache().GetDomainByID(task.DomainID)
                        if err == nil {
                                if domainEntry.HasReplicationCluster(clusterName) {
                                        // guarantee the processing of workflow execution close
                                        return true, nil
                                }
                        } else {
                                if _, ok := err.(*types.EntityNotExistsError); !ok {
                                        // retry the task if failed to find the domain
                                        logger.Warn("Cannot find domain", tag.WorkflowDomainID(task.DomainID))
                                        return false, err
                                }
                                logger.Warn("Cannot find domain, default to not process task.", tag.WorkflowDomainID(task.DomainID), tag.Value(task))
                                return false, nil
                        }
                }
                return taskAllocator.VerifyStandbyTask(clusterName, task.DomainID, task)
        }

        updateMaxReadLevel := func() task.Key {
                return newTransferTaskKey(shard.GetTransferMaxReadLevel())
        }

        updateClusterAckLevel := func(ackLevel task.Key) error {
                taskID := ackLevel.(transferTaskKey).taskID
                return shard.UpdateTransferClusterAckLevel(clusterName, taskID)
        }

        updateProcessingQueueStates := func(states []ProcessingQueueState) error {
                pStates := convertToPersistenceTransferProcessingQueueStates(states)
                return shard.UpdateTransferProcessingQueueStates(clusterName, pStates)
        }

        queueShutdown := func() error {
                return nil
        }

        return newTransferQueueProcessorBase(
                shard,
                loadTransferProcessingQueueStates(clusterName, shard, options, logger),
                taskProcessor,
                options,
                updateMaxReadLevel,
                updateClusterAckLevel,
                updateProcessingQueueStates,
                queueShutdown,
                taskFilter,
                taskExecutor,
                logger,
                shard.GetMetricsClient(),
        )
}

func newTransferQueueFailoverProcessor(
        shardContext shard.Context,
        taskProcessor task.Processor,
        taskAllocator TaskAllocator,
        taskExecutor task.Executor,
        logger log.Logger,
        minLevel, maxLevel int64,
        domainIDs map[string]struct{},
        standbyClusterName string,
) (updateClusterAckLevelFn, *transferQueueProcessorBase) {
        config := shardContext.GetConfig()
        options := newTransferQueueProcessorOptions(config, true, true)

        currentClusterName := shardContext.GetService().GetClusterMetadata().GetCurrentClusterName()
        failoverUUID := uuid.New()
        logger = logger.WithTags(
                tag.ClusterName(currentClusterName),
                tag.WorkflowDomainIDs(domainIDs),
                tag.FailoverMsg("from: "+standbyClusterName),
        )

        taskFilter := func(taskInfo task.Info) (bool, error) {
                task, ok := taskInfo.(*persistence.TransferTaskInfo)
                if !ok {
                        return false, errUnexpectedQueueTask
                }
                if notRegistered, err := isDomainNotRegistered(shardContext, task.DomainID); notRegistered && err == nil {
                        logger.Info("Domain is not in registered status, skip task in failover transfer queue.", tag.WorkflowDomainID(task.DomainID), tag.Value(taskInfo))
                        return false, nil
                }
                return taskAllocator.VerifyFailoverActiveTask(domainIDs, task.DomainID, task)
        }

        maxReadLevelTaskKey := newTransferTaskKey(maxLevel)
        updateMaxReadLevel := func() task.Key {
                return maxReadLevelTaskKey // this is a const
        }

        updateClusterAckLevel := func(ackLevel task.Key) error {
                taskID := ackLevel.(transferTaskKey).taskID
                return shardContext.UpdateTransferFailoverLevel(
                        failoverUUID,
                        shard.TransferFailoverLevel{
                                StartTime:    shardContext.GetTimeSource().Now(),
                                MinLevel:     minLevel,
                                CurrentLevel: taskID,
                                MaxLevel:     maxLevel,
                                DomainIDs:    domainIDs,
                        },
                )
        }

        queueShutdown := func() error {
                return shardContext.DeleteTransferFailoverLevel(failoverUUID)
        }

        processingQueueStates := []ProcessingQueueState{
                NewProcessingQueueState(
                        defaultProcessingQueueLevel,
                        newTransferTaskKey(minLevel),
                        maxReadLevelTaskKey,
                        NewDomainFilter(domainIDs, false),
                ),
        }

        return updateClusterAckLevel, newTransferQueueProcessorBase(
                shardContext,
                processingQueueStates,
                taskProcessor,
                options,
                updateMaxReadLevel,
                updateClusterAckLevel,
                nil,
                queueShutdown,
                taskFilter,
                taskExecutor,
                logger,
                shardContext.GetMetricsClient(),
        )
}

func loadTransferProcessingQueueStates(
        clusterName string,
        shard shard.Context,
        options *queueProcessorOptions,
        logger log.Logger,
) []ProcessingQueueState {
        ackLevel := shard.GetTransferClusterAckLevel(clusterName)
        if options.EnableLoadQueueStates() {
                pStates := shard.GetTransferProcessingQueueStates(clusterName)
                if validateProcessingQueueStates(pStates, ackLevel) {
                        return convertFromPersistenceTransferProcessingQueueStates(pStates)
                }

                logger.Error("Incompatible processing queue states and ackLevel",
                        tag.Value(pStates),
                        tag.ShardTransferAcks(ackLevel),
                )
        }

        // LoadQueueStates is disabled or sanity check failed
        // fallback to use ackLevel
        return []ProcessingQueueState{
                NewProcessingQueueState(
                        defaultProcessingQueueLevel,
                        newTransferTaskKey(ackLevel),
                        maximumTransferTaskKey,
                        NewDomainFilter(nil, true),
                ),
        }
}
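
The file above is built around a common Go daemon lifecycle: Start and Stop are guarded by an atomic compare-and-swap on a status field, Stop signals the background pump by closing a shutdown channel, a WaitGroup is awaited with a timeout, and the pump makes one final drain pass before exiting (see Start, Stop, completeTransferLoop, and drain). The sketch below is a minimal, self-contained illustration of that pattern only; the daemon type and every name in it are hypothetical, and it uses no Cadence packages.

package main

import (
        "fmt"
        "sync"
        "sync/atomic"
        "time"
)

const (
        statusInitialized int32 = iota
        statusStarted
        statusStopped
)

type daemon struct {
        status       int32
        shutdownChan chan struct{}
        shutdownWG   sync.WaitGroup
}

func newDaemon() *daemon {
        return &daemon{shutdownChan: make(chan struct{})}
}

// Start transitions Initialized -> Started exactly once and launches the pump.
func (d *daemon) Start() {
        if !atomic.CompareAndSwapInt32(&d.status, statusInitialized, statusStarted) {
                return
        }
        d.shutdownWG.Add(1)
        go d.loop()
}

// Stop transitions Started -> Stopped exactly once, wakes the pump by closing
// the shutdown channel, and waits (with a timeout) for it to drain and exit.
func (d *daemon) Stop() {
        if !atomic.CompareAndSwapInt32(&d.status, statusStarted, statusStopped) {
                return
        }
        close(d.shutdownChan)
        done := make(chan struct{})
        go func() {
                d.shutdownWG.Wait()
                close(done)
        }()
        select {
        case <-done:
        case <-time.After(time.Minute):
                fmt.Println("timed out waiting for pump to stop")
        }
}

// loop is the background pump: periodic work on a timer, drain on shutdown.
func (d *daemon) loop() {
        defer d.shutdownWG.Done()
        ticker := time.NewTicker(100 * time.Millisecond)
        defer ticker.Stop()
        for {
                select {
                case <-d.shutdownChan:
                        d.drain() // one last pass before exiting, mirroring completeTransferLoop
                        return
                case <-ticker.C:
                        // periodic work would go here (e.g. advancing an ack level)
                }
        }
}

func (d *daemon) drain() {
        fmt.Println("draining before shutdown")
}

func main() {
        d := newDaemon()
        d.Start()
        time.Sleep(300 * time.Millisecond)
        d.Stop()
}

Running the sketch starts the pump, lets the ticker fire a few times, then stops and drains, which is the same ordering the transfer queue processor relies on: the status CAS makes Start and Stop idempotent, and closing the channel is what lets the pump observe shutdown even while it is blocked waiting on its timer.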