• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

uber / cadence / 0188b6ba-6aaf-4739-8b30-52eaead5800c

13 Jun 2023 09:47PM UTC coverage: 57.204% (-0.01%) from 57.217%
0188b6ba-6aaf-4739-8b30-52eaead5800c

push

buildkite

web-flow
Bugfix/isolation groups domain drains (#5315)

What changed?
Fixes several bugs in the domain isolation-group handling which wasn't tested properly in replication it notably:

Fixes the problem of upserting configuration from the inactive region, which was previously would error
Fixes the problem of replication of configuration, which was entirely not working
Refactors the domain controller by splitting out this functionality into its own, much simpler function rather than continuing to overload the already incomprehensible domain controller.
Why?

How did you test it?

cadence --env docstore-prod11 --proxy_region dca admin isolation-groups get-domain --domain cadence-canary-global
Isolation Groups State
asdf5            Drained
asfd             Drained
-------------------------------------------------------------------------------------------------------------------------------------------------------------
~ » cadence --env an-env --proxy_region dca admin isolation-groups update-domain --domain cadence-canary-global  --remove-all-drains
-------------------------------------------------------------------------------------------------------------------------------------------------------------
~ » cadence --env an-env --proxy_region dca admin isolation-groups get-domain --domain cadence-canary-global
-- No groups found --
-------------------------------------------------------------------------------------------------------------------------------------------------------------
~ » cadence --env an-env --proxy_region phx admin isolation-groups get-domain --domain cadence-canary-global
-- No groups found --
Potential risks

141 of 141 new or added lines in 5 files covered. (100.0%)

86988 of 152065 relevant lines covered (57.2%)

2482.57 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

50.65
/common/persistence/sql/common.go
1
// Copyright (c) 2018 Uber Technologies, Inc.
2
//
3
// Permission is hereby granted, free of charge, to any person obtaining a copy
4
// of this software and associated documentation files (the "Software"), to deal
5
// in the Software without restriction, including without limitation the rights
6
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
7
// copies of the Software, and to permit persons to whom the Software is
8
// furnished to do so, subject to the following conditions:
9
//
10
// The above copyright notice and this permission notice shall be included in
11
// all copies or substantial portions of the Software.
12
//
13
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
14
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
18
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
19
// THE SOFTWARE.
20

21
package sql
22

23
import (
24
        "bytes"
25
        "context"
26
        "encoding/binary"
27
        "encoding/gob"
28
        "fmt"
29

30
        "github.com/uber/cadence/common/log"
31
        "github.com/uber/cadence/common/log/tag"
32
        "github.com/uber/cadence/common/persistence"
33
        p "github.com/uber/cadence/common/persistence"
34
        "github.com/uber/cadence/common/persistence/serialization"
35
        "github.com/uber/cadence/common/persistence/sql/sqlplugin"
36
        "github.com/uber/cadence/common/types"
37
)
38

39
// TODO: Rename all SQL Managers to Stores
40
type sqlStore struct {
41
        db     sqlplugin.DB
42
        logger log.Logger
43
        parser serialization.Parser
44
        dc     *p.DynamicConfiguration
45
}
46

47
func (m *sqlStore) GetName() string {
×
48
        return m.db.PluginName()
×
49
}
×
50

51
func (m *sqlStore) Close() {
170✔
52
        if m.db != nil {
340✔
53
                m.db.Close()
170✔
54
        }
170✔
55
}
56

57
func (m *sqlStore) useAsyncTransaction() bool {
2,931✔
58
        return m.db.SupportsAsyncTransaction() && m.dc != nil && m.dc.EnableSQLAsyncTransaction()
2,931✔
59
}
2,931✔
60

61
func (m *sqlStore) txExecute(ctx context.Context, dbShardID int, operation string, f func(tx sqlplugin.Tx) error) error {
9,261✔
62
        tx, err := m.db.BeginTx(ctx, dbShardID)
9,261✔
63
        if err != nil {
9,261✔
64
                return convertCommonErrors(m.db, operation, "Failed to start transaction.", err)
×
65
        }
×
66
        err = f(tx)
9,261✔
67
        if err != nil {
9,291✔
68
                rollBackErr := tx.Rollback()
30✔
69
                if rollBackErr != nil {
30✔
70
                        m.logger.Error("transaction rollback error", tag.Error(rollBackErr))
×
71
                }
×
72
                return convertCommonErrors(m.db, operation, "", err)
30✔
73
        }
74
        if err := tx.Commit(); err != nil {
9,233✔
75
                return convertCommonErrors(m.db, operation, "Failed to commit transaction.", err)
×
76
        }
×
77
        return nil
9,233✔
78
}
79

80
func gobSerialize(x interface{}) ([]byte, error) {
×
81
        b := bytes.Buffer{}
×
82
        e := gob.NewEncoder(&b)
×
83
        err := e.Encode(x)
×
84
        if err != nil {
×
85
                return nil, &types.InternalServiceError{
×
86
                        Message: fmt.Sprintf("Error in serialization: %v", err),
×
87
                }
×
88
        }
×
89
        return b.Bytes(), nil
×
90
}
91

92
func gobDeserialize(a []byte, x interface{}) error {
×
93
        b := bytes.NewBuffer(a)
×
94
        d := gob.NewDecoder(b)
×
95
        err := d.Decode(x)
×
96

×
97
        if err != nil {
×
98
                return &types.InternalServiceError{
×
99
                        Message: fmt.Sprintf("Error in deserialization: %v", err),
×
100
                }
×
101
        }
×
102
        return nil
×
103
}
104

105
func serializePageToken(offset int64) []byte {
44✔
106
        b := make([]byte, 8)
44✔
107
        binary.LittleEndian.PutUint64(b, uint64(offset))
44✔
108
        return b
44✔
109
}
44✔
110

111
func deserializePageToken(payload []byte) (int64, error) {
30✔
112
        if len(payload) != 8 {
30✔
113
                return 0, fmt.Errorf("invalid token of %v length", len(payload))
×
114
        }
×
115
        return int64(binary.LittleEndian.Uint64(payload)), nil
30✔
116
}
117

118
func convertCommonErrors(
119
        errChecker sqlplugin.ErrorChecker,
120
        operation, message string,
121
        err error,
122
) error {
1,217✔
123
        switch err.(type) {
1,217✔
124
        case *persistence.ConditionFailedError,
125
                *persistence.CurrentWorkflowConditionFailedError,
126
                *persistence.WorkflowExecutionAlreadyStartedError,
127
                *persistence.ShardOwnershipLostError,
128
                *persistence.TimeoutError,
129
                *types.DomainAlreadyExistsError,
130
                *types.EntityNotExistsError,
131
                *types.ServiceBusyError,
132
                *types.InternalServiceError:
30✔
133
                return err
30✔
134
        }
135
        if errChecker.IsNotFoundError(err) {
1,245✔
136
                return &types.EntityNotExistsError{
56✔
137
                        Message: fmt.Sprintf("%v failed. %s Error: %v ", operation, message, err),
56✔
138
                }
56✔
139
        }
56✔
140

141
        if errChecker.IsTimeoutError(err) {
1,135✔
142
                return &persistence.TimeoutError{Msg: fmt.Sprintf("%v timed out. %s Error: %v", operation, message, err)}
×
143
        }
×
144

145
        if errChecker.IsThrottlingError(err) {
1,135✔
146
                return &types.ServiceBusyError{
×
147
                        Message: fmt.Sprintf("%v operation failed. %s Error: %v", operation, message, err),
×
148
                }
×
149
        }
×
150

151
        return &types.InternalServiceError{
1,135✔
152
                Message: fmt.Sprintf("%v operation failed. %s Error: %v", operation, message, err),
1,135✔
153
        }
1,135✔
154
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc