• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

uber / cadence / 0187fdd2-f4a4-4c9a-97b4-6604937bf7be

09 May 2023 12:23AM UTC coverage: 57.253% (-0.002%) from 57.255%
0187fdd2-f4a4-4c9a-97b4-6604937bf7be

Pull #5252

buildkite

David Porter
Merge branch 'master' into feature/zonal-partitioning
Pull Request #5252: Feature/zonal partitioning

1460 of 1460 new or added lines in 51 files covered. (100.0%)

86909 of 151799 relevant lines covered (57.25%)

2482.17 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

79.47
/common/domain/handler.go
1
// Copyright (c) 2017 Uber Technologies, Inc.
2
//
3
// Permission is hereby granted, free of charge, to any person obtaining a copy
4
// of this software and associated documentation files (the "Software"), to deal
5
// in the Software without restriction, including without limitation the rights
6
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
7
// copies of the Software, and to permit persons to whom the Software is
8
// furnished to do so, subject to the following conditions:
9
//
10
// The above copyright notice and this permission notice shall be included in
11
// all copies or substantial portions of the Software.
12
//
13
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
14
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
18
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
19
// THE SOFTWARE.
20

21
//go:generate mockgen -package $GOPACKAGE -source $GOFILE -destination handler_mock.go
22

23
package domain
24

25
import (
26
        "context"
27
        "fmt"
28
        "regexp"
29
        "time"
30

31
        "github.com/pborman/uuid"
32

33
        "github.com/uber/cadence/common"
34
        "github.com/uber/cadence/common/archiver"
35
        "github.com/uber/cadence/common/archiver/provider"
36
        "github.com/uber/cadence/common/clock"
37
        "github.com/uber/cadence/common/cluster"
38
        "github.com/uber/cadence/common/dynamicconfig"
39
        "github.com/uber/cadence/common/log"
40
        "github.com/uber/cadence/common/log/tag"
41
        "github.com/uber/cadence/common/persistence"
42
        "github.com/uber/cadence/common/service"
43
        "github.com/uber/cadence/common/types"
44
)
45

46
var (
47
        errDomainUpdateTooFrequent = &types.ServiceBusyError{Message: "Domain update too frequent."}
48
        errInvalidDomainName       = &types.BadRequestError{Message: "Domain name can only include alphanumeric and dash characters."}
49
)
50

51
type (
52
        // Handler is the domain operation handler
53
        Handler interface {
54
                DeprecateDomain(
55
                        ctx context.Context,
56
                        deprecateRequest *types.DeprecateDomainRequest,
57
                ) error
58
                DescribeDomain(
59
                        ctx context.Context,
60
                        describeRequest *types.DescribeDomainRequest,
61
                ) (*types.DescribeDomainResponse, error)
62
                ListDomains(
63
                        ctx context.Context,
64
                        listRequest *types.ListDomainsRequest,
65
                ) (*types.ListDomainsResponse, error)
66
                RegisterDomain(
67
                        ctx context.Context,
68
                        registerRequest *types.RegisterDomainRequest,
69
                ) error
70
                UpdateDomain(
71
                        ctx context.Context,
72
                        updateRequest *types.UpdateDomainRequest,
73
                ) (*types.UpdateDomainResponse, error)
74
        }
75

76
        // handlerImpl is the domain operation handler implementation
77
        handlerImpl struct {
78
                domainManager       persistence.DomainManager
79
                clusterMetadata     cluster.Metadata
80
                domainReplicator    Replicator
81
                domainAttrValidator *AttrValidatorImpl
82
                archivalMetadata    archiver.ArchivalMetadata
83
                archiverProvider    provider.ArchiverProvider
84
                timeSource          clock.TimeSource
85
                config              Config
86
                logger              log.Logger
87
        }
88

89
        // Config is the domain config for domain handler
90
        Config struct {
91
                MinRetentionDays       dynamicconfig.IntPropertyFn
92
                MaxRetentionDays       dynamicconfig.IntPropertyFn
93
                RequiredDomainDataKeys dynamicconfig.MapPropertyFn
94
                MaxBadBinaryCount      dynamicconfig.IntPropertyFnWithDomainFilter
95
                FailoverCoolDown       dynamicconfig.DurationPropertyFnWithDomainFilter
96
        }
97
)
98

99
// Compile-time assertion that *handlerImpl satisfies the Handler interface.
var _ Handler = (*handlerImpl)(nil)
100

101
// NewHandler create a new domain handler
102
func NewHandler(
103
        config Config,
104
        logger log.Logger,
105
        domainManager persistence.DomainManager,
106
        clusterMetadata cluster.Metadata,
107
        domainReplicator Replicator,
108
        archivalMetadata archiver.ArchivalMetadata,
109
        archiverProvider provider.ArchiverProvider,
110
        timeSource clock.TimeSource,
111
) Handler {
61✔
112
        return &handlerImpl{
61✔
113
                logger:              logger,
61✔
114
                domainManager:       domainManager,
61✔
115
                clusterMetadata:     clusterMetadata,
61✔
116
                domainReplicator:    domainReplicator,
61✔
117
                domainAttrValidator: newAttrValidator(clusterMetadata, int32(config.MinRetentionDays())),
61✔
118
                archivalMetadata:    archivalMetadata,
61✔
119
                archiverProvider:    archiverProvider,
61✔
120
                timeSource:          timeSource,
61✔
121
                config:              config,
61✔
122
        }
61✔
123
}
61✔
124

125
// RegisterDomain register a new domain
126
func (d *handlerImpl) RegisterDomain(
127
        ctx context.Context,
128
        registerRequest *types.RegisterDomainRequest,
129
) error {
70✔
130

70✔
131
        // cluster global domain enabled
70✔
132
        if !d.clusterMetadata.IsPrimaryCluster() && registerRequest.GetIsGlobalDomain() {
72✔
133
                return errNotPrimaryCluster
2✔
134
        }
2✔
135

136
        // first check if the name is already registered as the local domain
137
        _, err := d.domainManager.GetDomain(ctx, &persistence.GetDomainRequest{Name: registerRequest.GetName()})
68✔
138
        switch err.(type) {
68✔
139
        case nil:
×
140
                // domain already exists, cannot proceed
×
141
                return &types.DomainAlreadyExistsError{Message: "Domain already exists."}
×
142
        case *types.EntityNotExistsError:
68✔
143
                // domain does not exists, proceeds
144
        default:
×
145
                // other err
×
146
                return err
×
147
        }
148

149
        // input validation on domain name
150
        matchedRegex, err := regexp.MatchString("^[a-zA-Z0-9-]+$", registerRequest.GetName())
68✔
151
        if err != nil {
68✔
152
                return err
×
153
        }
×
154
        if !matchedRegex {
69✔
155
                return errInvalidDomainName
1✔
156
        }
1✔
157

158
        activeClusterName := d.clusterMetadata.GetCurrentClusterName()
67✔
159
        // input validation on cluster names
67✔
160
        if registerRequest.ActiveClusterName != "" {
89✔
161
                activeClusterName = registerRequest.GetActiveClusterName()
22✔
162
        }
22✔
163
        clusters := []*persistence.ClusterReplicationConfig{}
67✔
164
        for _, clusterConfig := range registerRequest.Clusters {
101✔
165
                clusterName := clusterConfig.GetClusterName()
34✔
166
                clusters = append(clusters, &persistence.ClusterReplicationConfig{ClusterName: clusterName})
34✔
167
        }
34✔
168
        clusters = cluster.GetOrUseDefaultClusters(activeClusterName, clusters)
67✔
169

67✔
170
        currentHistoryArchivalState := neverEnabledState()
67✔
171
        nextHistoryArchivalState := currentHistoryArchivalState
67✔
172
        clusterHistoryArchivalConfig := d.archivalMetadata.GetHistoryConfig()
67✔
173
        if clusterHistoryArchivalConfig.ClusterConfiguredForArchival() {
76✔
174
                archivalEvent, err := d.toArchivalRegisterEvent(
9✔
175
                        registerRequest.HistoryArchivalStatus,
9✔
176
                        registerRequest.GetHistoryArchivalURI(),
9✔
177
                        clusterHistoryArchivalConfig.GetDomainDefaultStatus(),
9✔
178
                        clusterHistoryArchivalConfig.GetDomainDefaultURI(),
9✔
179
                )
9✔
180
                if err != nil {
9✔
181
                        return err
×
182
                }
×
183

184
                nextHistoryArchivalState, _, err = currentHistoryArchivalState.getNextState(archivalEvent, d.validateHistoryArchivalURI)
9✔
185
                if err != nil {
9✔
186
                        return err
×
187
                }
×
188
        }
189

190
        currentVisibilityArchivalState := neverEnabledState()
67✔
191
        nextVisibilityArchivalState := currentVisibilityArchivalState
67✔
192
        clusterVisibilityArchivalConfig := d.archivalMetadata.GetVisibilityConfig()
67✔
193
        if clusterVisibilityArchivalConfig.ClusterConfiguredForArchival() {
76✔
194
                archivalEvent, err := d.toArchivalRegisterEvent(
9✔
195
                        registerRequest.VisibilityArchivalStatus,
9✔
196
                        registerRequest.GetVisibilityArchivalURI(),
9✔
197
                        clusterVisibilityArchivalConfig.GetDomainDefaultStatus(),
9✔
198
                        clusterVisibilityArchivalConfig.GetDomainDefaultURI(),
9✔
199
                )
9✔
200
                if err != nil {
9✔
201
                        return err
×
202
                }
×
203

204
                nextVisibilityArchivalState, _, err = currentVisibilityArchivalState.getNextState(archivalEvent, d.validateVisibilityArchivalURI)
9✔
205
                if err != nil {
9✔
206
                        return err
×
207
                }
×
208
        }
209

210
        info := &persistence.DomainInfo{
67✔
211
                ID:          uuid.New(),
67✔
212
                Name:        registerRequest.GetName(),
67✔
213
                Status:      persistence.DomainStatusRegistered,
67✔
214
                OwnerEmail:  registerRequest.GetOwnerEmail(),
67✔
215
                Description: registerRequest.GetDescription(),
67✔
216
                Data:        registerRequest.Data,
67✔
217
        }
67✔
218
        config := &persistence.DomainConfig{
67✔
219
                Retention:                registerRequest.GetWorkflowExecutionRetentionPeriodInDays(),
67✔
220
                EmitMetric:               registerRequest.GetEmitMetric(),
67✔
221
                HistoryArchivalStatus:    nextHistoryArchivalState.Status,
67✔
222
                HistoryArchivalURI:       nextHistoryArchivalState.URI,
67✔
223
                VisibilityArchivalStatus: nextVisibilityArchivalState.Status,
67✔
224
                VisibilityArchivalURI:    nextVisibilityArchivalState.URI,
67✔
225
                BadBinaries:              types.BadBinaries{Binaries: map[string]*types.BadBinaryInfo{}},
67✔
226
        }
67✔
227
        replicationConfig := &persistence.DomainReplicationConfig{
67✔
228
                ActiveClusterName: activeClusterName,
67✔
229
                Clusters:          clusters,
67✔
230
        }
67✔
231
        isGlobalDomain := registerRequest.GetIsGlobalDomain()
67✔
232

67✔
233
        if err := d.domainAttrValidator.validateDomainConfig(config); err != nil {
68✔
234
                return err
1✔
235
        }
1✔
236
        if isGlobalDomain {
83✔
237
                if err := d.domainAttrValidator.validateDomainReplicationConfigForGlobalDomain(
17✔
238
                        replicationConfig,
17✔
239
                ); err != nil {
17✔
240
                        return err
×
241
                }
×
242
        } else {
49✔
243
                if err := d.domainAttrValidator.validateDomainReplicationConfigForLocalDomain(
49✔
244
                        replicationConfig,
49✔
245
                ); err != nil {
50✔
246
                        return err
1✔
247
                }
1✔
248
        }
249

250
        failoverVersion := common.EmptyVersion
65✔
251
        if registerRequest.GetIsGlobalDomain() {
82✔
252
                failoverVersion = d.clusterMetadata.GetNextFailoverVersion(activeClusterName, 0, registerRequest.Name)
17✔
253
        }
17✔
254

255
        domainRequest := &persistence.CreateDomainRequest{
65✔
256
                Info:              info,
65✔
257
                Config:            config,
65✔
258
                ReplicationConfig: replicationConfig,
65✔
259
                IsGlobalDomain:    isGlobalDomain,
65✔
260
                ConfigVersion:     0,
65✔
261
                FailoverVersion:   failoverVersion,
65✔
262
                LastUpdatedTime:   d.timeSource.Now().UnixNano(),
65✔
263
        }
65✔
264

65✔
265
        domainResponse, err := d.domainManager.CreateDomain(ctx, domainRequest)
65✔
266
        if err != nil {
65✔
267
                return err
×
268
        }
×
269

270
        if domainRequest.IsGlobalDomain {
82✔
271
                err = d.domainReplicator.HandleTransmissionTask(
17✔
272
                        ctx,
17✔
273
                        types.DomainOperationCreate,
17✔
274
                        domainRequest.Info,
17✔
275
                        domainRequest.Config,
17✔
276
                        domainRequest.ReplicationConfig,
17✔
277
                        domainRequest.ConfigVersion,
17✔
278
                        domainRequest.FailoverVersion,
17✔
279
                        common.InitialPreviousFailoverVersion,
17✔
280
                        domainRequest.IsGlobalDomain,
17✔
281
                )
17✔
282
                if err != nil {
17✔
283
                        return err
×
284
                }
×
285
        }
286

287
        d.logger.Info("Register domain succeeded",
65✔
288
                tag.WorkflowDomainName(registerRequest.GetName()),
65✔
289
                tag.WorkflowDomainID(domainResponse.ID),
65✔
290
        )
65✔
291

65✔
292
        return nil
65✔
293
}
294

295
// ListDomains list all domains
296
func (d *handlerImpl) ListDomains(
297
        ctx context.Context,
298
        listRequest *types.ListDomainsRequest,
299
) (*types.ListDomainsResponse, error) {
5✔
300

5✔
301
        pageSize := 100
5✔
302
        if listRequest.GetPageSize() != 0 {
10✔
303
                pageSize = int(listRequest.GetPageSize())
5✔
304
        }
5✔
305

306
        resp, err := d.domainManager.ListDomains(ctx, &persistence.ListDomainsRequest{
5✔
307
                PageSize:      pageSize,
5✔
308
                NextPageToken: listRequest.NextPageToken,
5✔
309
        })
5✔
310

5✔
311
        if err != nil {
5✔
312
                return nil, err
×
313
        }
×
314

315
        domains := []*types.DescribeDomainResponse{}
5✔
316
        for _, domain := range resp.Domains {
8✔
317
                desc := &types.DescribeDomainResponse{
3✔
318
                        IsGlobalDomain:  domain.IsGlobalDomain,
3✔
319
                        FailoverVersion: domain.FailoverVersion,
3✔
320
                }
3✔
321
                desc.DomainInfo, desc.Configuration, desc.ReplicationConfiguration = d.createResponse(domain.Info, domain.Config, domain.ReplicationConfig)
3✔
322
                domains = append(domains, desc)
3✔
323
        }
3✔
324

325
        response := &types.ListDomainsResponse{
5✔
326
                Domains:       domains,
5✔
327
                NextPageToken: resp.NextPageToken,
5✔
328
        }
5✔
329

5✔
330
        return response, nil
5✔
331
}
332

333
// DescribeDomain describe the domain
334
func (d *handlerImpl) DescribeDomain(
335
        ctx context.Context,
336
        describeRequest *types.DescribeDomainRequest,
337
) (*types.DescribeDomainResponse, error) {
156✔
338

156✔
339
        // TODO, we should migrate the non global domain to new table, see #773
156✔
340
        req := &persistence.GetDomainRequest{
156✔
341
                Name: describeRequest.GetName(),
156✔
342
                ID:   describeRequest.GetUUID(),
156✔
343
        }
156✔
344
        resp, err := d.domainManager.GetDomain(ctx, req)
156✔
345
        if err != nil {
158✔
346
                return nil, err
2✔
347
        }
2✔
348

349
        response := &types.DescribeDomainResponse{
154✔
350
                IsGlobalDomain:  resp.IsGlobalDomain,
154✔
351
                FailoverVersion: resp.FailoverVersion,
154✔
352
        }
154✔
353
        if resp.FailoverEndTime != nil {
154✔
354
                response.FailoverInfo = &types.FailoverInfo{
×
355
                        FailoverVersion: resp.FailoverVersion,
×
356
                        // This reflects that last domain update time. If there is a domain config update, this won't be accurate.
×
357
                        FailoverStartTimestamp:  resp.LastUpdatedTime,
×
358
                        FailoverExpireTimestamp: *resp.FailoverEndTime,
×
359
                }
×
360
        }
×
361
        response.DomainInfo, response.Configuration, response.ReplicationConfiguration = d.createResponse(resp.Info, resp.Config, resp.ReplicationConfig)
154✔
362
        return response, nil
154✔
363
}
364

365
// UpdateDomain update the domain
366
func (d *handlerImpl) UpdateDomain(
367
        ctx context.Context,
368
        updateRequest *types.UpdateDomainRequest,
369
) (*types.UpdateDomainResponse, error) {
23✔
370

23✔
371
        // must get the metadata (notificationVersion) first
23✔
372
        // this version can be regarded as the lock on the v2 domain table
23✔
373
        // and since we do not know which table will return the domain afterwards
23✔
374
        // this call has to be made
23✔
375
        metadata, err := d.domainManager.GetMetadata(ctx)
23✔
376
        if err != nil {
23✔
377
                return nil, err
×
378
        }
×
379
        notificationVersion := metadata.NotificationVersion
23✔
380
        getResponse, err := d.domainManager.GetDomain(ctx, &persistence.GetDomainRequest{Name: updateRequest.GetName()})
23✔
381
        if err != nil {
23✔
382
                return nil, err
×
383
        }
×
384

385
        info := getResponse.Info
23✔
386
        config := getResponse.Config
23✔
387
        replicationConfig := getResponse.ReplicationConfig
23✔
388
        configVersion := getResponse.ConfigVersion
23✔
389
        failoverVersion := getResponse.FailoverVersion
23✔
390
        failoverNotificationVersion := getResponse.FailoverNotificationVersion
23✔
391
        isGlobalDomain := getResponse.IsGlobalDomain
23✔
392
        gracefulFailoverEndTime := getResponse.FailoverEndTime
23✔
393
        currentActiveCluster := replicationConfig.ActiveClusterName
23✔
394
        previousFailoverVersion := getResponse.PreviousFailoverVersion
23✔
395
        lastUpdatedTime := time.Unix(0, getResponse.LastUpdatedTime)
23✔
396

23✔
397
        // whether history archival config changed
23✔
398
        historyArchivalConfigChanged := false
23✔
399
        // whether visibility archival config changed
23✔
400
        visibilityArchivalConfigChanged := false
23✔
401
        // whether active cluster is changed
23✔
402
        activeClusterChanged := false
23✔
403

23✔
404
        // Whether isolation-groups are changed. There's semantic API difference between the
23✔
405
        // domain isolation-group update API which is a flat upsert and this handler API which
23✔
406
        // relies on modifying specified values only, and leaving the rest as-is.
23✔
407
        // for callers of the Domain API wishing to update isolation groups,
23✔
408
        // To distinguish between the operation to remove all drains and
23✔
409
        // take no action, the UpdateDomain handler uses the presence or nil-ness of the IsolationGroup
23✔
410
        // field to indicate either drain removal (where the struct is present), and to take no action
23✔
411
        // where the field is nil.
23✔
412
        //
23✔
413
        // ie, for the purposes of this handler:
23✔
414
        // h.UpdateDomain(ctx, UpdateDomainRequest{
23✔
415
        //   IsolationGroups: nil,                              // take no action for isolation groups
23✔
416
        // })
23✔
417
        // h.UpdateDomain(ctx, UpdateDomainRequest{
23✔
418
        //   IsolationGroups: IsolationGroupConfiguration{},    // remove all isolation groups for domain
23✔
419
        // })
23✔
420
        config, isolationGroupConfigurationChanged, err := d.getIsolationGroupStatus(config, updateRequest)
23✔
421
        // whether anything other than active cluster is changed
23✔
422
        configurationChanged := false
23✔
423

23✔
424
        // Update history archival state
23✔
425
        historyArchivalState, historyArchivalConfigChanged, err := d.getHistoryArchivalState(
23✔
426
                config,
23✔
427
                updateRequest,
23✔
428
        )
23✔
429
        if err != nil {
23✔
430
                return nil, err
×
431
        }
×
432
        if historyArchivalConfigChanged {
23✔
433
                config.HistoryArchivalStatus = historyArchivalState.Status
×
434
                config.HistoryArchivalURI = historyArchivalState.URI
×
435
        }
×
436

437
        // Update visibility archival state
438
        visibilityArchivalState, visibilityArchivalConfigChanged, err := d.getVisibilityArchivalState(
23✔
439
                config,
23✔
440
                updateRequest,
23✔
441
        )
23✔
442
        if err != nil {
23✔
443
                return nil, err
×
444
        }
×
445
        if visibilityArchivalConfigChanged {
23✔
446
                config.VisibilityArchivalStatus = visibilityArchivalState.Status
×
447
                config.VisibilityArchivalURI = visibilityArchivalState.URI
×
448
        }
×
449

450
        // Update domain info
451
        info, domainInfoChanged := d.updateDomainInfo(
23✔
452
                updateRequest,
23✔
453
                info,
23✔
454
        )
23✔
455
        // Update domain config
23✔
456
        config, domainConfigChanged, err := d.updateDomainConfiguration(
23✔
457
                updateRequest.GetName(),
23✔
458
                config,
23✔
459
                updateRequest,
23✔
460
        )
23✔
461
        if err != nil {
23✔
462
                return nil, err
×
463
        }
×
464

465
        // Update domain bad binary
466
        config, deleteBinaryChanged, err := d.updateDeleteBadBinary(
23✔
467
                config,
23✔
468
                updateRequest.DeleteBadBinary,
23✔
469
        )
23✔
470
        if err != nil {
23✔
471
                return nil, err
×
472
        }
×
473

474
        // Update replication config
475
        replicationConfig, replicationConfigChanged, activeClusterChanged, err := d.updateReplicationConfig(
23✔
476
                replicationConfig,
23✔
477
                updateRequest,
23✔
478
        )
23✔
479
        if err != nil {
23✔
480
                return nil, err
×
481
        }
×
482

483
        // Handle graceful failover request
484
        if updateRequest.FailoverTimeoutInSeconds != nil {
29✔
485
                // must update active cluster on a global domain
6✔
486
                if !activeClusterChanged || !isGlobalDomain {
7✔
487
                        return nil, errInvalidGracefulFailover
1✔
488
                }
1✔
489
                // must start with the passive -> active cluster
490
                if replicationConfig.ActiveClusterName != d.clusterMetadata.GetCurrentClusterName() {
6✔
491
                        return nil, errCannotDoGracefulFailoverFromCluster
1✔
492
                }
1✔
493
                if replicationConfig.ActiveClusterName == currentActiveCluster {
5✔
494
                        return nil, errGracefulFailoverInActiveCluster
1✔
495
                }
1✔
496
                // cannot have concurrent failover
497
                if gracefulFailoverEndTime != nil {
3✔
498
                        return nil, errOngoingGracefulFailover
×
499
                }
×
500
                endTime := d.timeSource.Now().Add(time.Duration(updateRequest.GetFailoverTimeoutInSeconds()) * time.Second).UnixNano()
3✔
501
                gracefulFailoverEndTime = &endTime
3✔
502
                previousFailoverVersion = failoverVersion
3✔
503
        }
504

505
        configurationChanged = historyArchivalConfigChanged || visibilityArchivalConfigChanged || domainInfoChanged || domainConfigChanged || deleteBinaryChanged || replicationConfigChanged || isolationGroupConfigurationChanged
20✔
506

20✔
507
        if err := d.domainAttrValidator.validateDomainConfig(config); err != nil {
21✔
508
                return nil, err
1✔
509
        }
1✔
510
        if isGlobalDomain {
34✔
511
                if err := d.domainAttrValidator.validateDomainReplicationConfigForGlobalDomain(
15✔
512
                        replicationConfig,
15✔
513
                ); err != nil {
15✔
514
                        return nil, err
×
515
                }
×
516

517
                if configurationChanged && activeClusterChanged {
15✔
518
                        return nil, errCannotDoDomainFailoverAndUpdate
×
519
                }
×
520

521
                if !activeClusterChanged && !d.clusterMetadata.IsPrimaryCluster() {
17✔
522
                        return nil, errNotPrimaryCluster
2✔
523
                }
2✔
524
        } else {
4✔
525
                if err := d.domainAttrValidator.validateDomainReplicationConfigForLocalDomain(
4✔
526
                        replicationConfig,
4✔
527
                ); err != nil {
4✔
528
                        return nil, err
×
529
                }
×
530
        }
531

532
        if configurationChanged || activeClusterChanged {
31✔
533
                now := d.timeSource.Now()
14✔
534
                // Check the failover cool down time
14✔
535
                if lastUpdatedTime.Add(d.config.FailoverCoolDown(info.Name)).After(now) {
15✔
536
                        return nil, errDomainUpdateTooFrequent
1✔
537
                }
1✔
538

539
                // set the versions
540
                if configurationChanged {
19✔
541
                        configVersion++
6✔
542
                }
6✔
543
                if activeClusterChanged && isGlobalDomain {
20✔
544
                        // Force failover cleans graceful failover state
7✔
545
                        if updateRequest.FailoverTimeoutInSeconds == nil {
11✔
546
                                // force failover cleanup graceful failover state
4✔
547
                                gracefulFailoverEndTime = nil
4✔
548
                                previousFailoverVersion = common.InitialPreviousFailoverVersion
4✔
549
                        }
4✔
550
                        failoverVersion = d.clusterMetadata.GetNextFailoverVersion(
7✔
551
                                replicationConfig.ActiveClusterName,
7✔
552
                                failoverVersion,
7✔
553
                                updateRequest.Name,
7✔
554
                        )
7✔
555
                        failoverNotificationVersion = notificationVersion
7✔
556
                }
557
                lastUpdatedTime = now
13✔
558
                updateReq := &persistence.UpdateDomainRequest{
13✔
559
                        Info:                        info,
13✔
560
                        Config:                      config,
13✔
561
                        ReplicationConfig:           replicationConfig,
13✔
562
                        ConfigVersion:               configVersion,
13✔
563
                        FailoverVersion:             failoverVersion,
13✔
564
                        FailoverNotificationVersion: failoverNotificationVersion,
13✔
565
                        FailoverEndTime:             gracefulFailoverEndTime,
13✔
566
                        PreviousFailoverVersion:     previousFailoverVersion,
13✔
567
                        LastUpdatedTime:             lastUpdatedTime.UnixNano(),
13✔
568
                        NotificationVersion:         notificationVersion,
13✔
569
                }
13✔
570
                err = d.domainManager.UpdateDomain(ctx, updateReq)
13✔
571
                if err != nil {
13✔
572
                        return nil, err
×
573
                }
×
574
        }
575

576
        if isGlobalDomain {
28✔
577
                if err := d.domainReplicator.HandleTransmissionTask(
12✔
578
                        ctx,
12✔
579
                        types.DomainOperationUpdate,
12✔
580
                        info,
12✔
581
                        config,
12✔
582
                        replicationConfig,
12✔
583
                        configVersion,
12✔
584
                        failoverVersion,
12✔
585
                        previousFailoverVersion,
12✔
586
                        isGlobalDomain,
12✔
587
                ); err != nil {
12✔
588
                        return nil, err
×
589
                }
×
590
        }
591

592
        response := &types.UpdateDomainResponse{
16✔
593
                IsGlobalDomain:  isGlobalDomain,
16✔
594
                FailoverVersion: failoverVersion,
16✔
595
        }
16✔
596
        response.DomainInfo, response.Configuration, response.ReplicationConfiguration = d.createResponse(info, config, replicationConfig)
16✔
597

16✔
598
        d.logger.Info("Update domain succeeded",
16✔
599
                tag.WorkflowDomainName(info.Name),
16✔
600
                tag.WorkflowDomainID(info.ID),
16✔
601
        )
16✔
602
        return response, nil
16✔
603
}
604

605
// DeprecateDomain deprecates a domain
606
func (d *handlerImpl) DeprecateDomain(
607
        ctx context.Context,
608
        deprecateRequest *types.DeprecateDomainRequest,
609
) error {
4✔
610

4✔
611
        // must get the metadata (notificationVersion) first
4✔
612
        // this version can be regarded as the lock on the v2 domain table
4✔
613
        // and since we do not know which table will return the domain afterwards
4✔
614
        // this call has to be made
4✔
615
        metadata, err := d.domainManager.GetMetadata(ctx)
4✔
616
        if err != nil {
4✔
617
                return err
×
618
        }
×
619
        notificationVersion := metadata.NotificationVersion
4✔
620
        getResponse, err := d.domainManager.GetDomain(ctx, &persistence.GetDomainRequest{Name: deprecateRequest.GetName()})
4✔
621
        if err != nil {
4✔
622
                return err
×
623
        }
×
624

625
        isGlobalDomain := getResponse.IsGlobalDomain
4✔
626
        if isGlobalDomain && !d.clusterMetadata.IsPrimaryCluster() {
5✔
627
                return errNotPrimaryCluster
1✔
628
        }
1✔
629
        getResponse.ConfigVersion = getResponse.ConfigVersion + 1
3✔
630
        getResponse.Info.Status = persistence.DomainStatusDeprecated
3✔
631

3✔
632
        updateReq := &persistence.UpdateDomainRequest{
3✔
633
                Info:                        getResponse.Info,
3✔
634
                Config:                      getResponse.Config,
3✔
635
                ReplicationConfig:           getResponse.ReplicationConfig,
3✔
636
                ConfigVersion:               getResponse.ConfigVersion,
3✔
637
                FailoverVersion:             getResponse.FailoverVersion,
3✔
638
                FailoverNotificationVersion: getResponse.FailoverNotificationVersion,
3✔
639
                FailoverEndTime:             getResponse.FailoverEndTime,
3✔
640
                PreviousFailoverVersion:     getResponse.PreviousFailoverVersion,
3✔
641
                LastUpdatedTime:             d.timeSource.Now().UnixNano(),
3✔
642
                NotificationVersion:         notificationVersion,
3✔
643
        }
3✔
644
        err = d.domainManager.UpdateDomain(ctx, updateReq)
3✔
645
        if err != nil {
3✔
646
                return err
×
647
        }
×
648

649
        if isGlobalDomain {
4✔
650
                if err := d.domainReplicator.HandleTransmissionTask(
1✔
651
                        ctx,
1✔
652
                        types.DomainOperationUpdate,
1✔
653
                        getResponse.Info,
1✔
654
                        getResponse.Config,
1✔
655
                        getResponse.ReplicationConfig,
1✔
656
                        getResponse.ConfigVersion,
1✔
657
                        getResponse.FailoverVersion,
1✔
658
                        getResponse.PreviousFailoverVersion,
1✔
659
                        isGlobalDomain,
1✔
660
                ); err != nil {
1✔
661
                        return err
×
662
                }
×
663
        }
664

665
        d.logger.Info("DeprecateDomain domain succeeded",
3✔
666
                tag.WorkflowDomainName(getResponse.Info.Name),
3✔
667
                tag.WorkflowDomainID(getResponse.Info.ID),
3✔
668
        )
3✔
669
        return nil
3✔
670
}
671

672
func (d *handlerImpl) createResponse(
673
        info *persistence.DomainInfo,
674
        config *persistence.DomainConfig,
675
        replicationConfig *persistence.DomainReplicationConfig,
676
) (*types.DomainInfo, *types.DomainConfiguration, *types.DomainReplicationConfiguration) {
173✔
677

173✔
678
        infoResult := &types.DomainInfo{
173✔
679
                Name:        info.Name,
173✔
680
                Status:      getDomainStatus(info),
173✔
681
                Description: info.Description,
173✔
682
                OwnerEmail:  info.OwnerEmail,
173✔
683
                Data:        info.Data,
173✔
684
                UUID:        info.ID,
173✔
685
        }
173✔
686

173✔
687
        configResult := &types.DomainConfiguration{
173✔
688
                EmitMetric:                             config.EmitMetric,
173✔
689
                WorkflowExecutionRetentionPeriodInDays: config.Retention,
173✔
690
                HistoryArchivalStatus:                  config.HistoryArchivalStatus.Ptr(),
173✔
691
                HistoryArchivalURI:                     config.HistoryArchivalURI,
173✔
692
                VisibilityArchivalStatus:               config.VisibilityArchivalStatus.Ptr(),
173✔
693
                VisibilityArchivalURI:                  config.VisibilityArchivalURI,
173✔
694
                BadBinaries:                            &config.BadBinaries,
173✔
695
                IsolationGroups:                        &config.IsolationGroups,
173✔
696
        }
173✔
697

173✔
698
        clusters := []*types.ClusterReplicationConfiguration{}
173✔
699
        for _, cluster := range replicationConfig.Clusters {
365✔
700
                clusters = append(clusters, &types.ClusterReplicationConfiguration{
192✔
701
                        ClusterName: cluster.ClusterName,
192✔
702
                })
192✔
703
        }
192✔
704

705
        replicationConfigResult := &types.DomainReplicationConfiguration{
173✔
706
                ActiveClusterName: replicationConfig.ActiveClusterName,
173✔
707
                Clusters:          clusters,
173✔
708
        }
173✔
709

173✔
710
        return infoResult, configResult, replicationConfigResult
173✔
711
}
712

713
func (d *handlerImpl) mergeBadBinaries(
714
        old map[string]*types.BadBinaryInfo,
715
        new map[string]*types.BadBinaryInfo,
716
        createTimeNano int64,
717
) types.BadBinaries {
8✔
718

8✔
719
        if old == nil {
9✔
720
                old = map[string]*types.BadBinaryInfo{}
1✔
721
        }
1✔
722
        for k, v := range new {
14✔
723
                v.CreatedTimeNano = common.Int64Ptr(createTimeNano)
6✔
724
                old[k] = v
6✔
725
        }
6✔
726
        return types.BadBinaries{
8✔
727
                Binaries: old,
8✔
728
        }
8✔
729
}
730

731
func (d *handlerImpl) mergeDomainData(
732
        old map[string]string,
733
        new map[string]string,
734
) map[string]string {
9✔
735

9✔
736
        if old == nil {
11✔
737
                old = map[string]string{}
2✔
738
        }
2✔
739
        for k, v := range new {
20✔
740
                old[k] = v
11✔
741
        }
11✔
742
        return old
9✔
743
}
744

745
func (d *handlerImpl) toArchivalRegisterEvent(
746
        status *types.ArchivalStatus,
747
        URI string,
748
        defaultStatus types.ArchivalStatus,
749
        defaultURI string,
750
) (*ArchivalEvent, error) {
18✔
751

18✔
752
        event := &ArchivalEvent{
18✔
753
                status:     status,
18✔
754
                URI:        URI,
18✔
755
                defaultURI: defaultURI,
18✔
756
        }
18✔
757
        if event.status == nil {
18✔
758
                event.status = defaultStatus.Ptr()
×
759
        }
×
760
        if err := event.validate(); err != nil {
18✔
761
                return nil, err
×
762
        }
×
763
        return event, nil
18✔
764
}
765

766
func (d *handlerImpl) toArchivalUpdateEvent(
767
        status *types.ArchivalStatus,
768
        URI string,
769
        defaultURI string,
770
) (*ArchivalEvent, error) {
×
771

×
772
        event := &ArchivalEvent{
×
773
                status:     status,
×
774
                URI:        URI,
×
775
                defaultURI: defaultURI,
×
776
        }
×
777
        if err := event.validate(); err != nil {
×
778
                return nil, err
×
779
        }
×
780
        return event, nil
×
781
}
782

783
func (d *handlerImpl) validateHistoryArchivalURI(URIString string) error {
×
784
        URI, err := archiver.NewURI(URIString)
×
785
        if err != nil {
×
786
                return err
×
787
        }
×
788

789
        archiver, err := d.archiverProvider.GetHistoryArchiver(URI.Scheme(), service.Frontend)
×
790
        if err != nil {
×
791
                return err
×
792
        }
×
793

794
        return archiver.ValidateURI(URI)
×
795
}
796

797
func (d *handlerImpl) validateVisibilityArchivalURI(URIString string) error {
×
798
        URI, err := archiver.NewURI(URIString)
×
799
        if err != nil {
×
800
                return err
×
801
        }
×
802

803
        archiver, err := d.archiverProvider.GetVisibilityArchiver(URI.Scheme(), service.Frontend)
×
804
        if err != nil {
×
805
                return err
×
806
        }
×
807

808
        return archiver.ValidateURI(URI)
×
809
}
810

811
func (d *handlerImpl) getHistoryArchivalState(
812
        config *persistence.DomainConfig,
813
        updateRequest *types.UpdateDomainRequest,
814
) (*ArchivalState, bool, error) {
23✔
815

23✔
816
        currentHistoryArchivalState := &ArchivalState{
23✔
817
                Status: config.HistoryArchivalStatus,
23✔
818
                URI:    config.HistoryArchivalURI,
23✔
819
        }
23✔
820
        clusterHistoryArchivalConfig := d.archivalMetadata.GetHistoryConfig()
23✔
821

23✔
822
        if clusterHistoryArchivalConfig.ClusterConfiguredForArchival() {
23✔
823
                archivalEvent, err := d.toArchivalUpdateEvent(
×
824
                        updateRequest.HistoryArchivalStatus,
×
825
                        updateRequest.GetHistoryArchivalURI(),
×
826
                        clusterHistoryArchivalConfig.GetDomainDefaultURI(),
×
827
                )
×
828
                if err != nil {
×
829
                        return currentHistoryArchivalState, false, err
×
830
                }
×
831
                return currentHistoryArchivalState.getNextState(archivalEvent, d.validateHistoryArchivalURI)
×
832
        }
833
        return currentHistoryArchivalState, false, nil
23✔
834
}
835

836
func (d *handlerImpl) getVisibilityArchivalState(
837
        config *persistence.DomainConfig,
838
        updateRequest *types.UpdateDomainRequest,
839
) (*ArchivalState, bool, error) {
23✔
840
        currentVisibilityArchivalState := &ArchivalState{
23✔
841
                Status: config.VisibilityArchivalStatus,
23✔
842
                URI:    config.VisibilityArchivalURI,
23✔
843
        }
23✔
844
        clusterVisibilityArchivalConfig := d.archivalMetadata.GetVisibilityConfig()
23✔
845
        if clusterVisibilityArchivalConfig.ClusterConfiguredForArchival() {
23✔
846
                archivalEvent, err := d.toArchivalUpdateEvent(
×
847
                        updateRequest.VisibilityArchivalStatus,
×
848
                        updateRequest.GetVisibilityArchivalURI(),
×
849
                        clusterVisibilityArchivalConfig.GetDomainDefaultURI(),
×
850
                )
×
851
                if err != nil {
×
852
                        return currentVisibilityArchivalState, false, err
×
853
                }
×
854
                return currentVisibilityArchivalState.getNextState(archivalEvent, d.validateVisibilityArchivalURI)
×
855
        }
856
        return currentVisibilityArchivalState, false, nil
23✔
857
}
858

859
func (d *handlerImpl) updateDomainInfo(
860
        updateRequest *types.UpdateDomainRequest,
861
        currentDomainInfo *persistence.DomainInfo,
862
) (*persistence.DomainInfo, bool) {
23✔
863

23✔
864
        isDomainUpdated := false
23✔
865
        if updateRequest.Description != nil {
28✔
866
                isDomainUpdated = true
5✔
867
                currentDomainInfo.Description = *updateRequest.Description
5✔
868
        }
5✔
869
        if updateRequest.OwnerEmail != nil {
28✔
870
                isDomainUpdated = true
5✔
871
                currentDomainInfo.OwnerEmail = *updateRequest.OwnerEmail
5✔
872
        }
5✔
873
        if updateRequest.Data != nil {
28✔
874
                isDomainUpdated = true
5✔
875
                // only do merging
5✔
876
                currentDomainInfo.Data = d.mergeDomainData(currentDomainInfo.Data, updateRequest.Data)
5✔
877
        }
5✔
878
        return currentDomainInfo, isDomainUpdated
23✔
879
}
880

881
func (d *handlerImpl) updateDomainConfiguration(
882
        domainName string,
883
        config *persistence.DomainConfig,
884
        updateRequest *types.UpdateDomainRequest,
885
) (*persistence.DomainConfig, bool, error) {
23✔
886

23✔
887
        isConfigChanged := false
23✔
888
        if updateRequest.EmitMetric != nil {
27✔
889
                isConfigChanged = true
4✔
890
                config.EmitMetric = *updateRequest.EmitMetric
4✔
891
        }
4✔
892
        if updateRequest.WorkflowExecutionRetentionPeriodInDays != nil {
28✔
893
                isConfigChanged = true
5✔
894
                config.Retention = *updateRequest.WorkflowExecutionRetentionPeriodInDays
5✔
895
        }
5✔
896
        if updateRequest.BadBinaries != nil {
27✔
897
                maxLength := d.config.MaxBadBinaryCount(domainName)
4✔
898
                // only do merging
4✔
899
                config.BadBinaries = d.mergeBadBinaries(config.BadBinaries.Binaries, updateRequest.BadBinaries.Binaries, time.Now().UnixNano())
4✔
900
                if len(config.BadBinaries.Binaries) > maxLength {
4✔
901
                        return config, isConfigChanged, &types.BadRequestError{
×
902
                                Message: fmt.Sprintf("Total resetBinaries cannot exceed the max limit: %v", maxLength),
×
903
                        }
×
904
                }
×
905
        }
906
        return config, isConfigChanged, nil
23✔
907
}
908

909
func (d *handlerImpl) updateDeleteBadBinary(
910
        config *persistence.DomainConfig,
911
        deleteBadBinary *string,
912
) (*persistence.DomainConfig, bool, error) {
23✔
913

23✔
914
        if deleteBadBinary != nil {
23✔
915
                _, ok := config.BadBinaries.Binaries[*deleteBadBinary]
×
916
                if !ok {
×
917
                        return config, false, &types.BadRequestError{
×
918
                                Message: fmt.Sprintf("Bad binary checksum %v doesn't exists.", *deleteBadBinary),
×
919
                        }
×
920
                }
×
921
                delete(config.BadBinaries.Binaries, *deleteBadBinary)
×
922
                return config, true, nil
×
923
        }
924
        return config, false, nil
23✔
925
}
926

927
func (d *handlerImpl) updateReplicationConfig(
928
        config *persistence.DomainReplicationConfig,
929
        updateRequest *types.UpdateDomainRequest,
930
) (*persistence.DomainReplicationConfig, bool, bool, error) {
23✔
931

23✔
932
        clusterUpdated := false
23✔
933
        activeClusterUpdated := false
23✔
934
        if len(updateRequest.Clusters) != 0 {
27✔
935
                clusterUpdated = true
4✔
936
                clustersNew := []*persistence.ClusterReplicationConfig{}
4✔
937
                for _, clusterConfig := range updateRequest.Clusters {
10✔
938
                        clustersNew = append(clustersNew, &persistence.ClusterReplicationConfig{
6✔
939
                                ClusterName: clusterConfig.GetClusterName(),
6✔
940
                        })
6✔
941
                }
6✔
942

943
                if err := d.domainAttrValidator.validateDomainReplicationConfigClustersDoesNotRemove(
4✔
944
                        config.Clusters,
4✔
945
                        clustersNew,
4✔
946
                ); err != nil {
4✔
947
                        d.logger.Warn("removing replica clusters from domain replication group", tag.Error(err))
×
948
                }
×
949
                config.Clusters = clustersNew
4✔
950
        }
951

952
        if updateRequest.ActiveClusterName != nil {
34✔
953
                activeClusterUpdated = true
11✔
954
                config.ActiveClusterName = *updateRequest.ActiveClusterName
11✔
955
        }
11✔
956
        return config, clusterUpdated, activeClusterUpdated, nil
23✔
957
}
958

959
func (d *handlerImpl) getIsolationGroupStatus(
960
        incomingCfg *persistence.DomainConfig,
961
        updateRequest *types.UpdateDomainRequest,
962
) (config *persistence.DomainConfig, isolationGroupsChanged bool, err error) {
23✔
963

23✔
964
        if updateRequest == nil || updateRequest.IsolationGroupConfiguration == nil {
44✔
965
                return incomingCfg, false, nil
21✔
966
        }
21✔
967

968
        if incomingCfg == nil {
2✔
969
                return &persistence.DomainConfig{
×
970
                        IsolationGroups: *updateRequest.IsolationGroupConfiguration,
×
971
                }, true, nil
×
972
        }
×
973

974
        // upsert with whatever is present in the request always
975
        incomingCfg.IsolationGroups = *updateRequest.IsolationGroupConfiguration
2✔
976
        return incomingCfg, true, nil
2✔
977
}
978

979
func getDomainStatus(info *persistence.DomainInfo) *types.DomainStatus {
173✔
980
        switch info.Status {
173✔
981
        case persistence.DomainStatusRegistered:
170✔
982
                v := types.DomainStatusRegistered
170✔
983
                return &v
170✔
984
        case persistence.DomainStatusDeprecated:
3✔
985
                v := types.DomainStatusDeprecated
3✔
986
                return &v
3✔
987
        case persistence.DomainStatusDeleted:
×
988
                v := types.DomainStatusDeleted
×
989
                return &v
×
990
        }
991

992
        return nil
×
993
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc