• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

uber / cadence / 018f127b-cd0f-43c3-b47a-35b58b7623db

24 Apr 2024 11:40PM UTC coverage: 67.722% (+0.008%) from 67.714%
018f127b-cd0f-43c3-b47a-35b58b7623db

push

buildkite

web-flow
Add double read for latency comparison for Pinot Migration (#5927)

* Add double read for latency comparison for Pinot Migration

* use go routine to make sure the primary read will not fail; update unit tests

* reformat

111 of 112 new or added lines in 4 files covered. (99.11%)

61 existing lines in 13 files now uncovered.

99335 of 146680 relevant lines covered (67.72%)

2377.29 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

57.17
/service/history/queue/transfer_queue_processor.go
1
// Copyright (c) 2017-2020 Uber Technologies Inc.
2

3
// Permission is hereby granted, free of charge, to any person obtaining a copy
4
// of this software and associated documentation files (the "Software"), to deal
5
// in the Software without restriction, including without limitation the rights
6
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
7
// copies of the Software, and to permit persons to whom the Software is
8
// furnished to do so, subject to the following conditions:
9

10
// The above copyright notice and this permission notice shall be included in all
11
// copies or substantial portions of the Software.
12

13
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
14
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
18
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
19
// SOFTWARE.
20

21
package queue
22

23
import (
24
        "context"
25
        "errors"
26
        "fmt"
27
        "math"
28
        "sync"
29
        "sync/atomic"
30
        "time"
31

32
        "github.com/pborman/uuid"
33

34
        "github.com/uber/cadence/common"
35
        "github.com/uber/cadence/common/dynamicconfig"
36
        "github.com/uber/cadence/common/log"
37
        "github.com/uber/cadence/common/log/tag"
38
        "github.com/uber/cadence/common/metrics"
39
        "github.com/uber/cadence/common/ndc"
40
        "github.com/uber/cadence/common/persistence"
41
        "github.com/uber/cadence/common/reconciliation/invariant"
42
        "github.com/uber/cadence/common/types"
43
        hcommon "github.com/uber/cadence/service/history/common"
44
        "github.com/uber/cadence/service/history/config"
45
        "github.com/uber/cadence/service/history/engine"
46
        "github.com/uber/cadence/service/history/execution"
47
        "github.com/uber/cadence/service/history/reset"
48
        "github.com/uber/cadence/service/history/shard"
49
        "github.com/uber/cadence/service/history/task"
50
        "github.com/uber/cadence/service/history/workflowcache"
51
        "github.com/uber/cadence/service/worker/archiver"
52
)
53

54
var (
55
        errUnexpectedQueueTask = errors.New("unexpected queue task")
56
        errProcessorShutdown   = errors.New("queue processor has been shutdown")
57

58
        maximumTransferTaskKey = newTransferTaskKey(math.MaxInt64)
59
)
60

61
type transferQueueProcessor struct {
62
        shard         shard.Context
63
        historyEngine engine.Engine
64
        taskProcessor task.Processor
65

66
        config             *config.Config
67
        currentClusterName string
68

69
        metricsClient metrics.Client
70
        logger        log.Logger
71

72
        status       int32
73
        shutdownChan chan struct{}
74
        shutdownWG   sync.WaitGroup
75

76
        ackLevel               int64
77
        taskAllocator          TaskAllocator
78
        activeTaskExecutor     task.Executor
79
        activeQueueProcessor   *transferQueueProcessorBase
80
        standbyQueueProcessors map[string]*transferQueueProcessorBase
81
}
82

83
// NewTransferQueueProcessor creates a new transfer QueueProcessor
84
func NewTransferQueueProcessor(
85
        shard shard.Context,
86
        historyEngine engine.Engine,
87
        taskProcessor task.Processor,
88
        executionCache *execution.Cache,
89
        workflowResetter reset.WorkflowResetter,
90
        archivalClient archiver.Client,
91
        executionCheck invariant.Invariant,
92
        wfIDCache workflowcache.WFCache,
93
        ratelimitInternalPerWorkflowID dynamicconfig.BoolPropertyFnWithDomainFilter,
94
) Processor {
64✔
95
        logger := shard.GetLogger().WithTags(tag.ComponentTransferQueue)
64✔
96
        currentClusterName := shard.GetClusterMetadata().GetCurrentClusterName()
64✔
97
        config := shard.GetConfig()
64✔
98
        taskAllocator := NewTaskAllocator(shard)
64✔
99

64✔
100
        activeTaskExecutor := task.NewTransferActiveTaskExecutor(
64✔
101
                shard,
64✔
102
                archivalClient,
64✔
103
                executionCache,
64✔
104
                workflowResetter,
64✔
105
                logger,
64✔
106
                config,
64✔
107
                wfIDCache,
64✔
108
                ratelimitInternalPerWorkflowID,
64✔
109
        )
64✔
110

64✔
111
        activeQueueProcessor := newTransferQueueActiveProcessor(
64✔
112
                shard,
64✔
113
                historyEngine,
64✔
114
                taskProcessor,
64✔
115
                taskAllocator,
64✔
116
                activeTaskExecutor,
64✔
117
                logger,
64✔
118
        )
64✔
119

64✔
120
        standbyQueueProcessors := make(map[string]*transferQueueProcessorBase)
64✔
121
        for clusterName := range shard.GetClusterMetadata().GetRemoteClusterInfo() {
128✔
122
                historyResender := ndc.NewHistoryResender(
64✔
123
                        shard.GetDomainCache(),
64✔
124
                        shard.GetService().GetClientBean().GetRemoteAdminClient(clusterName),
64✔
125
                        func(ctx context.Context, request *types.ReplicateEventsV2Request) error {
64✔
126
                                return historyEngine.ReplicateEventsV2(ctx, request)
×
127
                        },
×
128
                        config.StandbyTaskReReplicationContextTimeout,
129
                        executionCheck,
130
                        shard.GetLogger(),
131
                )
132
                standbyTaskExecutor := task.NewTransferStandbyTaskExecutor(
64✔
133
                        shard,
64✔
134
                        archivalClient,
64✔
135
                        executionCache,
64✔
136
                        historyResender,
64✔
137
                        logger,
64✔
138
                        clusterName,
64✔
139
                        config,
64✔
140
                )
64✔
141
                standbyQueueProcessors[clusterName] = newTransferQueueStandbyProcessor(
64✔
142
                        clusterName,
64✔
143
                        shard,
64✔
144
                        historyEngine,
64✔
145
                        taskProcessor,
64✔
146
                        taskAllocator,
64✔
147
                        standbyTaskExecutor,
64✔
148
                        logger,
64✔
149
                )
64✔
150
        }
151

152
        return &transferQueueProcessor{
64✔
153
                shard:                  shard,
64✔
154
                historyEngine:          historyEngine,
64✔
155
                taskProcessor:          taskProcessor,
64✔
156
                config:                 config,
64✔
157
                currentClusterName:     currentClusterName,
64✔
158
                metricsClient:          shard.GetMetricsClient(),
64✔
159
                logger:                 logger,
64✔
160
                status:                 common.DaemonStatusInitialized,
64✔
161
                shutdownChan:           make(chan struct{}),
64✔
162
                ackLevel:               shard.GetTransferAckLevel(),
64✔
163
                taskAllocator:          taskAllocator,
64✔
164
                activeTaskExecutor:     activeTaskExecutor,
64✔
165
                activeQueueProcessor:   activeQueueProcessor,
64✔
166
                standbyQueueProcessors: standbyQueueProcessors,
64✔
167
        }
64✔
168
}
169

170
func (t *transferQueueProcessor) Start() {
64✔
171
        if !atomic.CompareAndSwapInt32(&t.status, common.DaemonStatusInitialized, common.DaemonStatusStarted) {
64✔
172
                return
×
173
        }
×
174

175
        t.activeQueueProcessor.Start()
64✔
176
        for _, standbyQueueProcessor := range t.standbyQueueProcessors {
128✔
177
                standbyQueueProcessor.Start()
64✔
178
        }
64✔
179

180
        t.shutdownWG.Add(1)
64✔
181
        go t.completeTransferLoop()
64✔
182
}
183

184
func (t *transferQueueProcessor) Stop() {
64✔
185
        if !atomic.CompareAndSwapInt32(&t.status, common.DaemonStatusStarted, common.DaemonStatusStopped) {
64✔
186
                return
×
187
        }
×
188

189
        if !t.shard.GetConfig().QueueProcessorEnableGracefulSyncShutdown() {
127✔
190
                t.activeQueueProcessor.Stop()
63✔
191
                for _, standbyQueueProcessor := range t.standbyQueueProcessors {
126✔
192
                        standbyQueueProcessor.Stop()
63✔
193
                }
63✔
194

195
                close(t.shutdownChan)
63✔
196
                common.AwaitWaitGroup(&t.shutdownWG, time.Minute)
63✔
197
                return
63✔
198
        }
199

200
        // close the shutdown channel so processor pump goroutine drains tasks and then stop the processors
201
        close(t.shutdownChan)
1✔
202
        if !common.AwaitWaitGroup(&t.shutdownWG, gracefulShutdownTimeout) {
1✔
203
                t.logger.Warn("transferQueueProcessor timed out on shut down", tag.LifeCycleStopTimedout)
×
204
        }
×
205
        t.activeQueueProcessor.Stop()
1✔
206
        for _, standbyQueueProcessor := range t.standbyQueueProcessors {
2✔
207
                standbyQueueProcessor.Stop()
1✔
208
        }
1✔
209
}
210

211
func (t *transferQueueProcessor) NotifyNewTask(clusterName string, info *hcommon.NotifyTaskInfo) {
2,249✔
212
        if len(info.Tasks) == 0 {
2,249✔
213
                return
×
214
        }
×
215

216
        if clusterName == t.currentClusterName {
4,498✔
217
                t.activeQueueProcessor.notifyNewTask(info)
2,249✔
218
                return
2,249✔
219
        }
2,249✔
220

221
        standbyQueueProcessor, ok := t.standbyQueueProcessors[clusterName]
3✔
222
        if !ok {
3✔
223
                panic(fmt.Sprintf("Cannot find transfer processor for %s.", clusterName))
×
224
        }
225
        standbyQueueProcessor.notifyNewTask(info)
3✔
226
}
227

228
func (t *transferQueueProcessor) FailoverDomain(domainIDs map[string]struct{}) {
×
229
        // Failover queue is used to scan all inflight tasks, if queue processor is not
×
230
        // started, there's no inflight task and we don't need to create a failover processor.
×
231
        // Also the HandleAction will be blocked if queue processor processing loop is not running.
×
232
        if atomic.LoadInt32(&t.status) != common.DaemonStatusStarted {
×
233
                return
×
234
        }
×
235

236
        minLevel := t.shard.GetTransferClusterAckLevel(t.currentClusterName)
×
237
        standbyClusterName := t.currentClusterName
×
238
        for clusterName := range t.shard.GetClusterMetadata().GetEnabledClusterInfo() {
×
239
                ackLevel := t.shard.GetTransferClusterAckLevel(clusterName)
×
240
                if ackLevel < minLevel {
×
241
                        minLevel = ackLevel
×
242
                        standbyClusterName = clusterName
×
243
                }
×
244
        }
245

246
        maxReadLevel := int64(0)
×
247
        actionResult, err := t.HandleAction(context.Background(), t.currentClusterName, NewGetStateAction())
×
248
        if err != nil {
×
249
                t.logger.Error("Transfer Failover Failed", tag.WorkflowDomainIDs(domainIDs), tag.Error(err))
×
250
                if err == errProcessorShutdown {
×
251
                        // processor/shard already shutdown, we don't need to create failover queue processor
×
252
                        return
×
253
                }
×
254
                // other errors should never be returned for GetStateAction
255
                panic(fmt.Sprintf("unknown error for GetStateAction: %v", err))
×
256
        }
257
        for _, queueState := range actionResult.GetStateActionResult.States {
×
258
                queueReadLevel := queueState.ReadLevel().(transferTaskKey).taskID
×
259
                if maxReadLevel < queueReadLevel {
×
260
                        maxReadLevel = queueReadLevel
×
261
                }
×
262
        }
263
        // maxReadLevel is exclusive, so add 1
264
        maxReadLevel++
×
265

×
266
        t.logger.Info("Transfer Failover Triggered",
×
267
                tag.WorkflowDomainIDs(domainIDs),
×
268
                tag.MinLevel(minLevel),
×
269
                tag.MaxLevel(maxReadLevel))
×
270

×
271
        updateShardAckLevel, failoverQueueProcessor := newTransferQueueFailoverProcessor(
×
272
                t.shard,
×
273
                t.historyEngine,
×
274
                t.taskProcessor,
×
275
                t.taskAllocator,
×
276
                t.activeTaskExecutor,
×
277
                t.logger,
×
278
                minLevel,
×
279
                maxReadLevel,
×
280
                domainIDs,
×
281
                standbyClusterName,
×
282
        )
×
283

×
284
        // NOTE: READ REF BEFORE MODIFICATION
×
285
        // ref: historyEngine.go registerDomainFailoverCallback function
×
286
        err = updateShardAckLevel(newTransferTaskKey(minLevel))
×
287
        if err != nil {
×
288
                t.logger.Error("Error update shard ack level", tag.Error(err))
×
289
        }
×
290
        failoverQueueProcessor.Start()
×
291
}
292

293
func (t *transferQueueProcessor) HandleAction(
294
        ctx context.Context,
295
        clusterName string,
296
        action *Action,
297
) (*ActionResult, error) {
240✔
298
        var resultNotificationCh chan actionResultNotification
240✔
299
        var added bool
240✔
300
        if clusterName == t.currentClusterName {
392✔
301
                resultNotificationCh, added = t.activeQueueProcessor.addAction(ctx, action)
152✔
302
        } else {
242✔
303
                found := false
90✔
304
                for standbyClusterName, standbyProcessor := range t.standbyQueueProcessors {
180✔
305
                        if clusterName == standbyClusterName {
180✔
306
                                resultNotificationCh, added = standbyProcessor.addAction(ctx, action)
90✔
307
                                found = true
90✔
308
                                break
90✔
309
                        }
310
                }
311

312
                if !found {
90✔
313
                        return nil, fmt.Errorf("unknown cluster name: %v", clusterName)
×
314
                }
×
315
        }
316

317
        if !added {
303✔
318
                if ctxErr := ctx.Err(); ctxErr != nil {
63✔
319
                        return nil, ctxErr
×
320
                }
×
321
                return nil, errProcessorShutdown
63✔
322
        }
323

324
        select {
179✔
325
        case resultNotification := <-resultNotificationCh:
178✔
326
                return resultNotification.result, resultNotification.err
178✔
327
        case <-t.shutdownChan:
1✔
328
                return nil, errProcessorShutdown
1✔
329
        case <-ctx.Done():
×
330
                return nil, ctx.Err()
×
331
        }
332
}
333

334
func (t *transferQueueProcessor) LockTaskProcessing() {
716✔
335
        t.taskAllocator.Lock()
716✔
336
}
716✔
337

338
func (t *transferQueueProcessor) UnlockTaskProcessing() {
716✔
339
        t.taskAllocator.Unlock()
716✔
340
}
716✔
341

342
func (t *transferQueueProcessor) drain() {
64✔
343
        // before shutdown, make sure the ack level is up to date
64✔
344
        if err := t.completeTransfer(); err != nil {
128✔
345
                t.logger.Error("Failed to complete transfer task during shutdown", tag.Error(err))
64✔
346
        }
64✔
347
}
348

349
func (t *transferQueueProcessor) completeTransferLoop() {
64✔
350
        defer t.shutdownWG.Done()
64✔
351

64✔
352
        completeTimer := time.NewTimer(t.config.TransferProcessorCompleteTransferInterval())
64✔
353
        defer completeTimer.Stop()
64✔
354

64✔
355
        for {
216✔
356
                select {
152✔
357
                case <-t.shutdownChan:
64✔
358
                        t.drain()
64✔
359
                        return
64✔
360
                case <-completeTimer.C:
90✔
361
                        for attempt := 0; attempt < t.config.TransferProcessorCompleteTransferFailureRetryCount(); attempt++ {
180✔
362
                                err := t.completeTransfer()
90✔
363
                                if err == nil {
180✔
364
                                        break
90✔
365
                                }
366

367
                                t.logger.Error("Failed to complete transfer task", tag.Error(err))
×
368
                                if err == shard.ErrShardClosed {
×
369
                                        // shard closed, trigger shutdown and bail out
×
370
                                        if !t.shard.GetConfig().QueueProcessorEnableGracefulSyncShutdown() {
×
371
                                                go t.Stop()
×
372
                                                return
×
373
                                        }
×
374

375
                                        t.Stop()
×
376
                                        return
×
377
                                }
378

379
                                select {
×
380
                                case <-t.shutdownChan:
×
381
                                        t.drain()
×
382
                                        return
×
383
                                case <-time.After(time.Duration(attempt*100) * time.Millisecond):
×
384
                                        // do nothing. retry loop will continue
385
                                }
386
                        }
387

388
                        completeTimer.Reset(t.config.TransferProcessorCompleteTransferInterval())
90✔
389
                }
390
        }
391
}
392

393
func (t *transferQueueProcessor) completeTransfer() error {
152✔
394
        newAckLevel := maximumTransferTaskKey
152✔
395
        actionResult, err := t.HandleAction(context.Background(), t.currentClusterName, NewGetStateAction())
152✔
396
        if err != nil {
216✔
397
                return err
64✔
398
        }
64✔
399
        for _, queueState := range actionResult.GetStateActionResult.States {
180✔
400
                newAckLevel = minTaskKey(newAckLevel, queueState.AckLevel())
90✔
401
        }
90✔
402

403
        for standbyClusterName := range t.standbyQueueProcessors {
180✔
404
                actionResult, err := t.HandleAction(context.Background(), standbyClusterName, NewGetStateAction())
90✔
405
                if err != nil {
90✔
UNCOV
406
                        return err
×
UNCOV
407
                }
×
408
                for _, queueState := range actionResult.GetStateActionResult.States {
180✔
409
                        newAckLevel = minTaskKey(newAckLevel, queueState.AckLevel())
90✔
410
                }
90✔
411
        }
412

413
        for _, failoverInfo := range t.shard.GetAllTransferFailoverLevels() {
90✔
414
                failoverLevel := newTransferTaskKey(failoverInfo.MinLevel)
×
415
                if newAckLevel == nil {
×
416
                        newAckLevel = failoverLevel
×
417
                } else {
×
418
                        newAckLevel = minTaskKey(newAckLevel, failoverLevel)
×
419
                }
×
420
        }
421

422
        if newAckLevel == nil {
90✔
423
                panic("Unable to get transfer queue processor ack level")
×
424
        }
425

426
        newAckLevelTaskID := newAckLevel.(transferTaskKey).taskID
90✔
427
        t.logger.Debug(fmt.Sprintf("Start completing transfer task from: %v, to %v.", t.ackLevel, newAckLevelTaskID))
90✔
428
        if t.ackLevel >= newAckLevelTaskID {
102✔
429
                return nil
12✔
430
        }
12✔
431

432
        t.metricsClient.Scope(metrics.TransferQueueProcessorScope).
79✔
433
                Tagged(metrics.ShardIDTag(t.shard.GetShardID())).
79✔
434
                IncCounter(metrics.TaskBatchCompleteCounter)
79✔
435

79✔
436
        for {
158✔
437
                pageSize := t.config.TransferTaskDeleteBatchSize()
79✔
438
                resp, err := t.shard.GetExecutionManager().RangeCompleteTransferTask(context.Background(), &persistence.RangeCompleteTransferTaskRequest{
79✔
439
                        ExclusiveBeginTaskID: t.ackLevel,
79✔
440
                        InclusiveEndTaskID:   newAckLevelTaskID,
79✔
441
                        PageSize:             pageSize, // pageSize may or may not be honored
79✔
442
                })
79✔
443
                if err != nil {
79✔
444
                        return err
×
445
                }
×
446
                if !persistence.HasMoreRowsToDelete(resp.TasksCompleted, pageSize) {
158✔
447
                        break
79✔
448
                }
449
        }
450

451
        t.ackLevel = newAckLevelTaskID
79✔
452

79✔
453
        return t.shard.UpdateTransferAckLevel(newAckLevelTaskID)
79✔
454
}
455

456
func newTransferQueueActiveProcessor(
457
        shard shard.Context,
458
        historyEngine engine.Engine,
459
        taskProcessor task.Processor,
460
        taskAllocator TaskAllocator,
461
        taskExecutor task.Executor,
462
        logger log.Logger,
463
) *transferQueueProcessorBase {
64✔
464
        config := shard.GetConfig()
64✔
465
        options := newTransferQueueProcessorOptions(config, true, false)
64✔
466

64✔
467
        currentClusterName := shard.GetClusterMetadata().GetCurrentClusterName()
64✔
468
        logger = logger.WithTags(tag.ClusterName(currentClusterName))
64✔
469

64✔
470
        taskFilter := func(taskInfo task.Info) (bool, error) {
3,051✔
471
                task, ok := taskInfo.(*persistence.TransferTaskInfo)
2,987✔
472
                if !ok {
2,987✔
473
                        return false, errUnexpectedQueueTask
×
474
                }
×
475
                if notRegistered, err := isDomainNotRegistered(shard, task.DomainID); notRegistered && err == nil {
2,987✔
476
                        logger.Info("Domain is not in registered status, skip task in active transfer queue.", tag.WorkflowDomainID(task.DomainID), tag.Value(taskInfo))
×
477
                        return false, nil
×
478
                }
×
479
                return taskAllocator.VerifyActiveTask(task.DomainID, task)
2,987✔
480
        }
481

482
        updateMaxReadLevel := func() task.Key {
2,541✔
483
                return newTransferTaskKey(shard.GetTransferMaxReadLevel())
2,477✔
484
        }
2,477✔
485

486
        updateClusterAckLevel := func(ackLevel task.Key) error {
64✔
487
                taskID := ackLevel.(transferTaskKey).taskID
×
488
                return shard.UpdateTransferClusterAckLevel(currentClusterName, taskID)
×
489
        }
×
490

491
        updateProcessingQueueStates := func(states []ProcessingQueueState) error {
272✔
492
                pStates := convertToPersistenceTransferProcessingQueueStates(states)
208✔
493
                return shard.UpdateTransferProcessingQueueStates(currentClusterName, pStates)
208✔
494
        }
208✔
495

496
        queueShutdown := func() error {
64✔
497
                return nil
×
498
        }
×
499

500
        return newTransferQueueProcessorBase(
64✔
501
                shard,
64✔
502
                loadTransferProcessingQueueStates(currentClusterName, shard, options, logger),
64✔
503
                taskProcessor,
64✔
504
                options,
64✔
505
                updateMaxReadLevel,
64✔
506
                updateClusterAckLevel,
64✔
507
                updateProcessingQueueStates,
64✔
508
                queueShutdown,
64✔
509
                taskFilter,
64✔
510
                taskExecutor,
64✔
511
                logger,
64✔
512
                shard.GetMetricsClient(),
64✔
513
        )
64✔
514
}
515

516
func newTransferQueueStandbyProcessor(
517
        clusterName string,
518
        shard shard.Context,
519
        historyEngine engine.Engine,
520
        taskProcessor task.Processor,
521
        taskAllocator TaskAllocator,
522
        taskExecutor task.Executor,
523
        logger log.Logger,
524
) *transferQueueProcessorBase {
64✔
525
        config := shard.GetConfig()
64✔
526
        options := newTransferQueueProcessorOptions(config, false, false)
64✔
527

64✔
528
        logger = logger.WithTags(tag.ClusterName(clusterName))
64✔
529

64✔
530
        taskFilter := func(taskInfo task.Info) (bool, error) {
2,507✔
531
                task, ok := taskInfo.(*persistence.TransferTaskInfo)
2,443✔
532
                if !ok {
2,443✔
533
                        return false, errUnexpectedQueueTask
×
534
                }
×
535
                if notRegistered, err := isDomainNotRegistered(shard, task.DomainID); notRegistered && err == nil {
2,443✔
536
                        logger.Info("Domain is not in registered status, skip task in standby transfer queue.", tag.WorkflowDomainID(task.DomainID), tag.Value(taskInfo))
×
537
                        return false, nil
×
538
                }
×
539
                if task.TaskType == persistence.TransferTaskTypeCloseExecution ||
2,443✔
540
                        task.TaskType == persistence.TransferTaskTypeRecordWorkflowClosed {
2,862✔
541
                        domainEntry, err := shard.GetDomainCache().GetDomainByID(task.DomainID)
419✔
542
                        if err == nil {
838✔
543
                                if domainEntry.HasReplicationCluster(clusterName) {
422✔
544
                                        // guarantee the processing of workflow execution close
3✔
545
                                        return true, nil
3✔
546
                                }
3✔
547
                        } else {
×
548
                                if _, ok := err.(*types.EntityNotExistsError); !ok {
×
549
                                        // retry the task if failed to find the domain
×
550
                                        logger.Warn("Cannot find domain", tag.WorkflowDomainID(task.DomainID))
×
551
                                        return false, err
×
552
                                }
×
553
                                logger.Warn("Cannot find domain, default to not process task.", tag.WorkflowDomainID(task.DomainID), tag.Value(task))
×
554
                                return false, nil
×
555
                        }
556
                }
557
                return taskAllocator.VerifyStandbyTask(clusterName, task.DomainID, task)
2,443✔
558
        }
559

560
        updateMaxReadLevel := func() task.Key {
326✔
561
                return newTransferTaskKey(shard.GetTransferMaxReadLevel())
262✔
562
        }
262✔
563

564
        updateClusterAckLevel := func(ackLevel task.Key) error {
64✔
565
                taskID := ackLevel.(transferTaskKey).taskID
×
566
                return shard.UpdateTransferClusterAckLevel(clusterName, taskID)
×
567
        }
×
568

569
        updateProcessingQueueStates := func(states []ProcessingQueueState) error {
273✔
570
                pStates := convertToPersistenceTransferProcessingQueueStates(states)
209✔
571
                return shard.UpdateTransferProcessingQueueStates(clusterName, pStates)
209✔
572
        }
209✔
573

574
        queueShutdown := func() error {
64✔
575
                return nil
×
576
        }
×
577

578
        return newTransferQueueProcessorBase(
64✔
579
                shard,
64✔
580
                loadTransferProcessingQueueStates(clusterName, shard, options, logger),
64✔
581
                taskProcessor,
64✔
582
                options,
64✔
583
                updateMaxReadLevel,
64✔
584
                updateClusterAckLevel,
64✔
585
                updateProcessingQueueStates,
64✔
586
                queueShutdown,
64✔
587
                taskFilter,
64✔
588
                taskExecutor,
64✔
589
                logger,
64✔
590
                shard.GetMetricsClient(),
64✔
591
        )
64✔
592
}
593

594
func newTransferQueueFailoverProcessor(
595
        shardContext shard.Context,
596
        historyEngine engine.Engine,
597
        taskProcessor task.Processor,
598
        taskAllocator TaskAllocator,
599
        taskExecutor task.Executor,
600
        logger log.Logger,
601
        minLevel, maxLevel int64,
602
        domainIDs map[string]struct{},
603
        standbyClusterName string,
604
) (updateClusterAckLevelFn, *transferQueueProcessorBase) {
×
605
        config := shardContext.GetConfig()
×
606
        options := newTransferQueueProcessorOptions(config, true, true)
×
607

×
608
        currentClusterName := shardContext.GetService().GetClusterMetadata().GetCurrentClusterName()
×
609
        failoverUUID := uuid.New()
×
610
        logger = logger.WithTags(
×
611
                tag.ClusterName(currentClusterName),
×
612
                tag.WorkflowDomainIDs(domainIDs),
×
613
                tag.FailoverMsg("from: "+standbyClusterName),
×
614
        )
×
615

×
616
        taskFilter := func(taskInfo task.Info) (bool, error) {
×
617
                task, ok := taskInfo.(*persistence.TransferTaskInfo)
×
618
                if !ok {
×
619
                        return false, errUnexpectedQueueTask
×
620
                }
×
621
                if notRegistered, err := isDomainNotRegistered(shardContext, task.DomainID); notRegistered && err == nil {
×
622
                        logger.Info("Domain is not in registered status, skip task in failover transfer queue.", tag.WorkflowDomainID(task.DomainID), tag.Value(taskInfo))
×
623
                        return false, nil
×
624
                }
×
625
                return taskAllocator.VerifyFailoverActiveTask(domainIDs, task.DomainID, task)
×
626
        }
627

628
        maxReadLevelTaskKey := newTransferTaskKey(maxLevel)
×
629
        updateMaxReadLevel := func() task.Key {
×
630
                return maxReadLevelTaskKey // this is a const
×
631
        }
×
632

633
        updateClusterAckLevel := func(ackLevel task.Key) error {
×
634
                taskID := ackLevel.(transferTaskKey).taskID
×
635
                return shardContext.UpdateTransferFailoverLevel(
×
636
                        failoverUUID,
×
637
                        shard.TransferFailoverLevel{
×
638
                                StartTime:    shardContext.GetTimeSource().Now(),
×
639
                                MinLevel:     minLevel,
×
640
                                CurrentLevel: taskID,
×
641
                                MaxLevel:     maxLevel,
×
642
                                DomainIDs:    domainIDs,
×
643
                        },
×
644
                )
×
645
        }
×
646

647
        queueShutdown := func() error {
×
648
                return shardContext.DeleteTransferFailoverLevel(failoverUUID)
×
649
        }
×
650

651
        processingQueueStates := []ProcessingQueueState{
×
652
                NewProcessingQueueState(
×
653
                        defaultProcessingQueueLevel,
×
654
                        newTransferTaskKey(minLevel),
×
655
                        maxReadLevelTaskKey,
×
656
                        NewDomainFilter(domainIDs, false),
×
657
                ),
×
658
        }
×
659

×
660
        return updateClusterAckLevel, newTransferQueueProcessorBase(
×
661
                shardContext,
×
662
                processingQueueStates,
×
663
                taskProcessor,
×
664
                options,
×
665
                updateMaxReadLevel,
×
666
                updateClusterAckLevel,
×
667
                nil,
×
668
                queueShutdown,
×
669
                taskFilter,
×
670
                taskExecutor,
×
671
                logger,
×
672
                shardContext.GetMetricsClient(),
×
673
        )
×
674
}
675

676
func loadTransferProcessingQueueStates(
677
        clusterName string,
678
        shard shard.Context,
679
        options *queueProcessorOptions,
680
        logger log.Logger,
681
) []ProcessingQueueState {
125✔
682
        ackLevel := shard.GetTransferClusterAckLevel(clusterName)
125✔
683
        if options.EnableLoadQueueStates() {
250✔
684
                pStates := shard.GetTransferProcessingQueueStates(clusterName)
125✔
685
                if validateProcessingQueueStates(pStates, ackLevel) {
250✔
686
                        return convertFromPersistenceTransferProcessingQueueStates(pStates)
125✔
687
                }
125✔
688

689
                logger.Error("Incompatible processing queue states and ackLevel",
×
690
                        tag.Value(pStates),
×
691
                        tag.ShardTransferAcks(ackLevel),
×
692
                )
×
693
        }
694

695
        // LoadQueueStates is disabled or sanity check failed
696
        // fallback to use ackLevel
697
        return []ProcessingQueueState{
×
698
                NewProcessingQueueState(
×
699
                        defaultProcessingQueueLevel,
×
700
                        newTransferTaskKey(ackLevel),
×
701
                        maximumTransferTaskKey,
×
702
                        NewDomainFilter(nil, true),
×
703
                ),
×
704
        }
×
705
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc