• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

uber / cadence / 01907465-3a13-43f8-89cc-21fc60d6d529

02 Jul 2024 05:01PM UTC coverage: 71.52% (-0.002%) from 71.522%
01907465-3a13-43f8-89cc-21fc60d6d529

push

buildkite

web-flow
bugfix: bad dynamicconfig filter/string mapping (#6151)

This logic desperately needs to be refactored, it's incredibly error-prone :\
We should probably just use enumer's codegen tbh.  Or something similar.

Previously `String()` missed both `workflowType` and `ratelimitKey`.
The `String()` impl is now rewritten so it won't be missed with future additions, and there's a test to check it too.

3 of 3 new or added lines in 1 file covered. (100.0%)

25 existing lines in 10 files now uncovered.

105314 of 147252 relevant lines covered (71.52%)

2653.57 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

73.68
/common/persistence/sql/sqlplugin/mysql/task.go
1
// Copyright (c) 2017 Uber Technologies, Inc.
2
//
3
// Permission is hereby granted, free of charge, to any person obtaining a copy
4
// of this software and associated documentation files (the "Software"), to deal
5
// in the Software without restriction, including without limitation the rights
6
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
7
// copies of the Software, and to permit persons to whom the Software is
8
// furnished to do so, subject to the following conditions:
9
//
10
// The above copyright notice and this permission notice shall be included in
11
// all copies or substantial portions of the Software.
12
//
13
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
14
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
18
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
19
// THE SOFTWARE.
20

21
package mysql
22

23
import (
24
        "context"
25
        "database/sql"
26
        "fmt"
27

28
        "github.com/uber/cadence/common/persistence/sql/sqlplugin"
29
)
30

31
const (
32
        taskListCreatePart = `INTO task_lists(shard_id, domain_id, name, task_type, range_id, data, data_encoding) ` +
33
                `VALUES (:shard_id, :domain_id, :name, :task_type, :range_id, :data, :data_encoding)`
34

35
        // (default range ID: initialRangeID == 1)
36
        createTaskListQry = `INSERT ` + taskListCreatePart
37

38
        updateTaskListQry = `UPDATE task_lists SET
39
range_id = :range_id,
40
data = :data,
41
data_encoding = :data_encoding
42
WHERE
43
shard_id = :shard_id AND
44
domain_id = :domain_id AND
45
name = :name AND
46
task_type = :task_type
47
`
48

49
        // This query uses pagination that is best understood by analogy to simple numbers.
50
        // Given a list of numbers
51
        //         111
52
        //        113
53
        //        122
54
        //        211
55
        // where the hundreds digit corresponds to domain_id, the tens digit
56
        // corresponds to name, and the ones digit corresponds to task_type,
57
        // Imagine recurring queries with a limit of 1.
58
        // For the second query to skip the first result and return 113, it must allow equal values in hundreds & tens, but it's OK because the ones digit is higher.
59
        // For the third query, the ones digit is now lower but that's irrelevant because the tens digit is greater.
60
        // For the fourth query, both the tens digit and ones digit are now lower but that's again irrelevant because now the hundreds digit is higher.
61
        // This technique is useful since the size of the table can easily change between calls, making SKIP an unreliable method, while other db-specific things like rowids are not portable
62
        listTaskListQry = `SELECT domain_id, range_id, name, task_type, data, data_encoding ` +
63
                `FROM task_lists ` +
64
                `WHERE shard_id = ? AND ((domain_id = ? AND name = ? AND task_type > ?) OR (domain_id=? AND name > ?) OR (domain_id > ?)) ` +
65
                `ORDER BY domain_id,name,task_type LIMIT ?`
66

67
        getTaskListQry = `SELECT domain_id, range_id, name, task_type, data, data_encoding ` +
68
                `FROM task_lists ` +
69
                `WHERE shard_id = ? AND domain_id = ? AND name = ? AND task_type = ?`
70

71
        deleteTaskListQry = `DELETE FROM task_lists WHERE shard_id=? AND domain_id=? AND name=? AND task_type=? AND range_id=?`
72

73
        lockTaskListQry = `SELECT range_id FROM task_lists ` +
74
                `WHERE shard_id = ? AND domain_id = ? AND name = ? AND task_type = ? FOR UPDATE`
75

76
        getTaskMinMaxQry = `SELECT task_id, data, data_encoding ` +
77
                `FROM tasks ` +
78
                `WHERE domain_id = ? AND task_list_name = ? AND task_type = ? AND task_id > ? AND task_id <= ? ` +
79
                ` ORDER BY task_id LIMIT ?`
80

81
        getTaskMinQry = `SELECT task_id, data, data_encoding ` +
82
                `FROM tasks ` +
83
                `WHERE domain_id = ? AND task_list_name = ? AND task_type = ? AND task_id > ? ORDER BY task_id LIMIT ?`
84

85
        getTasksCountQry = `SELECT count(1) as count ` +
86
                `FROM tasks ` +
87
                `WHERE domain_id = ? AND task_list_name = ? AND task_type = ? AND task_id > ?`
88

89
        createTaskQry = `INSERT INTO ` +
90
                `tasks(domain_id, task_list_name, task_type, task_id, data, data_encoding) ` +
91
                `VALUES(:domain_id, :task_list_name, :task_type, :task_id, :data, :data_encoding)`
92

93
        deleteTaskQry = `DELETE FROM tasks ` +
94
                `WHERE domain_id = ? AND task_list_name = ? AND task_type = ? AND task_id = ?`
95

96
        rangeDeleteTaskQry = `DELETE FROM tasks ` +
97
                `WHERE domain_id = ? AND task_list_name = ? AND task_type = ? AND task_id <= ? ` +
98
                `ORDER BY domain_id,task_list_name,task_type,task_id LIMIT ?`
99

100
        getOrphanTaskQry = `SELECT task_id, domain_id, task_list_name, task_type FROM tasks AS t ` +
101
                `WHERE NOT EXISTS ( ` +
102
                `        SELECT domain_id, name, task_type FROM task_lists AS tl ` +
103
                `        WHERE t.domain_id=tl.domain_id and t.task_list_name=tl.name and t.task_type=tl.task_type ` +
104
                `) LIMIT ?;`
105
)
106

107
// InsertIntoTasks inserts one or more rows into tasks table
108
func (mdb *db) InsertIntoTasks(ctx context.Context, rows []sqlplugin.TasksRow) (sql.Result, error) {
291✔
109
        if len(rows) == 0 {
291✔
110
                return nil, nil
×
111
        }
×
112
        return mdb.driver.NamedExecContext(ctx, rows[0].ShardID, createTaskQry, rows)
291✔
113
}
114

115
// SelectFromTasks reads one or more rows from tasks table
116
func (mdb *db) SelectFromTasks(ctx context.Context, filter *sqlplugin.TasksFilter) ([]sqlplugin.TasksRow, error) {
295✔
117
        var err error
295✔
118
        var rows []sqlplugin.TasksRow
295✔
119
        switch {
295✔
120
        case filter.MaxTaskID != nil:
292✔
121
                err = mdb.driver.SelectContext(ctx, filter.ShardID, &rows, getTaskMinMaxQry, filter.DomainID,
292✔
122
                        filter.TaskListName, filter.TaskType, *filter.MinTaskID, *filter.MaxTaskID, *filter.PageSize)
292✔
123
        default:
3✔
124
                err = mdb.driver.SelectContext(ctx, filter.ShardID, &rows, getTaskMinQry, filter.DomainID,
3✔
125
                        filter.TaskListName, filter.TaskType, *filter.MinTaskID, *filter.PageSize)
3✔
126
        }
127
        if err != nil {
295✔
128
                return nil, err
×
129
        }
×
130
        return rows, err
295✔
131
}
132

133
// DeleteFromTasks deletes one or more rows from tasks table
134
func (mdb *db) DeleteFromTasks(ctx context.Context, filter *sqlplugin.TasksFilter) (sql.Result, error) {
154✔
135
        if filter.TaskIDLessThanEquals != nil {
303✔
136
                if filter.Limit == nil || *filter.Limit == 0 {
149✔
137
                        return nil, fmt.Errorf("missing limit parameter")
×
138
                }
×
139
                return mdb.driver.ExecContext(ctx, filter.ShardID, rangeDeleteTaskQry,
149✔
140
                        filter.DomainID, filter.TaskListName, filter.TaskType, *filter.TaskIDLessThanEquals, *filter.Limit)
149✔
141
        }
142
        return mdb.driver.ExecContext(ctx, filter.ShardID, deleteTaskQry, filter.DomainID, filter.TaskListName, filter.TaskType, *filter.TaskID)
5✔
143
}
144

145
func (mdb *db) GetOrphanTasks(ctx context.Context, filter *sqlplugin.OrphanTasksFilter) ([]sqlplugin.TaskKeyRow, error) {
3✔
146
        if filter.Limit == nil || *filter.Limit == 0 {
3✔
147
                return nil, fmt.Errorf("missing limit parameter")
×
148
        }
×
149
        var rows []sqlplugin.TaskKeyRow
3✔
150

3✔
151
        err := mdb.driver.SelectContext(ctx, sqlplugin.DbAllShards, &rows, getOrphanTaskQry, *filter.Limit)
3✔
152
        if err != nil {
3✔
153
                return nil, err
×
154
        }
×
155
        return rows, nil
3✔
156
}
157

158
// InsertIntoTaskLists inserts one or more rows into task_lists table
159
func (mdb *db) InsertIntoTaskLists(ctx context.Context, row *sqlplugin.TaskListsRow) (sql.Result, error) {
462✔
160
        return mdb.driver.NamedExecContext(ctx, row.ShardID, createTaskListQry, row)
462✔
161
}
462✔
162

163
// UpdateTaskLists updates a row in task_lists table
164
func (mdb *db) UpdateTaskLists(ctx context.Context, row *sqlplugin.TaskListsRow) (sql.Result, error) {
2,253✔
165
        return mdb.driver.NamedExecContext(ctx, row.ShardID, updateTaskListQry, row)
2,253✔
166
}
2,253✔
167

168
// SelectFromTaskLists reads one or more rows from task_lists table
169
func (mdb *db) SelectFromTaskLists(ctx context.Context, filter *sqlplugin.TaskListsFilter) ([]sqlplugin.TaskListsRow, error) {
658✔
170
        switch {
658✔
171
        case filter.DomainID != nil && filter.Name != nil && filter.TaskType != nil:
487✔
172
                return mdb.selectFromTaskLists(ctx, filter)
487✔
173
        case filter.DomainIDGreaterThan != nil && filter.NameGreaterThan != nil && filter.TaskTypeGreaterThan != nil && filter.PageSize != nil:
171✔
174
                return mdb.rangeSelectFromTaskLists(ctx, filter)
171✔
175
        default:
×
176
                return nil, fmt.Errorf("invalid set of query filter params")
×
177
        }
178
}
179

180
func (mdb *db) selectFromTaskLists(ctx context.Context, filter *sqlplugin.TaskListsFilter) ([]sqlplugin.TaskListsRow, error) {
487✔
181
        var err error
487✔
182
        var row sqlplugin.TaskListsRow
487✔
183
        err = mdb.driver.GetContext(ctx, filter.ShardID, &row, getTaskListQry, filter.ShardID, *filter.DomainID, *filter.Name, *filter.TaskType)
487✔
184
        if err != nil {
949✔
185
                return nil, err
462✔
186
        }
462✔
187
        return []sqlplugin.TaskListsRow{row}, err
25✔
188
}
189

190
func (mdb *db) rangeSelectFromTaskLists(ctx context.Context, filter *sqlplugin.TaskListsFilter) ([]sqlplugin.TaskListsRow, error) {
171✔
191
        var err error
171✔
192
        var rows []sqlplugin.TaskListsRow
171✔
193
        err = mdb.driver.SelectContext(ctx, filter.ShardID, &rows, listTaskListQry,
171✔
194
                filter.ShardID, *filter.DomainIDGreaterThan, *filter.NameGreaterThan, *filter.TaskTypeGreaterThan, *filter.DomainIDGreaterThan, *filter.NameGreaterThan, *filter.DomainIDGreaterThan, *filter.PageSize)
171✔
195
        if err != nil {
171✔
196
                return nil, err
×
197
        }
×
198
        for i := range rows {
282✔
199
                rows[i].ShardID = filter.ShardID
111✔
200
        }
111✔
201
        return rows, nil
171✔
202
}
203

204
// DeleteFromTaskLists deletes a row from task_lists table
205
func (mdb *db) DeleteFromTaskLists(ctx context.Context, filter *sqlplugin.TaskListsFilter) (sql.Result, error) {
26✔
206
        return mdb.driver.ExecContext(ctx, filter.ShardID, deleteTaskListQry, filter.ShardID, *filter.DomainID, *filter.Name, *filter.TaskType, *filter.RangeID)
26✔
207
}
26✔
208

209
// LockTaskLists locks a row in task_lists table
210
func (mdb *db) LockTaskLists(ctx context.Context, filter *sqlplugin.TaskListsFilter) (int64, error) {
2,545✔
211
        var rangeID int64
2,545✔
212
        err := mdb.driver.GetContext(ctx, filter.ShardID, &rangeID, lockTaskListQry, filter.ShardID, *filter.DomainID, *filter.Name, *filter.TaskType)
2,545✔
213
        return rangeID, err
2,545✔
214
}
2,545✔
215

216
func (mdb *db) GetTasksCount(ctx context.Context, filter *sqlplugin.TasksFilter) (int64, error) {
1,317✔
217
        var size []int64
1,317✔
218
        if err := mdb.driver.SelectContext(ctx, filter.ShardID, &size, getTasksCountQry, filter.DomainID, filter.TaskListName, filter.TaskType, *filter.MinTaskID); err != nil {
1,317✔
UNCOV
219
                return 0, err
×
UNCOV
220
        }
×
221
        return size[0], nil
1,317✔
222
}
223

224
// InsertIntoTasksWithTTL is not supported in MySQL
225
func (mdb *db) InsertIntoTasksWithTTL(_ context.Context, _ []sqlplugin.TasksRowWithTTL) (sql.Result, error) {
×
226
        return nil, sqlplugin.ErrTTLNotSupported
×
227
}
×
228

229
// InsertIntoTaskListsWithTTL is not supported in MySQL
230
func (mdb *db) InsertIntoTaskListsWithTTL(_ context.Context, _ *sqlplugin.TaskListsRowWithTTL) (sql.Result, error) {
×
231
        return nil, sqlplugin.ErrTTLNotSupported
×
232
}
×
233

234
// UpdateTaskListsWithTTL is not supported in MySQL
235
func (mdb *db) UpdateTaskListsWithTTL(_ context.Context, _ *sqlplugin.TaskListsRowWithTTL) (sql.Result, error) {
×
236
        return nil, sqlplugin.ErrTTLNotSupported
×
237
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc