• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

uber / cadence / 018fa9d1-f951-43c8-8e3e-3538fa7befdb

24 May 2024 08:57AM UTC coverage: 69.433% (-0.03%) from 69.466%
018fa9d1-f951-43c8-8e3e-3538fa7befdb

push

buildkite

web-flow
Added tests for GetTasks (#6048)

102357 of 147418 relevant lines covered (69.43%)

2579.25 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

89.61
/common/persistence/sql/common.go
1
// Copyright (c) 2018 Uber Technologies, Inc.
2
//
3
// Permission is hereby granted, free of charge, to any person obtaining a copy
4
// of this software and associated documentation files (the "Software"), to deal
5
// in the Software without restriction, including without limitation the rights
6
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
7
// copies of the Software, and to permit persons to whom the Software is
8
// furnished to do so, subject to the following conditions:
9
//
10
// The above copyright notice and this permission notice shall be included in
11
// all copies or substantial portions of the Software.
12
//
13
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
14
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
18
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
19
// THE SOFTWARE.
20

21
package sql
22

23
import (
24
        "bytes"
25
        "context"
26
        "encoding/binary"
27
        "encoding/gob"
28
        "fmt"
29

30
        "github.com/uber/cadence/common/log"
31
        "github.com/uber/cadence/common/log/tag"
32
        "github.com/uber/cadence/common/persistence"
33
        p "github.com/uber/cadence/common/persistence"
34
        "github.com/uber/cadence/common/persistence/serialization"
35
        "github.com/uber/cadence/common/persistence/sql/sqlplugin"
36
        "github.com/uber/cadence/common/types"
37
)
38

39
type sqlStore struct {
40
        db     sqlplugin.DB
41
        logger log.Logger
42
        parser serialization.Parser
43
        dc     *p.DynamicConfiguration
44
}
45

46
func (m *sqlStore) GetName() string {
×
47
        return m.db.PluginName()
×
48
}
×
49

50
func (m *sqlStore) Close() {
290✔
51
        if m.db != nil {
580✔
52
                m.db.Close()
290✔
53
        }
290✔
54
}
55

56
func (m *sqlStore) useAsyncTransaction() bool {
×
57
        return m.db.SupportsAsyncTransaction() && m.dc != nil && m.dc.EnableSQLAsyncTransaction()
×
58
}
×
59

60
func (m *sqlStore) txExecute(ctx context.Context, dbShardID int, operation string, f func(tx sqlplugin.Tx) error) error {
8,839✔
61
        tx, err := m.db.BeginTx(ctx, dbShardID)
8,839✔
62
        if err != nil {
8,845✔
63
                return convertCommonErrors(m.db, operation, "Failed to start transaction.", err)
6✔
64
        }
6✔
65
        err = f(tx)
8,833✔
66
        if err != nil {
8,908✔
67
                rollBackErr := tx.Rollback()
75✔
68
                if rollBackErr != nil {
75✔
69
                        m.logger.Error("transaction rollback error", tag.Error(rollBackErr))
×
70
                }
×
71
                return convertCommonErrors(m.db, operation, "", err)
75✔
72
        }
73
        if err := tx.Commit(); err != nil {
8,761✔
74
                return convertCommonErrors(m.db, operation, "Failed to commit transaction.", err)
1✔
75
        }
1✔
76
        return nil
8,759✔
77
}
78

79
func gobSerialize(x interface{}) ([]byte, error) {
9✔
80
        b := bytes.Buffer{}
9✔
81
        e := gob.NewEncoder(&b)
9✔
82
        err := e.Encode(x)
9✔
83
        if err != nil {
10✔
84
                return nil, &types.InternalServiceError{
1✔
85
                        Message: fmt.Sprintf("Error in serialization: %v", err),
1✔
86
                }
1✔
87
        }
1✔
88
        return b.Bytes(), nil
8✔
89
}
90

91
func gobDeserialize(a []byte, x interface{}) error {
5✔
92
        b := bytes.NewBuffer(a)
5✔
93
        d := gob.NewDecoder(b)
5✔
94
        err := d.Decode(x)
5✔
95

5✔
96
        if err != nil {
6✔
97
                return &types.InternalServiceError{
1✔
98
                        Message: fmt.Sprintf("Error in deserialization: %v", err),
1✔
99
                }
1✔
100
        }
1✔
101
        return nil
4✔
102
}
103

104
// serializePageToken encodes offset as an 8-byte little-endian page token.
// It is the inverse of deserializePageToken.
func serializePageToken(offset int64) []byte {
	var token [8]byte
	binary.LittleEndian.PutUint64(token[:], uint64(offset))
	return token[:]
}
73✔
109

110
// deserializePageToken decodes an 8-byte little-endian page token back into
// its offset. It returns an error when payload is not exactly 8 bytes long.
func deserializePageToken(payload []byte) (int64, error) {
	const tokenSize = 8
	if n := len(payload); n != tokenSize {
		return 0, fmt.Errorf("invalid token of %v length", n)
	}
	offset := binary.LittleEndian.Uint64(payload)
	return int64(offset), nil
}
116

117
func convertCommonErrors(
118
        errChecker sqlplugin.ErrorChecker,
119
        operation, message string,
120
        err error,
121
) error {
253✔
122
        switch err.(type) {
253✔
123
        case *persistence.ConditionFailedError,
124
                *persistence.CurrentWorkflowConditionFailedError,
125
                *persistence.WorkflowExecutionAlreadyStartedError,
126
                *persistence.ShardOwnershipLostError,
127
                *persistence.TimeoutError,
128
                *types.DomainAlreadyExistsError,
129
                *types.EntityNotExistsError,
130
                *types.ServiceBusyError,
131
                *types.InternalServiceError:
53✔
132
                return err
53✔
133
        }
134
        if errChecker.IsNotFoundError(err) {
386✔
135
                return &types.EntityNotExistsError{
184✔
136
                        Message: fmt.Sprintf("%v failed. %s Error: %v", operation, message, err),
184✔
137
                }
184✔
138
        }
184✔
139

140
        if errChecker.IsTimeoutError(err) {
21✔
141
                return &persistence.TimeoutError{Msg: fmt.Sprintf("%v timed out. %s Error: %v", operation, message, err)}
3✔
142
        }
3✔
143

144
        if errChecker.IsThrottlingError(err) {
16✔
145
                return &types.ServiceBusyError{
1✔
146
                        Message: fmt.Sprintf("%v operation failed. %s Error: %v", operation, message, err),
1✔
147
                }
1✔
148
        }
1✔
149

150
        return &types.InternalServiceError{
14✔
151
                Message: fmt.Sprintf("%v operation failed. %s Error: %v", operation, message, err),
14✔
152
        }
14✔
153
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc