• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

uber / cadence / 0188b6ba-6aaf-4739-8b30-52eaead5800c

13 Jun 2023 09:47PM UTC coverage: 57.204% (-0.01%) from 57.217%
0188b6ba-6aaf-4739-8b30-52eaead5800c

push

buildkite

web-flow
Bugfix/isolation groups domain drains (#5315)

What changed?
Fixes several bugs in the domain isolation-group handling, which was not properly tested under replication. Notably, it:

Fixes upserting configuration from the inactive region, which previously returned an error.
Fixes replication of the configuration, which was not working at all.
Refactors the domain controller by splitting out this functionality into its own, much simpler function rather than continuing to overload the already incomprehensible domain controller.
Why?

How did you test it?

cadence --env docstore-prod11 --proxy_region dca admin isolation-groups get-domain --domain cadence-canary-global
Isolation Groups State
asdf5            Drained
asfd             Drained
-------------------------------------------------------------------------------------------------------------------------------------------------------------
~ » cadence --env an-env --proxy_region dca admin isolation-groups update-domain --domain cadence-canary-global  --remove-all-drains
-------------------------------------------------------------------------------------------------------------------------------------------------------------
~ » cadence --env an-env --proxy_region dca admin isolation-groups get-domain --domain cadence-canary-global
-- No groups found --
-------------------------------------------------------------------------------------------------------------------------------------------------------------
~ » cadence --env an-env --proxy_region phx admin isolation-groups get-domain --domain cadence-canary-global
-- No groups found --
Potential risks

141 of 141 new or added lines in 5 files covered. (100.0%)

86988 of 152065 relevant lines covered (57.2%)

2482.57 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

81.86
/common/domain/replicationTaskExecutor.go
1
// Copyright (c) 2020 Uber Technologies, Inc.
2
//
3
// Permission is hereby granted, free of charge, to any person obtaining a copy
4
// of this software and associated documentation files (the "Software"), to deal
5
// in the Software without restriction, including without limitation the rights
6
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
7
// copies of the Software, and to permit persons to whom the Software is
8
// furnished to do so, subject to the following conditions:
9
//
10
// The above copyright notice and this permission notice shall be included in
11
// all copies or substantial portions of the Software.
12
//
13
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
14
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
18
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
19
// THE SOFTWARE.
20

21
//go:generate mockgen -package $GOPACKAGE -source $GOFILE -destination replicationTaskHandler_mock.go
22

23
package domain
24

25
import (
26
        "context"
27
        "time"
28

29
        "github.com/uber/cadence/common/log/tag"
30

31
        "github.com/uber/cadence/common/clock"
32
        "github.com/uber/cadence/common/log"
33
        "github.com/uber/cadence/common/persistence"
34
        "github.com/uber/cadence/common/types"
35
)
36

37
var (
38
        // ErrEmptyDomainReplicationTask is the error to indicate empty replication task
39
        ErrEmptyDomainReplicationTask = &types.BadRequestError{Message: "empty domain replication task"}
40
        // ErrInvalidDomainOperation is the error to indicate empty domain operation attribute
41
        ErrInvalidDomainOperation = &types.BadRequestError{Message: "invalid domain operation attribute"}
42
        // ErrInvalidDomainID is the error to indicate empty rID attribute
43
        ErrInvalidDomainID = &types.BadRequestError{Message: "invalid domain ID attribute"}
44
        // ErrInvalidDomainInfo is the error to indicate empty info attribute
45
        ErrInvalidDomainInfo = &types.BadRequestError{Message: "invalid domain info attribute"}
46
        // ErrInvalidDomainConfig is the error to indicate empty config attribute
47
        ErrInvalidDomainConfig = &types.BadRequestError{Message: "invalid domain config attribute"}
48
        // ErrInvalidDomainReplicationConfig is the error to indicate empty replication config attribute
49
        ErrInvalidDomainReplicationConfig = &types.BadRequestError{Message: "invalid domain replication config attribute"}
50
        // ErrInvalidDomainStatus is the error to indicate invalid domain status
51
        ErrInvalidDomainStatus = &types.BadRequestError{Message: "invalid domain status attribute"}
52
        // ErrNameUUIDCollision is the error to indicate domain name / UUID collision
53
        ErrNameUUIDCollision = &types.BadRequestError{Message: "domain replication encounter name / UUID collision"}
54
)
55

56
// defaultDomainRepliationTaskContextTimeout bounds the context shared by all
// persistence calls made while executing a single replication task.
const defaultDomainRepliationTaskContextTimeout = time.Second * 5
59

60
// NOTE: the counterpart of the domain replication transmission logic is in the service/frontend package
61

62
type (
63
        // ReplicationTaskExecutor is the interface which is to execute domain replication task
64
        ReplicationTaskExecutor interface {
65
                Execute(task *types.DomainTaskAttributes) error
66
        }
67

68
        domainReplicationTaskExecutorImpl struct {
69
                domainManager persistence.DomainManager
70
                timeSource    clock.TimeSource
71
                logger        log.Logger
72
        }
73
)
74

75
// NewReplicationTaskExecutor create a new instance of domain replicator
76
func NewReplicationTaskExecutor(
77
        domainManager persistence.DomainManager,
78
        timeSource clock.TimeSource,
79
        logger log.Logger,
80
) ReplicationTaskExecutor {
34✔
81

34✔
82
        return &domainReplicationTaskExecutorImpl{
34✔
83
                domainManager: domainManager,
34✔
84
                timeSource:    timeSource,
34✔
85
                logger:        logger,
34✔
86
        }
34✔
87
}
34✔
88

89
// Execute handles receiving of the domain replication task
90
func (h *domainReplicationTaskExecutorImpl) Execute(task *types.DomainTaskAttributes) error {
14✔
91
        ctx, cancel := context.WithTimeout(context.Background(), defaultDomainRepliationTaskContextTimeout)
14✔
92
        defer cancel()
14✔
93

14✔
94
        if err := h.validateDomainReplicationTask(task); err != nil {
14✔
95
                return err
×
96
        }
×
97

98
        switch task.GetDomainOperation() {
14✔
99
        case types.DomainOperationCreate:
9✔
100
                return h.handleDomainCreationReplicationTask(ctx, task)
9✔
101
        case types.DomainOperationUpdate:
5✔
102
                return h.handleDomainUpdateReplicationTask(ctx, task)
5✔
103
        default:
×
104
                return ErrInvalidDomainOperation
×
105
        }
106
}
107

108
// handleDomainCreationReplicationTask handles the domain creation replication task
109
func (h *domainReplicationTaskExecutorImpl) handleDomainCreationReplicationTask(ctx context.Context, task *types.DomainTaskAttributes) error {
10✔
110
        // task already validated
10✔
111
        status, err := h.convertDomainStatusFromThrift(task.Info.Status)
10✔
112
        if err != nil {
10✔
113
                return err
×
114
        }
×
115

116
        request := &persistence.CreateDomainRequest{
10✔
117
                Info: &persistence.DomainInfo{
10✔
118
                        ID:          task.GetID(),
10✔
119
                        Name:        task.Info.GetName(),
10✔
120
                        Status:      status,
10✔
121
                        Description: task.Info.GetDescription(),
10✔
122
                        OwnerEmail:  task.Info.GetOwnerEmail(),
10✔
123
                        Data:        task.Info.Data,
10✔
124
                },
10✔
125
                Config: &persistence.DomainConfig{
10✔
126
                        Retention:                task.Config.GetWorkflowExecutionRetentionPeriodInDays(),
10✔
127
                        EmitMetric:               task.Config.GetEmitMetric(),
10✔
128
                        HistoryArchivalStatus:    task.Config.GetHistoryArchivalStatus(),
10✔
129
                        HistoryArchivalURI:       task.Config.GetHistoryArchivalURI(),
10✔
130
                        VisibilityArchivalStatus: task.Config.GetVisibilityArchivalStatus(),
10✔
131
                        VisibilityArchivalURI:    task.Config.GetVisibilityArchivalURI(),
10✔
132
                },
10✔
133
                ReplicationConfig: &persistence.DomainReplicationConfig{
10✔
134
                        ActiveClusterName: task.ReplicationConfig.GetActiveClusterName(),
10✔
135
                        Clusters:          h.convertClusterReplicationConfigFromThrift(task.ReplicationConfig.Clusters),
10✔
136
                },
10✔
137
                IsGlobalDomain:  true, // local domain will not be replicated
10✔
138
                ConfigVersion:   task.GetConfigVersion(),
10✔
139
                FailoverVersion: task.GetFailoverVersion(),
10✔
140
                LastUpdatedTime: h.timeSource.Now().UnixNano(),
10✔
141
        }
10✔
142

10✔
143
        _, err = h.domainManager.CreateDomain(ctx, request)
10✔
144
        if err != nil {
13✔
145
                // SQL and Cassandra handle domain UUID collision differently
3✔
146
                // here, whenever seeing a error replicating a domain
3✔
147
                // do a check if there is a name / UUID collision
3✔
148

3✔
149
                recordExists := true
3✔
150
                resp, getErr := h.domainManager.GetDomain(ctx, &persistence.GetDomainRequest{
3✔
151
                        Name: task.Info.GetName(),
3✔
152
                })
3✔
153
                switch getErr.(type) {
3✔
154
                case nil:
2✔
155
                        if resp.Info.ID != task.GetID() {
3✔
156
                                return ErrNameUUIDCollision
1✔
157
                        }
1✔
158
                case *types.EntityNotExistsError:
1✔
159
                        // no check is necessary
1✔
160
                        recordExists = false
1✔
161
                default:
×
162
                        // return the original err
×
163
                        return err
×
164
                }
165

166
                resp, getErr = h.domainManager.GetDomain(ctx, &persistence.GetDomainRequest{
2✔
167
                        ID: task.GetID(),
2✔
168
                })
2✔
169
                switch getErr.(type) {
2✔
170
                case nil:
2✔
171
                        if resp.Info.Name != task.Info.GetName() {
3✔
172
                                return ErrNameUUIDCollision
1✔
173
                        }
1✔
174
                case *types.EntityNotExistsError:
×
175
                        // no check is necessary
×
176
                        recordExists = false
×
177
                default:
×
178
                        // return the original err
×
179
                        return err
×
180
                }
181

182
                if recordExists {
2✔
183
                        // name -> id & id -> name check pass, this is duplication request
1✔
184
                        return nil
1✔
185
                }
1✔
186
                return err
×
187
        }
188

189
        return err
7✔
190
}
191

192
// handleDomainUpdateReplicationTask handles the domain update replication task
193
func (h *domainReplicationTaskExecutorImpl) handleDomainUpdateReplicationTask(ctx context.Context, task *types.DomainTaskAttributes) error {
5✔
194
        // task already validated
5✔
195
        status, err := h.convertDomainStatusFromThrift(task.Info.Status)
5✔
196
        if err != nil {
5✔
197
                return err
×
198
        }
×
199

200
        // first we need to get the current notification version since we need to it for conditional update
201
        metadata, err := h.domainManager.GetMetadata(ctx)
5✔
202
        if err != nil {
5✔
203
                h.logger.Error("Error getting metadata while handling replication task", tag.Error(err))
×
204
                return err
×
205
        }
×
206
        notificationVersion := metadata.NotificationVersion
5✔
207

5✔
208
        // plus, we need to check whether the config version is <= the config version set in the input
5✔
209
        // plus, we need to check whether the failover version is <= the failover version set in the input
5✔
210
        resp, err := h.domainManager.GetDomain(ctx, &persistence.GetDomainRequest{
5✔
211
                Name: task.Info.GetName(),
5✔
212
        })
5✔
213
        if err != nil {
6✔
214
                if _, ok := err.(*types.EntityNotExistsError); ok {
2✔
215
                        // this can happen if the create domain replication task is to processed.
1✔
216
                        // e.g. new cluster which does not have anything
1✔
217
                        return h.handleDomainCreationReplicationTask(ctx, task)
1✔
218
                }
1✔
219
                h.logger.Error("Domain update failed, error in fetching domain", tag.Error(err))
×
220
                return err
×
221
        }
222

223
        recordUpdated := false
4✔
224
        request := &persistence.UpdateDomainRequest{
4✔
225
                Info:                        resp.Info,
4✔
226
                Config:                      resp.Config,
4✔
227
                ReplicationConfig:           resp.ReplicationConfig,
4✔
228
                ConfigVersion:               resp.ConfigVersion,
4✔
229
                FailoverVersion:             resp.FailoverVersion,
4✔
230
                FailoverNotificationVersion: resp.FailoverNotificationVersion,
4✔
231
                PreviousFailoverVersion:     resp.PreviousFailoverVersion,
4✔
232
                NotificationVersion:         notificationVersion,
4✔
233
                LastUpdatedTime:             h.timeSource.Now().UnixNano(),
4✔
234
        }
4✔
235

4✔
236
        if resp.ConfigVersion < task.GetConfigVersion() {
6✔
237
                recordUpdated = true
2✔
238
                request.Info = &persistence.DomainInfo{
2✔
239
                        ID:          task.GetID(),
2✔
240
                        Name:        task.Info.GetName(),
2✔
241
                        Status:      status,
2✔
242
                        Description: task.Info.GetDescription(),
2✔
243
                        OwnerEmail:  task.Info.GetOwnerEmail(),
2✔
244
                        Data:        task.Info.Data,
2✔
245
                }
2✔
246
                request.Config = &persistence.DomainConfig{
2✔
247
                        Retention:                task.Config.GetWorkflowExecutionRetentionPeriodInDays(),
2✔
248
                        EmitMetric:               task.Config.GetEmitMetric(),
2✔
249
                        HistoryArchivalStatus:    task.Config.GetHistoryArchivalStatus(),
2✔
250
                        HistoryArchivalURI:       task.Config.GetHistoryArchivalURI(),
2✔
251
                        VisibilityArchivalStatus: task.Config.GetVisibilityArchivalStatus(),
2✔
252
                        VisibilityArchivalURI:    task.Config.GetVisibilityArchivalURI(),
2✔
253
                        IsolationGroups:          task.Config.GetIsolationGroupsConfiguration(),
2✔
254
                }
2✔
255
                if task.Config.GetBadBinaries() != nil {
2✔
256
                        request.Config.BadBinaries = *task.Config.GetBadBinaries()
×
257
                }
×
258
                request.ReplicationConfig.Clusters = h.convertClusterReplicationConfigFromThrift(task.ReplicationConfig.Clusters)
2✔
259
                request.ConfigVersion = task.GetConfigVersion()
2✔
260
        }
261
        if resp.FailoverVersion < task.GetFailoverVersion() {
6✔
262
                recordUpdated = true
2✔
263
                request.ReplicationConfig.ActiveClusterName = task.ReplicationConfig.GetActiveClusterName()
2✔
264
                request.FailoverVersion = task.GetFailoverVersion()
2✔
265
                request.FailoverNotificationVersion = notificationVersion
2✔
266
                request.PreviousFailoverVersion = task.GetPreviousFailoverVersion()
2✔
267
        }
2✔
268

269
        if !recordUpdated {
5✔
270
                return nil
1✔
271
        }
1✔
272

273
        return h.domainManager.UpdateDomain(ctx, request)
3✔
274
}
275

276
func (h *domainReplicationTaskExecutorImpl) validateDomainReplicationTask(task *types.DomainTaskAttributes) error {
14✔
277
        if task == nil {
14✔
278
                return ErrEmptyDomainReplicationTask
×
279
        }
×
280

281
        if task.DomainOperation == nil {
14✔
282
                return ErrInvalidDomainOperation
×
283
        } else if task.ID == "" {
14✔
284
                return ErrInvalidDomainID
×
285
        } else if task.Info == nil {
14✔
286
                return ErrInvalidDomainInfo
×
287
        } else if task.Config == nil {
14✔
288
                return ErrInvalidDomainConfig
×
289
        } else if task.ReplicationConfig == nil {
14✔
290
                return ErrInvalidDomainReplicationConfig
×
291
        }
×
292
        return nil
14✔
293
}
294

295
func (h *domainReplicationTaskExecutorImpl) convertClusterReplicationConfigFromThrift(
296
        input []*types.ClusterReplicationConfiguration) []*persistence.ClusterReplicationConfig {
19✔
297
        output := []*persistence.ClusterReplicationConfig{}
19✔
298
        for _, cluster := range input {
57✔
299
                clusterName := cluster.GetClusterName()
38✔
300
                output = append(output, &persistence.ClusterReplicationConfig{ClusterName: clusterName})
38✔
301
        }
38✔
302
        return output
19✔
303
}
304

305
func (h *domainReplicationTaskExecutorImpl) convertDomainStatusFromThrift(input *types.DomainStatus) (int, error) {
15✔
306
        if input == nil {
15✔
307
                return 0, ErrInvalidDomainStatus
×
308
        }
×
309

310
        switch *input {
15✔
311
        case types.DomainStatusRegistered:
11✔
312
                return persistence.DomainStatusRegistered, nil
11✔
313
        case types.DomainStatusDeprecated:
4✔
314
                return persistence.DomainStatusDeprecated, nil
4✔
315
        default:
×
316
                return 0, ErrInvalidDomainStatus
×
317
        }
318
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc