• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

uber / cadence / 0187fc45-1fa7-4f9b-9c10-2aef2ebb54e0

08 May 2023 05:10PM UTC coverage: 57.153% (-0.07%) from 57.225%
0187fc45-1fa7-4f9b-9c10-2aef2ebb54e0

push

buildkite

GitHub
Update persistence layer to adopt idl update for isolation (#5254)

204 of 204 new or added lines in 15 files covered. (100.0%)

85781 of 150089 relevant lines covered (57.15%)

2419.35 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

50.65
/common/persistence/sql/common.go
1
// Copyright (c) 2018 Uber Technologies, Inc.
2
//
3
// Permission is hereby granted, free of charge, to any person obtaining a copy
4
// of this software and associated documentation files (the "Software"), to deal
5
// in the Software without restriction, including without limitation the rights
6
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
7
// copies of the Software, and to permit persons to whom the Software is
8
// furnished to do so, subject to the following conditions:
9
//
10
// The above copyright notice and this permission notice shall be included in
11
// all copies or substantial portions of the Software.
12
//
13
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
14
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
18
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
19
// THE SOFTWARE.
20

21
package sql
22

23
import (
24
        "bytes"
25
        "context"
26
        "encoding/binary"
27
        "encoding/gob"
28
        "fmt"
29

30
        "github.com/uber/cadence/common/log"
31
        "github.com/uber/cadence/common/log/tag"
32
        "github.com/uber/cadence/common/persistence"
33
        p "github.com/uber/cadence/common/persistence"
34
        "github.com/uber/cadence/common/persistence/serialization"
35
        "github.com/uber/cadence/common/persistence/sql/sqlplugin"
36
        "github.com/uber/cadence/common/types"
37
)
38

39
// TODO: Rename all SQL Managers to Stores
40
type sqlStore struct {
41
        db     sqlplugin.DB
42
        logger log.Logger
43
        parser serialization.Parser
44
        dc     *p.DynamicConfiguration
45
}
46

47
func (m *sqlStore) GetName() string {
×
48
        return m.db.PluginName()
×
49
}
×
50

51
func (m *sqlStore) Close() {
170✔
52
        if m.db != nil {
340✔
53
                m.db.Close()
170✔
54
        }
170✔
55
}
56

57
func (m *sqlStore) useAsyncTransaction() bool {
2,934✔
58
        return m.db.SupportsAsyncTransaction() && m.dc != nil && m.dc.EnableSQLAsyncTransaction()
2,934✔
59
}
2,934✔
60

61
func (m *sqlStore) txExecute(ctx context.Context, dbShardID int, operation string, f func(tx sqlplugin.Tx) error) error {
9,184✔
62
        tx, err := m.db.BeginTx(ctx, dbShardID)
9,184✔
63
        if err != nil {
9,184✔
64
                return convertCommonErrors(m.db, operation, "Failed to start transaction.", err)
×
65
        }
×
66
        err = f(tx)
9,184✔
67
        if err != nil {
9,214✔
68
                rollBackErr := tx.Rollback()
30✔
69
                if rollBackErr != nil {
30✔
70
                        m.logger.Error("transaction rollback error", tag.Error(rollBackErr))
×
71
                }
×
72
                return convertCommonErrors(m.db, operation, "", err)
30✔
73
        }
74
        if err := tx.Commit(); err != nil {
9,156✔
75
                return convertCommonErrors(m.db, operation, "Failed to commit transaction.", err)
×
76
        }
×
77
        return nil
9,156✔
78
}
79

80
func gobSerialize(x interface{}) ([]byte, error) {
×
81
        b := bytes.Buffer{}
×
82
        e := gob.NewEncoder(&b)
×
83
        err := e.Encode(x)
×
84
        if err != nil {
×
85
                return nil, &types.InternalServiceError{
×
86
                        Message: fmt.Sprintf("Error in serialization: %v", err),
×
87
                }
×
88
        }
×
89
        return b.Bytes(), nil
×
90
}
91

92
func gobDeserialize(a []byte, x interface{}) error {
×
93
        b := bytes.NewBuffer(a)
×
94
        d := gob.NewDecoder(b)
×
95
        err := d.Decode(x)
×
96

×
97
        if err != nil {
×
98
                return &types.InternalServiceError{
×
99
                        Message: fmt.Sprintf("Error in deserialization: %v", err),
×
100
                }
×
101
        }
×
102
        return nil
×
103
}
104

105
func serializePageToken(offset int64) []byte {
44✔
106
        b := make([]byte, 8)
44✔
107
        binary.LittleEndian.PutUint64(b, uint64(offset))
44✔
108
        return b
44✔
109
}
44✔
110

111
func deserializePageToken(payload []byte) (int64, error) {
30✔
112
        if len(payload) != 8 {
30✔
113
                return 0, fmt.Errorf("invalid token of %v length", len(payload))
×
114
        }
×
115
        return int64(binary.LittleEndian.Uint64(payload)), nil
30✔
116
}
117

118
func convertCommonErrors(
119
        errChecker sqlplugin.ErrorChecker,
120
        operation, message string,
121
        err error,
122
) error {
1,195✔
123
        switch err.(type) {
1,195✔
124
        case *persistence.ConditionFailedError,
125
                *persistence.CurrentWorkflowConditionFailedError,
126
                *persistence.WorkflowExecutionAlreadyStartedError,
127
                *persistence.ShardOwnershipLostError,
128
                *persistence.TimeoutError,
129
                *types.DomainAlreadyExistsError,
130
                *types.EntityNotExistsError,
131
                *types.ServiceBusyError,
132
                *types.InternalServiceError:
30✔
133
                return err
30✔
134
        }
135
        if errChecker.IsNotFoundError(err) {
1,223✔
136
                return &types.EntityNotExistsError{
56✔
137
                        Message: fmt.Sprintf("%v failed. %s Error: %v ", operation, message, err),
56✔
138
                }
56✔
139
        }
56✔
140

141
        if errChecker.IsTimeoutError(err) {
1,113✔
142
                return &persistence.TimeoutError{Msg: fmt.Sprintf("%v timed out. %s Error: %v", operation, message, err)}
×
143
        }
×
144

145
        if errChecker.IsThrottlingError(err) {
1,113✔
146
                return &types.ServiceBusyError{
×
147
                        Message: fmt.Sprintf("%v operation failed. %s Error: %v", operation, message, err),
×
148
                }
×
149
        }
×
150

151
        return &types.InternalServiceError{
1,113✔
152
                Message: fmt.Sprintf("%v operation failed. %s Error: %v", operation, message, err),
1,113✔
153
        }
1,113✔
154
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc