• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

uber / cadence / 0190508e-3f7e-4c0d-b546-4eaa2a2045f1

25 Jun 2024 06:00PM UTC coverage: 71.543% (-0.006%) from 71.549%
0190508e-3f7e-4c0d-b546-4eaa2a2045f1

push

buildkite

web-flow
[CLI] add new valid search attribute key cases for context header indexing in visibility (#6144)

What changed?

Allow "." and "-" in search attribute keys.

Why?

This is needed for indexing the context header in search attributes.

107086 of 149680 relevant lines covered (71.54%)

2615.96 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

56.49
/service/history/queue/transfer_queue_processor.go
1
// Copyright (c) 2017-2020 Uber Technologies Inc.
2

3
// Permission is hereby granted, free of charge, to any person obtaining a copy
4
// of this software and associated documentation files (the "Software"), to deal
5
// in the Software without restriction, including without limitation the rights
6
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
7
// copies of the Software, and to permit persons to whom the Software is
8
// furnished to do so, subject to the following conditions:
9

10
// The above copyright notice and this permission notice shall be included in all
11
// copies or substantial portions of the Software.
12

13
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
14
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
18
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
19
// SOFTWARE.
20

21
package queue
22

23
import (
24
        "context"
25
        "errors"
26
        "fmt"
27
        "math"
28
        "sync"
29
        "sync/atomic"
30
        "time"
31

32
        "github.com/pborman/uuid"
33

34
        "github.com/uber/cadence/common"
35
        "github.com/uber/cadence/common/dynamicconfig"
36
        "github.com/uber/cadence/common/log"
37
        "github.com/uber/cadence/common/log/tag"
38
        "github.com/uber/cadence/common/metrics"
39
        "github.com/uber/cadence/common/ndc"
40
        "github.com/uber/cadence/common/persistence"
41
        "github.com/uber/cadence/common/reconciliation/invariant"
42
        "github.com/uber/cadence/common/types"
43
        hcommon "github.com/uber/cadence/service/history/common"
44
        "github.com/uber/cadence/service/history/config"
45
        "github.com/uber/cadence/service/history/engine"
46
        "github.com/uber/cadence/service/history/execution"
47
        "github.com/uber/cadence/service/history/reset"
48
        "github.com/uber/cadence/service/history/shard"
49
        "github.com/uber/cadence/service/history/task"
50
        "github.com/uber/cadence/service/history/workflowcache"
51
        "github.com/uber/cadence/service/worker/archiver"
52
)
53

54
var (
55
        errUnexpectedQueueTask = errors.New("unexpected queue task")
56
        errProcessorShutdown   = errors.New("queue processor has been shutdown")
57

58
        maximumTransferTaskKey = newTransferTaskKey(math.MaxInt64)
59
)
60

61
type transferQueueProcessor struct {
62
        shard         shard.Context
63
        historyEngine engine.Engine
64
        taskProcessor task.Processor
65

66
        config             *config.Config
67
        currentClusterName string
68

69
        metricsClient metrics.Client
70
        logger        log.Logger
71

72
        status       int32
73
        shutdownChan chan struct{}
74
        shutdownWG   sync.WaitGroup
75

76
        ackLevel                int64
77
        taskAllocator           TaskAllocator
78
        activeTaskExecutor      task.Executor
79
        activeQueueProcessor    *transferQueueProcessorBase
80
        standbyQueueProcessors  map[string]*transferQueueProcessorBase
81
        failoverQueueProcessors []*transferQueueProcessorBase
82
}
83

84
// NewTransferQueueProcessor creates a new transfer QueueProcessor
85
func NewTransferQueueProcessor(
86
        shard shard.Context,
87
        historyEngine engine.Engine,
88
        taskProcessor task.Processor,
89
        executionCache execution.Cache,
90
        workflowResetter reset.WorkflowResetter,
91
        archivalClient archiver.Client,
92
        executionCheck invariant.Invariant,
93
        wfIDCache workflowcache.WFCache,
94
        ratelimitInternalPerWorkflowID dynamicconfig.BoolPropertyFnWithDomainFilter,
95
) Processor {
77✔
96
        logger := shard.GetLogger().WithTags(tag.ComponentTransferQueue)
77✔
97
        currentClusterName := shard.GetClusterMetadata().GetCurrentClusterName()
77✔
98
        config := shard.GetConfig()
77✔
99
        taskAllocator := NewTaskAllocator(shard)
77✔
100

77✔
101
        activeTaskExecutor := task.NewTransferActiveTaskExecutor(
77✔
102
                shard,
77✔
103
                archivalClient,
77✔
104
                executionCache,
77✔
105
                workflowResetter,
77✔
106
                logger,
77✔
107
                config,
77✔
108
                wfIDCache,
77✔
109
                ratelimitInternalPerWorkflowID,
77✔
110
        )
77✔
111

77✔
112
        activeQueueProcessor := newTransferQueueActiveProcessor(
77✔
113
                shard,
77✔
114
                taskProcessor,
77✔
115
                taskAllocator,
77✔
116
                activeTaskExecutor,
77✔
117
                logger,
77✔
118
        )
77✔
119

77✔
120
        standbyQueueProcessors := make(map[string]*transferQueueProcessorBase)
77✔
121
        for clusterName := range shard.GetClusterMetadata().GetRemoteClusterInfo() {
154✔
122
                historyResender := ndc.NewHistoryResender(
77✔
123
                        shard.GetDomainCache(),
77✔
124
                        shard.GetService().GetClientBean().GetRemoteAdminClient(clusterName),
77✔
125
                        func(ctx context.Context, request *types.ReplicateEventsV2Request) error {
77✔
126
                                return historyEngine.ReplicateEventsV2(ctx, request)
×
127
                        },
×
128
                        config.StandbyTaskReReplicationContextTimeout,
129
                        executionCheck,
130
                        shard.GetLogger(),
131
                )
132
                standbyTaskExecutor := task.NewTransferStandbyTaskExecutor(
77✔
133
                        shard,
77✔
134
                        archivalClient,
77✔
135
                        executionCache,
77✔
136
                        historyResender,
77✔
137
                        logger,
77✔
138
                        clusterName,
77✔
139
                        config,
77✔
140
                )
77✔
141
                standbyQueueProcessors[clusterName] = newTransferQueueStandbyProcessor(
77✔
142
                        clusterName,
77✔
143
                        shard,
77✔
144
                        taskProcessor,
77✔
145
                        taskAllocator,
77✔
146
                        standbyTaskExecutor,
77✔
147
                        logger,
77✔
148
                )
77✔
149
        }
150

151
        return &transferQueueProcessor{
77✔
152
                shard:                  shard,
77✔
153
                historyEngine:          historyEngine,
77✔
154
                taskProcessor:          taskProcessor,
77✔
155
                config:                 config,
77✔
156
                currentClusterName:     currentClusterName,
77✔
157
                metricsClient:          shard.GetMetricsClient(),
77✔
158
                logger:                 logger,
77✔
159
                status:                 common.DaemonStatusInitialized,
77✔
160
                shutdownChan:           make(chan struct{}),
77✔
161
                ackLevel:               shard.GetTransferAckLevel(),
77✔
162
                taskAllocator:          taskAllocator,
77✔
163
                activeTaskExecutor:     activeTaskExecutor,
77✔
164
                activeQueueProcessor:   activeQueueProcessor,
77✔
165
                standbyQueueProcessors: standbyQueueProcessors,
77✔
166
        }
77✔
167
}
168

169
func (t *transferQueueProcessor) Start() {
76✔
170
        if !atomic.CompareAndSwapInt32(&t.status, common.DaemonStatusInitialized, common.DaemonStatusStarted) {
76✔
171
                return
×
172
        }
×
173

174
        t.activeQueueProcessor.Start()
76✔
175
        for _, standbyQueueProcessor := range t.standbyQueueProcessors {
152✔
176
                standbyQueueProcessor.Start()
76✔
177
        }
76✔
178

179
        t.shutdownWG.Add(1)
76✔
180
        go t.completeTransferLoop()
76✔
181
}
182

183
func (t *transferQueueProcessor) Stop() {
76✔
184
        if !atomic.CompareAndSwapInt32(&t.status, common.DaemonStatusStarted, common.DaemonStatusStopped) {
76✔
185
                return
×
186
        }
×
187

188
        if !t.shard.GetConfig().QueueProcessorEnableGracefulSyncShutdown() {
151✔
189
                t.activeQueueProcessor.Stop()
75✔
190
                for _, standbyQueueProcessor := range t.standbyQueueProcessors {
150✔
191
                        standbyQueueProcessor.Stop()
75✔
192
                }
75✔
193

194
                close(t.shutdownChan)
75✔
195
                common.AwaitWaitGroup(&t.shutdownWG, time.Minute)
75✔
196
                return
75✔
197
        }
198

199
        // close the shutdown channel so processor pump goroutine drains tasks and then stop the processors
200
        close(t.shutdownChan)
1✔
201
        if !common.AwaitWaitGroup(&t.shutdownWG, gracefulShutdownTimeout) {
1✔
202
                t.logger.Warn("transferQueueProcessor timed out on shut down", tag.LifeCycleStopTimedout)
×
203
        }
×
204
        t.activeQueueProcessor.Stop()
1✔
205
        for _, standbyQueueProcessor := range t.standbyQueueProcessors {
2✔
206
                standbyQueueProcessor.Stop()
1✔
207
        }
1✔
208

209
        if len(t.failoverQueueProcessors) > 0 {
1✔
210
                t.logger.Info("Shutting down failover transfer queues", tag.Counter(len(t.failoverQueueProcessors)))
×
211
                for _, failoverQueueProcessor := range t.failoverQueueProcessors {
×
212
                        failoverQueueProcessor.Stop()
×
213
                }
×
214
        }
215
}
216

217
func (t *transferQueueProcessor) NotifyNewTask(clusterName string, info *hcommon.NotifyTaskInfo) {
2,296✔
218
        if len(info.Tasks) == 0 {
2,296✔
219
                return
×
220
        }
×
221

222
        if clusterName == t.currentClusterName {
4,592✔
223
                t.activeQueueProcessor.notifyNewTask(info)
2,296✔
224
                return
2,296✔
225
        }
2,296✔
226

227
        standbyQueueProcessor, ok := t.standbyQueueProcessors[clusterName]
3✔
228
        if !ok {
3✔
229
                panic(fmt.Sprintf("Cannot find transfer processor for %s.", clusterName))
×
230
        }
231
        standbyQueueProcessor.notifyNewTask(info)
3✔
232
}
233

234
func (t *transferQueueProcessor) FailoverDomain(domainIDs map[string]struct{}) {
×
235
        // Failover queue is used to scan all inflight tasks, if queue processor is not
×
236
        // started, there's no inflight task and we don't need to create a failover processor.
×
237
        // Also the HandleAction will be blocked if queue processor processing loop is not running.
×
238
        if atomic.LoadInt32(&t.status) != common.DaemonStatusStarted {
×
239
                return
×
240
        }
×
241

242
        minLevel := t.shard.GetTransferClusterAckLevel(t.currentClusterName)
×
243
        standbyClusterName := t.currentClusterName
×
244
        for clusterName := range t.shard.GetClusterMetadata().GetEnabledClusterInfo() {
×
245
                ackLevel := t.shard.GetTransferClusterAckLevel(clusterName)
×
246
                if ackLevel < minLevel {
×
247
                        minLevel = ackLevel
×
248
                        standbyClusterName = clusterName
×
249
                }
×
250
        }
251

252
        maxReadLevel := int64(0)
×
253
        actionResult, err := t.HandleAction(context.Background(), t.currentClusterName, NewGetStateAction())
×
254
        if err != nil {
×
255
                t.logger.Error("Transfer Failover Failed", tag.WorkflowDomainIDs(domainIDs), tag.Error(err))
×
256
                if err == errProcessorShutdown {
×
257
                        // processor/shard already shutdown, we don't need to create failover queue processor
×
258
                        return
×
259
                }
×
260
                // other errors should never be returned for GetStateAction
261
                panic(fmt.Sprintf("unknown error for GetStateAction: %v", err))
×
262
        }
263
        for _, queueState := range actionResult.GetStateActionResult.States {
×
264
                queueReadLevel := queueState.ReadLevel().(transferTaskKey).taskID
×
265
                if maxReadLevel < queueReadLevel {
×
266
                        maxReadLevel = queueReadLevel
×
267
                }
×
268
        }
269
        // maxReadLevel is exclusive, so add 1
270
        maxReadLevel++
×
271

×
272
        t.logger.Info("Transfer Failover Triggered",
×
273
                tag.WorkflowDomainIDs(domainIDs),
×
274
                tag.MinLevel(minLevel),
×
275
                tag.MaxLevel(maxReadLevel))
×
276

×
277
        updateShardAckLevel, failoverQueueProcessor := newTransferQueueFailoverProcessor(
×
278
                t.shard,
×
279
                t.taskProcessor,
×
280
                t.taskAllocator,
×
281
                t.activeTaskExecutor,
×
282
                t.logger,
×
283
                minLevel,
×
284
                maxReadLevel,
×
285
                domainIDs,
×
286
                standbyClusterName,
×
287
        )
×
288

×
289
        // NOTE: READ REF BEFORE MODIFICATION
×
290
        // ref: historyEngine.go registerDomainFailoverCallback function
×
291
        err = updateShardAckLevel(newTransferTaskKey(minLevel))
×
292
        if err != nil {
×
293
                t.logger.Error("Error update shard ack level", tag.Error(err))
×
294
        }
×
295

296
        // Failover queue processors are started on the fly when domains are failed over.
297
        // Failover queue processors will be stopped when the transfer queue instance is stopped (due to restart or shard movement).
298
        // This means the failover queue processor might not finish its job.
299
        // There is no mechanism to re-start ongoing failover queue processors in the new shard owner.
300
        t.failoverQueueProcessors = append(t.failoverQueueProcessors, failoverQueueProcessor)
×
301
        failoverQueueProcessor.Start()
×
302
}
303

304
func (t *transferQueueProcessor) HandleAction(
305
        ctx context.Context,
306
        clusterName string,
307
        action *Action,
308
) (*ActionResult, error) {
292✔
309
        var resultNotificationCh chan actionResultNotification
292✔
310
        var added bool
292✔
311
        if clusterName == t.currentClusterName {
476✔
312
                resultNotificationCh, added = t.activeQueueProcessor.addAction(ctx, action)
184✔
313
        } else {
293✔
314
                found := false
109✔
315
                for standbyClusterName, standbyProcessor := range t.standbyQueueProcessors {
218✔
316
                        if clusterName == standbyClusterName {
218✔
317
                                resultNotificationCh, added = standbyProcessor.addAction(ctx, action)
109✔
318
                                found = true
109✔
319
                                break
109✔
320
                        }
321
                }
322

323
                if !found {
109✔
324
                        return nil, fmt.Errorf("unknown cluster name: %v", clusterName)
×
325
                }
×
326
        }
327

328
        if !added {
367✔
329
                if ctxErr := ctx.Err(); ctxErr != nil {
75✔
330
                        return nil, ctxErr
×
331
                }
×
332
                return nil, errProcessorShutdown
75✔
333
        }
334

335
        select {
218✔
336
        case resultNotification := <-resultNotificationCh:
217✔
337
                return resultNotification.result, resultNotification.err
217✔
338
        case <-t.shutdownChan:
1✔
339
                return nil, errProcessorShutdown
1✔
340
        case <-ctx.Done():
×
341
                return nil, ctx.Err()
×
342
        }
343
}
344

345
func (t *transferQueueProcessor) LockTaskProcessing() {
804✔
346
        t.taskAllocator.Lock()
804✔
347
}
804✔
348

349
func (t *transferQueueProcessor) UnlockTaskProcessing() {
804✔
350
        t.taskAllocator.Unlock()
804✔
351
}
804✔
352

353
func (t *transferQueueProcessor) drain() {
76✔
354
        // before shutdown, make sure the ack level is up to date
76✔
355
        if err := t.completeTransfer(); err != nil {
152✔
356
                t.logger.Error("Failed to complete transfer task during shutdown", tag.Error(err))
76✔
357
        }
76✔
358
}
359

360
func (t *transferQueueProcessor) completeTransferLoop() {
76✔
361
        defer t.shutdownWG.Done()
76✔
362

76✔
363
        completeTimer := time.NewTimer(t.config.TransferProcessorCompleteTransferInterval())
76✔
364
        defer completeTimer.Stop()
76✔
365

76✔
366
        for {
260✔
367
                select {
184✔
368
                case <-t.shutdownChan:
76✔
369
                        t.drain()
76✔
370
                        return
76✔
371
                case <-completeTimer.C:
109✔
372
                        for attempt := 0; attempt < t.config.TransferProcessorCompleteTransferFailureRetryCount(); attempt++ {
218✔
373
                                err := t.completeTransfer()
109✔
374
                                if err == nil {
218✔
375
                                        break
109✔
376
                                }
377

378
                                t.logger.Error("Failed to complete transfer task", tag.Error(err))
×
379
                                var errShardClosed *shard.ErrShardClosed
×
380
                                if errors.As(err, &errShardClosed) {
×
381
                                        // shard closed, trigger shutdown and bail out
×
382
                                        if !t.shard.GetConfig().QueueProcessorEnableGracefulSyncShutdown() {
×
383
                                                go t.Stop()
×
384
                                                return
×
385
                                        }
×
386

387
                                        t.Stop()
×
388
                                        return
×
389
                                }
390

391
                                select {
×
392
                                case <-t.shutdownChan:
×
393
                                        t.drain()
×
394
                                        return
×
395
                                case <-time.After(time.Duration(attempt*100) * time.Millisecond):
×
396
                                        // do nothing. retry loop will continue
397
                                }
398
                        }
399

400
                        completeTimer.Reset(t.config.TransferProcessorCompleteTransferInterval())
109✔
401
                }
402
        }
403
}
404

405
func (t *transferQueueProcessor) completeTransfer() error {
184✔
406
        newAckLevel := maximumTransferTaskKey
184✔
407
        actionResult, err := t.HandleAction(context.Background(), t.currentClusterName, NewGetStateAction())
184✔
408
        if err != nil {
260✔
409
                return err
76✔
410
        }
76✔
411
        for _, queueState := range actionResult.GetStateActionResult.States {
218✔
412
                newAckLevel = minTaskKey(newAckLevel, queueState.AckLevel())
109✔
413
        }
109✔
414

415
        for standbyClusterName := range t.standbyQueueProcessors {
218✔
416
                actionResult, err := t.HandleAction(context.Background(), standbyClusterName, NewGetStateAction())
109✔
417
                if err != nil {
109✔
418
                        return err
×
419
                }
×
420
                for _, queueState := range actionResult.GetStateActionResult.States {
218✔
421
                        newAckLevel = minTaskKey(newAckLevel, queueState.AckLevel())
109✔
422
                }
109✔
423
        }
424

425
        for _, failoverInfo := range t.shard.GetAllTransferFailoverLevels() {
109✔
426
                failoverLevel := newTransferTaskKey(failoverInfo.MinLevel)
×
427
                if newAckLevel == nil {
×
428
                        newAckLevel = failoverLevel
×
429
                } else {
×
430
                        newAckLevel = minTaskKey(newAckLevel, failoverLevel)
×
431
                }
×
432
        }
433

434
        if newAckLevel == nil {
109✔
435
                panic("Unable to get transfer queue processor ack level")
×
436
        }
437

438
        newAckLevelTaskID := newAckLevel.(transferTaskKey).taskID
109✔
439
        t.logger.Debug(fmt.Sprintf("Start completing transfer task from: %v, to %v.", t.ackLevel, newAckLevelTaskID))
109✔
440
        if t.ackLevel >= newAckLevelTaskID {
116✔
441
                return nil
7✔
442
        }
7✔
443

444
        t.metricsClient.Scope(metrics.TransferQueueProcessorScope).
102✔
445
                Tagged(metrics.ShardIDTag(t.shard.GetShardID())).
102✔
446
                IncCounter(metrics.TaskBatchCompleteCounter)
102✔
447

102✔
448
        for {
204✔
449
                pageSize := t.config.TransferTaskDeleteBatchSize()
102✔
450
                resp, err := t.shard.GetExecutionManager().RangeCompleteTransferTask(context.Background(), &persistence.RangeCompleteTransferTaskRequest{
102✔
451
                        ExclusiveBeginTaskID: t.ackLevel,
102✔
452
                        InclusiveEndTaskID:   newAckLevelTaskID,
102✔
453
                        PageSize:             pageSize, // pageSize may or may not be honored
102✔
454
                })
102✔
455
                if err != nil {
102✔
456
                        return err
×
457
                }
×
458
                if !persistence.HasMoreRowsToDelete(resp.TasksCompleted, pageSize) {
204✔
459
                        break
102✔
460
                }
461
        }
462

463
        t.ackLevel = newAckLevelTaskID
102✔
464

102✔
465
        return t.shard.UpdateTransferAckLevel(newAckLevelTaskID)
102✔
466
}
467

468
func newTransferQueueActiveProcessor(
469
        shard shard.Context,
470
        taskProcessor task.Processor,
471
        taskAllocator TaskAllocator,
472
        taskExecutor task.Executor,
473
        logger log.Logger,
474
) *transferQueueProcessorBase {
77✔
475
        config := shard.GetConfig()
77✔
476
        options := newTransferQueueProcessorOptions(config, true, false)
77✔
477

77✔
478
        currentClusterName := shard.GetClusterMetadata().GetCurrentClusterName()
77✔
479
        logger = logger.WithTags(tag.ClusterName(currentClusterName))
77✔
480

77✔
481
        taskFilter := func(taskInfo task.Info) (bool, error) {
3,235✔
482
                task, ok := taskInfo.(*persistence.TransferTaskInfo)
3,158✔
483
                if !ok {
3,158✔
484
                        return false, errUnexpectedQueueTask
×
485
                }
×
486
                if notRegistered, err := isDomainNotRegistered(shard, task.DomainID); notRegistered && err == nil {
3,158✔
487
                        logger.Info("Domain is not in registered status, skip task in active transfer queue.", tag.WorkflowDomainID(task.DomainID), tag.Value(taskInfo))
×
488
                        return false, nil
×
489
                }
×
490
                return taskAllocator.VerifyActiveTask(task.DomainID, task)
3,158✔
491
        }
492

493
        updateMaxReadLevel := func() task.Key {
2,655✔
494
                return newTransferTaskKey(shard.GetTransferMaxReadLevel())
2,578✔
495
        }
2,578✔
496

497
        updateClusterAckLevel := func(ackLevel task.Key) error {
77✔
498
                taskID := ackLevel.(transferTaskKey).taskID
×
499
                return shard.UpdateTransferClusterAckLevel(currentClusterName, taskID)
×
500
        }
×
501

502
        updateProcessingQueueStates := func(states []ProcessingQueueState) error {
304✔
503
                pStates := convertToPersistenceTransferProcessingQueueStates(states)
227✔
504
                return shard.UpdateTransferProcessingQueueStates(currentClusterName, pStates)
227✔
505
        }
227✔
506

507
        queueShutdown := func() error {
77✔
508
                return nil
×
509
        }
×
510

511
        return newTransferQueueProcessorBase(
77✔
512
                shard,
77✔
513
                loadTransferProcessingQueueStates(currentClusterName, shard, options, logger),
77✔
514
                taskProcessor,
77✔
515
                options,
77✔
516
                updateMaxReadLevel,
77✔
517
                updateClusterAckLevel,
77✔
518
                updateProcessingQueueStates,
77✔
519
                queueShutdown,
77✔
520
                taskFilter,
77✔
521
                taskExecutor,
77✔
522
                logger,
77✔
523
                shard.GetMetricsClient(),
77✔
524
        )
77✔
525
}
526

527
func newTransferQueueStandbyProcessor(
528
        clusterName string,
529
        shard shard.Context,
530
        taskProcessor task.Processor,
531
        taskAllocator TaskAllocator,
532
        taskExecutor task.Executor,
533
        logger log.Logger,
534
) *transferQueueProcessorBase {
77✔
535
        config := shard.GetConfig()
77✔
536
        options := newTransferQueueProcessorOptions(config, false, false)
77✔
537

77✔
538
        logger = logger.WithTags(tag.ClusterName(clusterName))
77✔
539

77✔
540
        taskFilter := func(taskInfo task.Info) (bool, error) {
2,767✔
541
                task, ok := taskInfo.(*persistence.TransferTaskInfo)
2,690✔
542
                if !ok {
2,690✔
543
                        return false, errUnexpectedQueueTask
×
544
                }
×
545
                if notRegistered, err := isDomainNotRegistered(shard, task.DomainID); notRegistered && err == nil {
2,690✔
546
                        logger.Info("Domain is not in registered status, skip task in standby transfer queue.", tag.WorkflowDomainID(task.DomainID), tag.Value(taskInfo))
×
547
                        return false, nil
×
548
                }
×
549
                if task.TaskType == persistence.TransferTaskTypeCloseExecution ||
2,690✔
550
                        task.TaskType == persistence.TransferTaskTypeRecordWorkflowClosed {
3,134✔
551
                        domainEntry, err := shard.GetDomainCache().GetDomainByID(task.DomainID)
444✔
552
                        if err == nil {
888✔
553
                                if domainEntry.HasReplicationCluster(clusterName) {
447✔
554
                                        // guarantee the processing of workflow execution close
3✔
555
                                        return true, nil
3✔
556
                                }
3✔
557
                        } else {
×
558
                                if _, ok := err.(*types.EntityNotExistsError); !ok {
×
559
                                        // retry the task if failed to find the domain
×
560
                                        logger.Warn("Cannot find domain", tag.WorkflowDomainID(task.DomainID))
×
561
                                        return false, err
×
562
                                }
×
563
                                logger.Warn("Cannot find domain, default to not process task.", tag.WorkflowDomainID(task.DomainID), tag.Value(task))
×
564
                                return false, nil
×
565
                        }
566
                }
567
                return taskAllocator.VerifyStandbyTask(clusterName, task.DomainID, task)
2,690✔
568
        }
569

570
        updateMaxReadLevel := func() task.Key {
372✔
571
                return newTransferTaskKey(shard.GetTransferMaxReadLevel())
295✔
572
        }
295✔
573

574
        updateClusterAckLevel := func(ackLevel task.Key) error {
77✔
575
                taskID := ackLevel.(transferTaskKey).taskID
×
576
                return shard.UpdateTransferClusterAckLevel(clusterName, taskID)
×
577
        }
×
578

579
        updateProcessingQueueStates := func(states []ProcessingQueueState) error {
306✔
580
                pStates := convertToPersistenceTransferProcessingQueueStates(states)
229✔
581
                return shard.UpdateTransferProcessingQueueStates(clusterName, pStates)
229✔
582
        }
229✔
583

584
        queueShutdown := func() error {
77✔
585
                return nil
×
586
        }
×
587

588
        return newTransferQueueProcessorBase(
77✔
589
                shard,
77✔
590
                loadTransferProcessingQueueStates(clusterName, shard, options, logger),
77✔
591
                taskProcessor,
77✔
592
                options,
77✔
593
                updateMaxReadLevel,
77✔
594
                updateClusterAckLevel,
77✔
595
                updateProcessingQueueStates,
77✔
596
                queueShutdown,
77✔
597
                taskFilter,
77✔
598
                taskExecutor,
77✔
599
                logger,
77✔
600
                shard.GetMetricsClient(),
77✔
601
        )
77✔
602
}
603

604
func newTransferQueueFailoverProcessor(
605
        shardContext shard.Context,
606
        taskProcessor task.Processor,
607
        taskAllocator TaskAllocator,
608
        taskExecutor task.Executor,
609
        logger log.Logger,
610
        minLevel, maxLevel int64,
611
        domainIDs map[string]struct{},
612
        standbyClusterName string,
613
) (updateClusterAckLevelFn, *transferQueueProcessorBase) {
×
614
        config := shardContext.GetConfig()
×
615
        options := newTransferQueueProcessorOptions(config, true, true)
×
616

×
617
        currentClusterName := shardContext.GetService().GetClusterMetadata().GetCurrentClusterName()
×
618
        failoverUUID := uuid.New()
×
619
        logger = logger.WithTags(
×
620
                tag.ClusterName(currentClusterName),
×
621
                tag.WorkflowDomainIDs(domainIDs),
×
622
                tag.FailoverMsg("from: "+standbyClusterName),
×
623
        )
×
624

×
625
        taskFilter := func(taskInfo task.Info) (bool, error) {
×
626
                task, ok := taskInfo.(*persistence.TransferTaskInfo)
×
627
                if !ok {
×
628
                        return false, errUnexpectedQueueTask
×
629
                }
×
630
                if notRegistered, err := isDomainNotRegistered(shardContext, task.DomainID); notRegistered && err == nil {
×
631
                        logger.Info("Domain is not in registered status, skip task in failover transfer queue.", tag.WorkflowDomainID(task.DomainID), tag.Value(taskInfo))
×
632
                        return false, nil
×
633
                }
×
634
                return taskAllocator.VerifyFailoverActiveTask(domainIDs, task.DomainID, task)
×
635
        }
636

637
        maxReadLevelTaskKey := newTransferTaskKey(maxLevel)
×
638
        updateMaxReadLevel := func() task.Key {
×
639
                return maxReadLevelTaskKey // this is a const
×
640
        }
×
641

642
        updateClusterAckLevel := func(ackLevel task.Key) error {
×
643
                taskID := ackLevel.(transferTaskKey).taskID
×
644
                return shardContext.UpdateTransferFailoverLevel(
×
645
                        failoverUUID,
×
646
                        shard.TransferFailoverLevel{
×
647
                                StartTime:    shardContext.GetTimeSource().Now(),
×
648
                                MinLevel:     minLevel,
×
649
                                CurrentLevel: taskID,
×
650
                                MaxLevel:     maxLevel,
×
651
                                DomainIDs:    domainIDs,
×
652
                        },
×
653
                )
×
654
        }
×
655

656
        queueShutdown := func() error {
×
657
                return shardContext.DeleteTransferFailoverLevel(failoverUUID)
×
658
        }
×
659

660
        processingQueueStates := []ProcessingQueueState{
×
661
                NewProcessingQueueState(
×
662
                        defaultProcessingQueueLevel,
×
663
                        newTransferTaskKey(minLevel),
×
664
                        maxReadLevelTaskKey,
×
665
                        NewDomainFilter(domainIDs, false),
×
666
                ),
×
667
        }
×
668

×
669
        return updateClusterAckLevel, newTransferQueueProcessorBase(
×
670
                shardContext,
×
671
                processingQueueStates,
×
672
                taskProcessor,
×
673
                options,
×
674
                updateMaxReadLevel,
×
675
                updateClusterAckLevel,
×
676
                nil,
×
677
                queueShutdown,
×
678
                taskFilter,
×
679
                taskExecutor,
×
680
                logger,
×
681
                shardContext.GetMetricsClient(),
×
682
        )
×
683
}
684

685
func loadTransferProcessingQueueStates(
686
        clusterName string,
687
        shard shard.Context,
688
        options *queueProcessorOptions,
689
        logger log.Logger,
690
) []ProcessingQueueState {
151✔
691
        ackLevel := shard.GetTransferClusterAckLevel(clusterName)
151✔
692
        if options.EnableLoadQueueStates() {
302✔
693
                pStates := shard.GetTransferProcessingQueueStates(clusterName)
151✔
694
                if validateProcessingQueueStates(pStates, ackLevel) {
302✔
695
                        return convertFromPersistenceTransferProcessingQueueStates(pStates)
151✔
696
                }
151✔
697

698
                logger.Error("Incompatible processing queue states and ackLevel",
×
699
                        tag.Value(pStates),
×
700
                        tag.ShardTransferAcks(ackLevel),
×
701
                )
×
702
        }
703

704
        // LoadQueueStates is disabled or sanity check failed
705
        // fallback to use ackLevel
706
        return []ProcessingQueueState{
×
707
                NewProcessingQueueState(
×
708
                        defaultProcessingQueueLevel,
×
709
                        newTransferTaskKey(ackLevel),
×
710
                        maximumTransferTaskKey,
×
711
                        NewDomainFilter(nil, true),
×
712
                ),
×
713
        }
×
714
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc