• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

uber / cadence / 018e2fd9-a00f-486a-badd-cbc5210ae61c

11 Mar 2024 11:29PM UTC coverage: 64.125% (+0.06%) from 64.068%
018e2fd9-a00f-486a-badd-cbc5210ae61c

push

buildkite

web-flow
refactor common/persistence/pinot tests (#5777)

* refactor common/persistence/pinot tests

* clean up the fmt

93488 of 145791 relevant lines covered (64.12%)

2357.73 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

46.75
/common/persistence/sql/common.go
1
// Copyright (c) 2018 Uber Technologies, Inc.
2
//
3
// Permission is hereby granted, free of charge, to any person obtaining a copy
4
// of this software and associated documentation files (the "Software"), to deal
5
// in the Software without restriction, including without limitation the rights
6
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
7
// copies of the Software, and to permit persons to whom the Software is
8
// furnished to do so, subject to the following conditions:
9
//
10
// The above copyright notice and this permission notice shall be included in
11
// all copies or substantial portions of the Software.
12
//
13
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
14
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
18
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
19
// THE SOFTWARE.
20

21
package sql
22

23
import (
24
        "bytes"
25
        "context"
26
        "encoding/binary"
27
        "encoding/gob"
28
        "fmt"
29

30
        "github.com/uber/cadence/common/log"
31
        "github.com/uber/cadence/common/log/tag"
32
        "github.com/uber/cadence/common/persistence"
33
        p "github.com/uber/cadence/common/persistence"
34
        "github.com/uber/cadence/common/persistence/serialization"
35
        "github.com/uber/cadence/common/persistence/sql/sqlplugin"
36
        "github.com/uber/cadence/common/types"
37
)
38

39
type sqlStore struct {
40
        db     sqlplugin.DB
41
        logger log.Logger
42
        parser serialization.Parser
43
        dc     *p.DynamicConfiguration
44
}
45

46
func (m *sqlStore) GetName() string {
×
47
        return m.db.PluginName()
×
48
}
×
49

50
func (m *sqlStore) Close() {
194✔
51
        if m.db != nil {
388✔
52
                m.db.Close()
194✔
53
        }
194✔
54
}
55

56
func (m *sqlStore) useAsyncTransaction() bool {
×
57
        return m.db.SupportsAsyncTransaction() && m.dc != nil && m.dc.EnableSQLAsyncTransaction()
×
58
}
×
59

60
func (m *sqlStore) txExecute(ctx context.Context, dbShardID int, operation string, f func(tx sqlplugin.Tx) error) error {
8,154✔
61
        tx, err := m.db.BeginTx(ctx, dbShardID)
8,154✔
62
        if err != nil {
8,154✔
63
                return convertCommonErrors(m.db, operation, "Failed to start transaction.", err)
×
64
        }
×
65
        err = f(tx)
8,154✔
66
        if err != nil {
8,184✔
67
                rollBackErr := tx.Rollback()
30✔
68
                if rollBackErr != nil {
30✔
69
                        m.logger.Error("transaction rollback error", tag.Error(rollBackErr))
×
70
                }
×
71
                return convertCommonErrors(m.db, operation, "", err)
30✔
72
        }
73
        if err := tx.Commit(); err != nil {
8,126✔
74
                return convertCommonErrors(m.db, operation, "Failed to commit transaction.", err)
×
75
        }
×
76
        return nil
8,126✔
77
}
78

79
func gobSerialize(x interface{}) ([]byte, error) {
×
80
        b := bytes.Buffer{}
×
81
        e := gob.NewEncoder(&b)
×
82
        err := e.Encode(x)
×
83
        if err != nil {
×
84
                return nil, &types.InternalServiceError{
×
85
                        Message: fmt.Sprintf("Error in serialization: %v", err),
×
86
                }
×
87
        }
×
88
        return b.Bytes(), nil
×
89
}
90

91
func gobDeserialize(a []byte, x interface{}) error {
×
92
        b := bytes.NewBuffer(a)
×
93
        d := gob.NewDecoder(b)
×
94
        err := d.Decode(x)
×
95

×
96
        if err != nil {
×
97
                return &types.InternalServiceError{
×
98
                        Message: fmt.Sprintf("Error in deserialization: %v", err),
×
99
                }
×
100
        }
×
101
        return nil
×
102
}
103

104
// serializePageToken encodes offset as an 8-byte little-endian page token.
// The offset is reinterpreted as an unsigned 64-bit value before encoding.
func serializePageToken(offset int64) []byte {
	var token [8]byte
	binary.LittleEndian.PutUint64(token[:], uint64(offset))
	return token[:]
}
44✔
109

110
// deserializePageToken decodes an 8-byte little-endian page token into an
// offset. It returns an error when payload is not exactly 8 bytes long.
func deserializePageToken(payload []byte) (int64, error) {
	if size := len(payload); size != 8 {
		return 0, fmt.Errorf("invalid token of %v length", size)
	}
	offset := binary.LittleEndian.Uint64(payload)
	return int64(offset), nil
}
116

117
func convertCommonErrors(
118
        errChecker sqlplugin.ErrorChecker,
119
        operation, message string,
120
        err error,
121
) error {
792✔
122
        switch err.(type) {
792✔
123
        case *persistence.ConditionFailedError,
124
                *persistence.CurrentWorkflowConditionFailedError,
125
                *persistence.WorkflowExecutionAlreadyStartedError,
126
                *persistence.ShardOwnershipLostError,
127
                *persistence.TimeoutError,
128
                *types.DomainAlreadyExistsError,
129
                *types.EntityNotExistsError,
130
                *types.ServiceBusyError,
131
                *types.InternalServiceError:
30✔
132
                return err
30✔
133
        }
134
        if errChecker.IsNotFoundError(err) {
820✔
135
                return &types.EntityNotExistsError{
56✔
136
                        Message: fmt.Sprintf("%v failed. %s Error: %v", operation, message, err),
56✔
137
                }
56✔
138
        }
56✔
139

140
        if errChecker.IsTimeoutError(err) {
710✔
141
                return &persistence.TimeoutError{Msg: fmt.Sprintf("%v timed out. %s Error: %v", operation, message, err)}
×
142
        }
×
143

144
        if errChecker.IsThrottlingError(err) {
710✔
145
                return &types.ServiceBusyError{
×
146
                        Message: fmt.Sprintf("%v operation failed. %s Error: %v", operation, message, err),
×
147
                }
×
148
        }
×
149

150
        return &types.InternalServiceError{
710✔
151
                Message: fmt.Sprintf("%v operation failed. %s Error: %v", operation, message, err),
710✔
152
        }
710✔
153
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc