• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

uber / cadence / 018f64f4-c8b4-4dac-8ed7-996896dd65a4

11 May 2024 12:01AM UTC coverage: 69.148% (-0.01%) from 69.162%
018f64f4-c8b4-4dac-8ed7-996896dd65a4

push

buildkite

web-flow
Write tests for replication task processor main loop (#6010)

1 of 1 new or added line in 1 file covered. (100.0%)

229 existing lines in 18 files now uncovered.

101597 of 146926 relevant lines covered (69.15%)

2644.77 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/service/history/queue/timer_queue_failover_processor.go
1
// Copyright (c) 2017-2020 Uber Technologies Inc.
2

3
// Permission is hereby granted, free of charge, to any person obtaining a copy
4
// of this software and associated documentation files (the "Software"), to deal
5
// in the Software without restriction, including without limitation the rights
6
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
7
// copies of the Software, and to permit persons to whom the Software is
8
// furnished to do so, subject to the following conditions:
9

10
// The above copyright notice and this permission notice shall be included in all
11
// copies or substantial portions of the Software.
12

13
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
14
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
18
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
19
// SOFTWARE.
20

21
package queue
22

23
import (
24
        "time"
25

26
        "github.com/pborman/uuid"
27

28
        "github.com/uber/cadence/common/log"
29
        "github.com/uber/cadence/common/log/tag"
30
        "github.com/uber/cadence/common/persistence"
31
        "github.com/uber/cadence/service/history/shard"
32
        "github.com/uber/cadence/service/history/task"
33
)
34

35
func newTimerQueueFailoverProcessor(
36
        standbyClusterName string,
37
        shardContext shard.Context,
38
        taskProcessor task.Processor,
39
        taskAllocator TaskAllocator,
40
        taskExecutor task.Executor,
41
        logger log.Logger,
42
        minLevel, maxLevel time.Time,
43
        domainIDs map[string]struct{},
UNCOV
44
) (updateClusterAckLevelFn, *timerQueueProcessorBase) {
×
UNCOV
45
        config := shardContext.GetConfig()
×
46
        options := newTimerQueueProcessorOptions(config, true, true)
×
47

×
48
        currentClusterName := shardContext.GetService().GetClusterMetadata().GetCurrentClusterName()
×
49
        failoverStartTime := shardContext.GetTimeSource().Now()
×
50
        failoverUUID := uuid.New()
×
51
        logger = logger.WithTags(
×
52
                tag.ClusterName(currentClusterName),
×
53
                tag.WorkflowDomainIDs(domainIDs),
×
54
                tag.FailoverMsg("from: "+standbyClusterName),
×
55
        )
×
56

×
57
        taskFilter := func(taskInfo task.Info) (bool, error) {
×
58
                timer, ok := taskInfo.(*persistence.TimerTaskInfo)
×
59
                if !ok {
×
60
                        return false, errUnexpectedQueueTask
×
61
                }
×
62
                if notRegistered, err := isDomainNotRegistered(shardContext, timer.DomainID); notRegistered && err == nil {
×
63
                        logger.Info("Domain is not in registered status, skip task in failover timer queue.", tag.WorkflowDomainID(timer.DomainID), tag.Value(taskInfo))
×
64
                        return false, nil
×
65
                }
×
66
                return taskAllocator.VerifyFailoverActiveTask(domainIDs, timer.DomainID, timer)
×
67
        }
68

UNCOV
69
        maxReadLevelTaskKey := newTimerTaskKey(maxLevel, 0)
×
UNCOV
70
        updateMaxReadLevel := func() task.Key {
×
71
                return maxReadLevelTaskKey // this is a const
×
72
        }
×
73

74
        updateClusterAckLevel := func(ackLevel task.Key) error {
×
UNCOV
75
                return shardContext.UpdateTimerFailoverLevel(
×
76
                        failoverUUID,
×
77
                        shard.TimerFailoverLevel{
×
78
                                StartTime:    failoverStartTime,
×
79
                                MinLevel:     minLevel,
×
80
                                CurrentLevel: ackLevel.(timerTaskKey).visibilityTimestamp,
×
81
                                MaxLevel:     maxLevel,
×
82
                                DomainIDs:    domainIDs,
×
83
                        },
×
84
                )
×
85
        }
×
86

87
        queueShutdown := func() error {
×
UNCOV
88
                return shardContext.DeleteTimerFailoverLevel(failoverUUID)
×
89
        }
×
90

91
        processingQueueStates := []ProcessingQueueState{
×
UNCOV
92
                NewProcessingQueueState(
×
93
                        defaultProcessingQueueLevel,
×
94
                        newTimerTaskKey(minLevel, 0),
×
95
                        maxReadLevelTaskKey,
×
96
                        NewDomainFilter(domainIDs, false),
×
97
                ),
×
98
        }
×
99

×
100
        return updateClusterAckLevel, newTimerQueueProcessorBase(
×
101
                currentClusterName, // should use current cluster's time when doing domain failover
×
102
                shardContext,
×
103
                processingQueueStates,
×
104
                taskProcessor,
×
105
                NewLocalTimerGate(shardContext.GetTimeSource()),
×
106
                options,
×
107
                updateMaxReadLevel,
×
108
                updateClusterAckLevel,
×
109
                nil,
×
110
                queueShutdown,
×
111
                taskFilter,
×
112
                taskExecutor,
×
113
                logger,
×
114
                shardContext.GetMetricsClient(),
×
115
        )
×
116
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc