• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

uber / cadence / 018e8c46-474f-4f6b-80ab-751d576edd29

29 Mar 2024 10:13PM UTC coverage: 65.249% (-0.01%) from 65.259%
018e8c46-474f-4f6b-80ab-751d576edd29

push

buildkite

web-flow
Deadlock fix in acquireShards (#5825)

Fixing the deadlock demonstrated in #5824.

I decided to move the channel-writing entirely before consuming so it's a bit more accidental-change-resistant: some kinds of simple incorrect changes will lead to an _immediate_ deadlock every time, rather than a random chance of one.

And if someone _does_ want to move it after and go back to a smaller buffer, more code will have to be changed, so hopefully people will pay more attention to the concurrency risks involved.

More generally, the atomic shutdown stuff is _highly_ prone to causing this kind of error because there's no way to wait on it safely, and I would really love for us to get rid of it.

8 of 8 new or added lines in 1 file covered. (100.0%)

53 existing lines in 12 files now uncovered.

95414 of 146231 relevant lines covered (65.25%)

2380.23 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

73.68
/common/persistence/sql/sqlplugin/mysql/task.go
1
// Copyright (c) 2017 Uber Technologies, Inc.
2
//
3
// Permission is hereby granted, free of charge, to any person obtaining a copy
4
// of this software and associated documentation files (the "Software"), to deal
5
// in the Software without restriction, including without limitation the rights
6
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
7
// copies of the Software, and to permit persons to whom the Software is
8
// furnished to do so, subject to the following conditions:
9
//
10
// The above copyright notice and this permission notice shall be included in
11
// all copies or substantial portions of the Software.
12
//
13
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
14
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
18
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
19
// THE SOFTWARE.
20

21
package mysql
22

23
import (
24
        "context"
25
        "database/sql"
26
        "fmt"
27

28
        "github.com/uber/cadence/common/persistence/sql/sqlplugin"
29
)
30

31
const (
        // taskListCreatePart is the column list and named-parameter VALUES clause
        // for a task_lists row; createTaskListQry prefixes it with the SQL verb.
        taskListCreatePart = `INTO task_lists(shard_id, domain_id, name, task_type, range_id, data, data_encoding) ` +
                `VALUES (:shard_id, :domain_id, :name, :task_type, :range_id, :data, :data_encoding)`

        // (default range ID: initialRangeID == 1)
        createTaskListQry = `INSERT ` + taskListCreatePart

        // updateTaskListQry overwrites range_id, data and data_encoding of the
        // task_lists row identified by (shard_id, domain_id, name, task_type).
        updateTaskListQry = `UPDATE task_lists SET
range_id = :range_id,
data = :data,
data_encoding = :data_encoding
WHERE
shard_id = :shard_id AND
domain_id = :domain_id AND
name = :name AND
task_type = :task_type
`

        // This query uses pagination that is best understood by analogy to simple numbers.
        // Given a list of numbers
        //        111
        //        113
        //        122
        //        211
        // where the hundreds digit corresponds to domain_id, the tens digit
        // corresponds to name, and the ones digit corresponds to task_type,
        // imagine recurring queries with a limit of 1, each one resuming from the
        // value returned by the previous query.
        // For the second query to skip the first result and return 113, it must allow equal values in hundreds & tens, but it's OK because the ones digit is higher.
        // For the third query, the ones digit is now lower but that's irrelevant because the tens digit is greater.
        // For the fourth query, both the tens digit and ones digit are now lower but that's again irrelevant because now the hundreds digit is higher.
        // This technique is useful since the size of the table can easily change between calls, making SKIP an unreliable method, while other db-specific things like rowids are not portable
        listTaskListQry = `SELECT domain_id, range_id, name, task_type, data, data_encoding ` +
                `FROM task_lists ` +
                `WHERE shard_id = ? AND ((domain_id = ? AND name = ? AND task_type > ?) OR (domain_id=? AND name > ?) OR (domain_id > ?)) ` +
                `ORDER BY domain_id,name,task_type LIMIT ?`

        // getTaskListQry reads the task_lists row with an exact
        // (shard_id, domain_id, name, task_type) key; shard_id itself is not selected.
        getTaskListQry = `SELECT domain_id, range_id, name, task_type, data, data_encoding ` +
                `FROM task_lists ` +
                `WHERE shard_id = ? AND domain_id = ? AND name = ? AND task_type = ?`

        // deleteTaskListQry removes a task_lists row only if its range_id also matches.
        deleteTaskListQry = `DELETE FROM task_lists WHERE shard_id=? AND domain_id=? AND name=? AND task_type=? AND range_id=?`

        // lockTaskListQry selects range_id of one task_lists row with FOR UPDATE.
        lockTaskListQry = `SELECT range_id FROM task_lists ` +
                `WHERE shard_id = ? AND domain_id = ? AND name = ? AND task_type = ? FOR UPDATE`

        // getTaskMinMaxQry pages through tasks with min < task_id <= max, ordered by task_id.
        getTaskMinMaxQry = `SELECT task_id, data, data_encoding ` +
                `FROM tasks ` +
                `WHERE domain_id = ? AND task_list_name = ? AND task_type = ? AND task_id > ? AND task_id <= ? ` +
                ` ORDER BY task_id LIMIT ?`

        // getTaskMinQry pages through tasks with task_id > min, ordered by task_id,
        // with no upper bound.
        getTaskMinQry = `SELECT task_id, data, data_encoding ` +
                `FROM tasks ` +
                `WHERE domain_id = ? AND task_list_name = ? AND task_type = ? AND task_id > ? ORDER BY task_id LIMIT ?`

        // getTasksCountQry counts the tasks of one task list with task_id > min.
        getTasksCountQry = `SELECT count(1) as count ` +
                `FROM tasks ` +
                `WHERE domain_id = ? AND task_list_name = ? AND task_type = ? AND task_id > ?`

        // createTaskQry inserts a tasks row from named parameters.
        createTaskQry = `INSERT INTO ` +
                `tasks(domain_id, task_list_name, task_type, task_id, data, data_encoding) ` +
                `VALUES(:domain_id, :task_list_name, :task_type, :task_id, :data, :data_encoding)`

        // deleteTaskQry removes the single task with the given task_id.
        deleteTaskQry = `DELETE FROM tasks ` +
                `WHERE domain_id = ? AND task_list_name = ? AND task_type = ? AND task_id = ?`

        // rangeDeleteTaskQry removes up to LIMIT tasks with task_id <= the given
        // bound, in key order.
        rangeDeleteTaskQry = `DELETE FROM tasks ` +
                `WHERE domain_id = ? AND task_list_name = ? AND task_type = ? AND task_id <= ? ` +
                `ORDER BY domain_id,task_list_name,task_type,task_id LIMIT ?`

        // getOrphanTaskQry returns keys of tasks that have no task_lists row with
        // the same domain_id, name and task_type.
        getOrphanTaskQry = `SELECT task_id, domain_id, task_list_name, task_type FROM tasks AS t ` +
                `WHERE NOT EXISTS ( ` +
                `        SELECT domain_id, name, task_type FROM task_lists AS tl ` +
                `        WHERE t.domain_id=tl.domain_id and t.task_list_name=tl.name and t.task_type=tl.task_type ` +
                `) LIMIT ?;`
)
106

107
// InsertIntoTasks inserts one or more rows into tasks table
108
func (mdb *db) InsertIntoTasks(ctx context.Context, rows []sqlplugin.TasksRow) (sql.Result, error) {
300✔
109
        if len(rows) == 0 {
300✔
110
                return nil, nil
×
111
        }
×
112
        return mdb.driver.NamedExecContext(ctx, rows[0].ShardID, createTaskQry, rows)
300✔
113
}
114

115
// SelectFromTasks reads one or more rows from tasks table
116
func (mdb *db) SelectFromTasks(ctx context.Context, filter *sqlplugin.TasksFilter) ([]sqlplugin.TasksRow, error) {
299✔
117
        var err error
299✔
118
        var rows []sqlplugin.TasksRow
299✔
119
        switch {
299✔
120
        case filter.MaxTaskID != nil:
296✔
121
                err = mdb.driver.SelectContext(ctx, filter.ShardID, &rows, getTaskMinMaxQry, filter.DomainID,
296✔
122
                        filter.TaskListName, filter.TaskType, *filter.MinTaskID, *filter.MaxTaskID, *filter.PageSize)
296✔
123
        default:
3✔
124
                err = mdb.driver.SelectContext(ctx, filter.ShardID, &rows, getTaskMinQry, filter.DomainID,
3✔
125
                        filter.TaskListName, filter.TaskType, *filter.MinTaskID, *filter.PageSize)
3✔
126
        }
127
        if err != nil {
299✔
128
                return nil, err
×
129
        }
×
130
        return rows, err
299✔
131
}
132

133
// DeleteFromTasks deletes one or more rows from tasks table
134
func (mdb *db) DeleteFromTasks(ctx context.Context, filter *sqlplugin.TasksFilter) (sql.Result, error) {
164✔
135
        if filter.TaskIDLessThanEquals != nil {
323✔
136
                if filter.Limit == nil || *filter.Limit == 0 {
159✔
137
                        return nil, fmt.Errorf("missing limit parameter")
×
138
                }
×
139
                return mdb.driver.ExecContext(ctx, filter.ShardID, rangeDeleteTaskQry,
159✔
140
                        filter.DomainID, filter.TaskListName, filter.TaskType, *filter.TaskIDLessThanEquals, *filter.Limit)
159✔
141
        }
142
        return mdb.driver.ExecContext(ctx, filter.ShardID, deleteTaskQry, filter.DomainID, filter.TaskListName, filter.TaskType, *filter.TaskID)
5✔
143
}
144

145
func (mdb *db) GetOrphanTasks(ctx context.Context, filter *sqlplugin.OrphanTasksFilter) ([]sqlplugin.TaskKeyRow, error) {
3✔
146
        if filter.Limit == nil || *filter.Limit == 0 {
3✔
147
                return nil, fmt.Errorf("missing limit parameter")
×
148
        }
×
149
        var rows []sqlplugin.TaskKeyRow
3✔
150

3✔
151
        err := mdb.driver.SelectContext(ctx, sqlplugin.DbAllShards, &rows, getOrphanTaskQry, *filter.Limit)
3✔
152
        if err != nil {
3✔
153
                return nil, err
×
154
        }
×
155
        return rows, nil
3✔
156
}
157

158
// InsertIntoTaskLists inserts one or more rows into task_lists table
159
func (mdb *db) InsertIntoTaskLists(ctx context.Context, row *sqlplugin.TaskListsRow) (sql.Result, error) {
453✔
160
        return mdb.driver.NamedExecContext(ctx, row.ShardID, createTaskListQry, row)
453✔
161
}
453✔
162

163
// UpdateTaskLists updates a row in task_lists table
164
func (mdb *db) UpdateTaskLists(ctx context.Context, row *sqlplugin.TaskListsRow) (sql.Result, error) {
2,180✔
165
        return mdb.driver.NamedExecContext(ctx, row.ShardID, updateTaskListQry, row)
2,180✔
166
}
2,180✔
167

168
// SelectFromTaskLists reads one or more rows from task_lists table
169
func (mdb *db) SelectFromTaskLists(ctx context.Context, filter *sqlplugin.TaskListsFilter) ([]sqlplugin.TaskListsRow, error) {
685✔
170
        switch {
685✔
171
        case filter.DomainID != nil && filter.Name != nil && filter.TaskType != nil:
474✔
172
                return mdb.selectFromTaskLists(ctx, filter)
474✔
173
        case filter.DomainIDGreaterThan != nil && filter.NameGreaterThan != nil && filter.TaskTypeGreaterThan != nil && filter.PageSize != nil:
211✔
174
                return mdb.rangeSelectFromTaskLists(ctx, filter)
211✔
175
        default:
×
176
                return nil, fmt.Errorf("invalid set of query filter params")
×
177
        }
178
}
179

180
func (mdb *db) selectFromTaskLists(ctx context.Context, filter *sqlplugin.TaskListsFilter) ([]sqlplugin.TaskListsRow, error) {
474✔
181
        var err error
474✔
182
        var row sqlplugin.TaskListsRow
474✔
183
        err = mdb.driver.GetContext(ctx, filter.ShardID, &row, getTaskListQry, filter.ShardID, *filter.DomainID, *filter.Name, *filter.TaskType)
474✔
184
        if err != nil {
927✔
185
                return nil, err
453✔
186
        }
453✔
187
        return []sqlplugin.TaskListsRow{row}, err
21✔
188
}
189

190
func (mdb *db) rangeSelectFromTaskLists(ctx context.Context, filter *sqlplugin.TaskListsFilter) ([]sqlplugin.TaskListsRow, error) {
211✔
191
        var err error
211✔
192
        var rows []sqlplugin.TaskListsRow
211✔
193
        err = mdb.driver.SelectContext(ctx, filter.ShardID, &rows, listTaskListQry,
211✔
194
                filter.ShardID, *filter.DomainIDGreaterThan, *filter.NameGreaterThan, *filter.TaskTypeGreaterThan, *filter.DomainIDGreaterThan, *filter.NameGreaterThan, *filter.DomainIDGreaterThan, *filter.PageSize)
211✔
195
        if err != nil {
211✔
196
                return nil, err
×
197
        }
×
198
        for i := range rows {
322✔
199
                rows[i].ShardID = filter.ShardID
111✔
200
        }
111✔
201
        return rows, nil
211✔
202
}
203

204
// DeleteFromTaskLists deletes a row from task_lists table
205
func (mdb *db) DeleteFromTaskLists(ctx context.Context, filter *sqlplugin.TaskListsFilter) (sql.Result, error) {
26✔
206
        return mdb.driver.ExecContext(ctx, filter.ShardID, deleteTaskListQry, filter.ShardID, *filter.DomainID, *filter.Name, *filter.TaskType, *filter.RangeID)
26✔
207
}
26✔
208

209
// LockTaskLists locks a row in task_lists table
210
func (mdb *db) LockTaskLists(ctx context.Context, filter *sqlplugin.TaskListsFilter) (int64, error) {
2,481✔
211
        var rangeID int64
2,481✔
212
        err := mdb.driver.GetContext(ctx, filter.ShardID, &rangeID, lockTaskListQry, filter.ShardID, *filter.DomainID, *filter.Name, *filter.TaskType)
2,481✔
213
        return rangeID, err
2,481✔
214
}
2,481✔
215

216
func (mdb *db) GetTasksCount(ctx context.Context, filter *sqlplugin.TasksFilter) (int64, error) {
1,267✔
217
        var size []int64
1,267✔
218
        if err := mdb.driver.SelectContext(ctx, filter.ShardID, &size, getTasksCountQry, filter.DomainID, filter.TaskListName, filter.TaskType, *filter.MinTaskID); err != nil {
1,267✔
UNCOV
219
                return 0, err
×
UNCOV
220
        }
×
221
        return size[0], nil
1,267✔
222
}
223

224
// InsertIntoTasksWithTTL is not supported in MySQL
225
func (mdb *db) InsertIntoTasksWithTTL(_ context.Context, _ []sqlplugin.TasksRowWithTTL) (sql.Result, error) {
×
226
        return nil, sqlplugin.ErrTTLNotSupported
×
227
}
×
228

229
// InsertIntoTaskListsWithTTL is not supported in MySQL
230
func (mdb *db) InsertIntoTaskListsWithTTL(_ context.Context, _ *sqlplugin.TaskListsRowWithTTL) (sql.Result, error) {
×
231
        return nil, sqlplugin.ErrTTLNotSupported
×
232
}
×
233

234
// UpdateTaskListsWithTTL is not supported in MySQL
235
func (mdb *db) UpdateTaskListsWithTTL(_ context.Context, _ *sqlplugin.TaskListsRowWithTTL) (sql.Result, error) {
×
236
        return nil, sqlplugin.ErrTTLNotSupported
×
237
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc