• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

uber / cadence / 01894bf6-a954-4eb3-a178-a38598dfe3b0

12 Jul 2023 09:16PM UTC coverage: 57.228% (-0.06%) from 57.284%
01894bf6-a954-4eb3-a178-a38598dfe3b0

push

buildkite

web-flow
[CLI] add domain migration command with domain metadata checker (#5335)

Added a domain migration command. Currently, it checks the domain metadata and looks for long-running workflows.

This ensures that both domains exist before domain migration happens.
It also ensures that the domain doesn't have long-running workflows that migration cannot handle.

How did you test it?
tested locally with docker compose

103 of 103 new or added lines in 3 files covered. (100.0%)

87149 of 152284 relevant lines covered (57.23%)

2494.52 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

50.65
/common/persistence/sql/common.go
1
// Copyright (c) 2018 Uber Technologies, Inc.
2
//
3
// Permission is hereby granted, free of charge, to any person obtaining a copy
4
// of this software and associated documentation files (the "Software"), to deal
5
// in the Software without restriction, including without limitation the rights
6
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
7
// copies of the Software, and to permit persons to whom the Software is
8
// furnished to do so, subject to the following conditions:
9
//
10
// The above copyright notice and this permission notice shall be included in
11
// all copies or substantial portions of the Software.
12
//
13
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
14
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
18
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
19
// THE SOFTWARE.
20

21
package sql
22

23
import (
24
        "bytes"
25
        "context"
26
        "encoding/binary"
27
        "encoding/gob"
28
        "fmt"
29

30
        "github.com/uber/cadence/common/log"
31
        "github.com/uber/cadence/common/log/tag"
32
        "github.com/uber/cadence/common/persistence"
33
        p "github.com/uber/cadence/common/persistence"
34
        "github.com/uber/cadence/common/persistence/serialization"
35
        "github.com/uber/cadence/common/persistence/sql/sqlplugin"
36
        "github.com/uber/cadence/common/types"
37
)
38

39
// TODO: Rename all SQL Managers to Stores
40
// sqlStore bundles the dependencies shared by the SQL persistence helpers in
// this file: the SQL plugin database handle, a logger, a serialization parser
// and an optional dynamic configuration. dc may be nil; useAsyncTransaction
// checks for that before reading it.
type sqlStore struct {
41
        db     sqlplugin.DB
42
        logger log.Logger
43
        parser serialization.Parser
44
        dc     *p.DynamicConfiguration
45
}
46

47
// GetName returns the plugin name reported by the underlying database handle.
// Unlike Close, it does not guard against a nil db.
func (m *sqlStore) GetName() string {
×
48
        return m.db.PluginName()
×
49
}
×
50

51
// Close closes the underlying database handle if one is set; it is a no-op
// when db is nil. Any value returned by db.Close is not inspected here.
func (m *sqlStore) Close() {
170✔
52
        if m.db != nil {
340✔
53
                m.db.Close()
170✔
54
        }
170✔
55
}
56

57
// useAsyncTransaction reports whether async transactions should be used. It is
// true only when the database handle reports support for them, a dynamic
// configuration is present, and that configuration's EnableSQLAsyncTransaction
// returns true. The conditions short-circuit in that order, so dc is never
// dereferenced when it is nil.
func (m *sqlStore) useAsyncTransaction() bool {
2,930✔
58
        return m.db.SupportsAsyncTransaction() && m.dc != nil && m.dc.EnableSQLAsyncTransaction()
2,930✔
59
}
2,930✔
60

61
// txExecute runs f inside a transaction begun on the given dbShardID.
//
// If f returns nil the transaction is committed; otherwise it is rolled back.
// operation is a label that convertCommonErrors puts into the error messages
// it generates. Every error returned from this function has been passed
// through convertCommonErrors:
//   - a failure to begin the transaction,
//   - the error returned by f (after the rollback attempt),
//   - a failure to commit.
//
// A rollback failure is only logged; it is never returned to the caller.
func (m *sqlStore) txExecute(ctx context.Context, dbShardID int, operation string, f func(tx sqlplugin.Tx) error) error {
9,161✔
62
        tx, err := m.db.BeginTx(ctx, dbShardID)
9,161✔
63
        if err != nil {
9,161✔
64
                return convertCommonErrors(m.db, operation, "Failed to start transaction.", err)
×
65
        }
×
66
        err = f(tx)
9,161✔
67
        if err != nil {
9,191✔
68
                rollBackErr := tx.Rollback()
30✔
69
                if rollBackErr != nil {
30✔
70
                        m.logger.Error("transaction rollback error", tag.Error(rollBackErr))
×
71
                }
×
72
                // Report the callback's error (err), not the rollback error.
                return convertCommonErrors(m.db, operation, "", err)
30✔
73
        }
74
        if err := tx.Commit(); err != nil {
9,133✔
75
                return convertCommonErrors(m.db, operation, "Failed to commit transaction.", err)
×
76
        }
×
77
        return nil
9,133✔
78
}
79

80
// gobSerialize encodes x with encoding/gob and returns the resulting bytes.
// If encoding fails it returns a nil slice and a *types.InternalServiceError
// whose message embeds the encoder's error text.
func gobSerialize(x interface{}) ([]byte, error) {
×
81
        b := bytes.Buffer{}
×
82
        e := gob.NewEncoder(&b)
×
83
        err := e.Encode(x)
×
84
        if err != nil {
×
85
                return nil, &types.InternalServiceError{
×
86
                        Message: fmt.Sprintf("Error in serialization: %v", err),
×
87
                }
×
88
        }
×
89
        return b.Bytes(), nil
×
90
}
91

92
// gobDeserialize decodes the gob-encoded bytes in a into x. x is handed
// directly to the gob decoder's Decode, so it must be a value Decode accepts
// as a destination. On failure it returns a *types.InternalServiceError whose
// message embeds the decoder's error text; on success it returns nil.
func gobDeserialize(a []byte, x interface{}) error {
×
93
        b := bytes.NewBuffer(a)
×
94
        d := gob.NewDecoder(b)
×
95
        err := d.Decode(x)
×
96

×
97
        if err != nil {
×
98
                return &types.InternalServiceError{
×
99
                        Message: fmt.Sprintf("Error in deserialization: %v", err),
×
100
                }
×
101
        }
×
102
        return nil
×
103
}
104

105
// serializePageToken encodes offset as an 8-byte little-endian page token.
// The offset is converted to uint64 before encoding; deserializePageToken
// converts it back.
func serializePageToken(offset int64) []byte {
44✔
106
        b := make([]byte, 8)
44✔
107
        binary.LittleEndian.PutUint64(b, uint64(offset))
44✔
108
        return b
44✔
109
}
44✔
110

111
// deserializePageToken decodes a page token produced by serializePageToken.
// The payload must be exactly 8 bytes long; any other length (including an
// empty or nil payload) yields offset 0 and an error that reports the length.
func deserializePageToken(payload []byte) (int64, error) {
30✔
112
        if len(payload) != 8 {
30✔
113
                return 0, fmt.Errorf("invalid token of %v length", len(payload))
×
114
        }
×
115
        return int64(binary.LittleEndian.Uint64(payload)), nil
30✔
116
}
117

118
// convertCommonErrors maps an error from a SQL operation onto the error types
// used by the persistence layer.
//
// Errors whose dynamic type is already one of the persistence/types errors
// listed in the type switch are returned unchanged. Otherwise errChecker
// classifies the error, checked in this order:
//   - not found  -> *types.EntityNotExistsError
//   - timeout    -> *persistence.TimeoutError
//   - throttling -> *types.ServiceBusyError
//   - otherwise  -> *types.InternalServiceError
//
// operation and message are interpolated into the generated error text along
// with the original error; message may be empty.
//
// NOTE(review): the type switch inspects only err's own dynamic type, so a
// listed error wrapped inside another error is not passed through as-is.
func convertCommonErrors(
119
        errChecker sqlplugin.ErrorChecker,
120
        operation, message string,
121
        err error,
122
) error {
1,135✔
123
        switch err.(type) {
1,135✔
124
        case *persistence.ConditionFailedError,
125
                *persistence.CurrentWorkflowConditionFailedError,
126
                *persistence.WorkflowExecutionAlreadyStartedError,
127
                *persistence.ShardOwnershipLostError,
128
                *persistence.TimeoutError,
129
                *types.DomainAlreadyExistsError,
130
                *types.EntityNotExistsError,
131
                *types.ServiceBusyError,
132
                *types.InternalServiceError:
30✔
133
                return err
30✔
134
        }
135
        if errChecker.IsNotFoundError(err) {
1,163✔
136
                return &types.EntityNotExistsError{
56✔
137
                        Message: fmt.Sprintf("%v failed. %s Error: %v ", operation, message, err),
56✔
138
                }
56✔
139
        }
56✔
140

141
        if errChecker.IsTimeoutError(err) {
1,053✔
142
                return &persistence.TimeoutError{Msg: fmt.Sprintf("%v timed out. %s Error: %v", operation, message, err)}
×
143
        }
×
144

145
        if errChecker.IsThrottlingError(err) {
1,053✔
146
                return &types.ServiceBusyError{
×
147
                        Message: fmt.Sprintf("%v operation failed. %s Error: %v", operation, message, err),
×
148
                }
×
149
        }
×
150

151
        // Anything not classified above is reported as an internal service error.
        return &types.InternalServiceError{
1,053✔
152
                Message: fmt.Sprintf("%v operation failed. %s Error: %v", operation, message, err),
1,053✔
153
        }
1,053✔
154
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc