• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

uber / cadence / 01909e15-00cc-4a94-bf01-3784d6403d00

27 Jun 2024 01:09AM UTC coverage: 71.431% (-0.1%) from 71.557%
01909e15-00cc-4a94-bf01-3784d6403d00

push

buildkite

web-flow
Refactor/removing cross cluster feature (#6121)

## What changed?

This mostly* removes the cross-cluster feature.

## Background
The Cross-cluster feature was the ability to launch and interact with child workflows in another domain. It included the ability to start child workflows and signal them. The feature allowed child workflows to be launched in the target domain even if it was active in another region.

## Problems
The feature itself was something that very very few of our customers apparently needed, with very few customers interested in the problem of launching child workflows in another cluster, and zero who weren’t able to simply use an activity to make an RPC call to the other domain as one would with any normal workflow.
The feature-itself was quite resource intensive: It was pull-based; spinning up a polling stack which polled the other cluster for work, similar to the replication stack. This polling behaviour made the latency characteristics fairly unpredictable and used considerable DB resources, to the point that we just turned it off. The Uber/Cadence team resolved that were there sufficient demand for the feature in the future, a push based mechanism would probably be significantly preferable.
The feature itself added a nontrivial amount of complexity to the codebase in a few areas such as task processing and domain error handling which introduced difficult to understand bugs such as the child workflow dropping error #5919

Decision to deprecate and alternatives
As of releases June 2024, the feature will be removed. The Cadence team is not aware of any users of the feature outside Uber (as it was broken until mid 2021 anyway), but as an FYI, it will cease to be available.

If this behaviour is desirable, an easy workaround is as previously mentioned: Use an activity to launch or signal the workflows in the other domain and block as needed.

PR details
This is a fairly high-risk refactor so it'll take some time to ... (continued)

118 of 134 new or added lines in 9 files covered. (88.06%)

330 existing lines in 30 files now uncovered.

104674 of 146539 relevant lines covered (71.43%)

2619.56 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

73.68
/common/persistence/sql/sqlplugin/mysql/task.go
1
// Copyright (c) 2017 Uber Technologies, Inc.
2
//
3
// Permission is hereby granted, free of charge, to any person obtaining a copy
4
// of this software and associated documentation files (the "Software"), to deal
5
// in the Software without restriction, including without limitation the rights
6
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
7
// copies of the Software, and to permit persons to whom the Software is
8
// furnished to do so, subject to the following conditions:
9
//
10
// The above copyright notice and this permission notice shall be included in
11
// all copies or substantial portions of the Software.
12
//
13
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
14
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
18
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
19
// THE SOFTWARE.
20

21
package mysql
22

23
import (
24
        "context"
25
        "database/sql"
26
        "fmt"
27

28
        "github.com/uber/cadence/common/persistence/sql/sqlplugin"
29
)
30

31
const (
32
        taskListCreatePart = `INTO task_lists(shard_id, domain_id, name, task_type, range_id, data, data_encoding) ` +
33
                `VALUES (:shard_id, :domain_id, :name, :task_type, :range_id, :data, :data_encoding)`
34

35
        // (default range ID: initialRangeID == 1)
36
        createTaskListQry = `INSERT ` + taskListCreatePart
37

38
        updateTaskListQry = `UPDATE task_lists SET
39
range_id = :range_id,
40
data = :data,
41
data_encoding = :data_encoding
42
WHERE
43
shard_id = :shard_id AND
44
domain_id = :domain_id AND
45
name = :name AND
46
task_type = :task_type
47
`
48

49
        // This query uses pagination that is best understood by analogy to simple numbers.
50
        // Given a list of numbers
51
        //         111
52
        //        113
53
        //        122
54
        //        211
55
        // where the hundreds digit corresponds to domain_id, the tens digit
56
        // corresponds to name, and the ones digit corresponds to task_type,
57
        // Imagine recurring queries with a limit of 1.
58
        // For the second query to skip the first result and return 112, it must allow equal values in hundreds & tens, but it's OK because the ones digit is higher.
59
        // For the third query, the ones digit is now lower but that's irrelevant because the tens digit is greater.
60
        // For the fourth query, both the tens digit and ones digit are now lower but that's again irrelevant because now the hundreds digit is higher.
61
        // This technique is useful since the size of the table can easily change between calls, making SKIP an unreliable method, while other db-specific things like rowids are not portable
62
        listTaskListQry = `SELECT domain_id, range_id, name, task_type, data, data_encoding ` +
63
                `FROM task_lists ` +
64
                `WHERE shard_id = ? AND ((domain_id = ? AND name = ? AND task_type > ?) OR (domain_id=? AND name > ?) OR (domain_id > ?)) ` +
65
                `ORDER BY domain_id,name,task_type LIMIT ?`
66

67
        getTaskListQry = `SELECT domain_id, range_id, name, task_type, data, data_encoding ` +
68
                `FROM task_lists ` +
69
                `WHERE shard_id = ? AND domain_id = ? AND name = ? AND task_type = ?`
70

71
        deleteTaskListQry = `DELETE FROM task_lists WHERE shard_id=? AND domain_id=? AND name=? AND task_type=? AND range_id=?`
72

73
        lockTaskListQry = `SELECT range_id FROM task_lists ` +
74
                `WHERE shard_id = ? AND domain_id = ? AND name = ? AND task_type = ? FOR UPDATE`
75

76
        getTaskMinMaxQry = `SELECT task_id, data, data_encoding ` +
77
                `FROM tasks ` +
78
                `WHERE domain_id = ? AND task_list_name = ? AND task_type = ? AND task_id > ? AND task_id <= ? ` +
79
                ` ORDER BY task_id LIMIT ?`
80

81
        getTaskMinQry = `SELECT task_id, data, data_encoding ` +
82
                `FROM tasks ` +
83
                `WHERE domain_id = ? AND task_list_name = ? AND task_type = ? AND task_id > ? ORDER BY task_id LIMIT ?`
84

85
        getTasksCountQry = `SELECT count(1) as count ` +
86
                `FROM tasks ` +
87
                `WHERE domain_id = ? AND task_list_name = ? AND task_type = ? AND task_id > ?`
88

89
        createTaskQry = `INSERT INTO ` +
90
                `tasks(domain_id, task_list_name, task_type, task_id, data, data_encoding) ` +
91
                `VALUES(:domain_id, :task_list_name, :task_type, :task_id, :data, :data_encoding)`
92

93
        deleteTaskQry = `DELETE FROM tasks ` +
94
                `WHERE domain_id = ? AND task_list_name = ? AND task_type = ? AND task_id = ?`
95

96
        rangeDeleteTaskQry = `DELETE FROM tasks ` +
97
                `WHERE domain_id = ? AND task_list_name = ? AND task_type = ? AND task_id <= ? ` +
98
                `ORDER BY domain_id,task_list_name,task_type,task_id LIMIT ?`
99

100
        getOrphanTaskQry = `SELECT task_id, domain_id, task_list_name, task_type FROM tasks AS t ` +
101
                `WHERE NOT EXISTS ( ` +
102
                `        SELECT domain_id, name, task_type FROM task_lists AS tl ` +
103
                `        WHERE t.domain_id=tl.domain_id and t.task_list_name=tl.name and t.task_type=tl.task_type ` +
104
                `) LIMIT ?;`
105
)
106

107
// InsertIntoTasks inserts one or more rows into tasks table
108
func (mdb *db) InsertIntoTasks(ctx context.Context, rows []sqlplugin.TasksRow) (sql.Result, error) {
308✔
109
        if len(rows) == 0 {
308✔
110
                return nil, nil
×
111
        }
×
112
        return mdb.driver.NamedExecContext(ctx, rows[0].ShardID, createTaskQry, rows)
308✔
113
}
114

115
// SelectFromTasks reads one or more rows from tasks table
116
func (mdb *db) SelectFromTasks(ctx context.Context, filter *sqlplugin.TasksFilter) ([]sqlplugin.TasksRow, error) {
305✔
117
        var err error
305✔
118
        var rows []sqlplugin.TasksRow
305✔
119
        switch {
305✔
120
        case filter.MaxTaskID != nil:
302✔
121
                err = mdb.driver.SelectContext(ctx, filter.ShardID, &rows, getTaskMinMaxQry, filter.DomainID,
302✔
122
                        filter.TaskListName, filter.TaskType, *filter.MinTaskID, *filter.MaxTaskID, *filter.PageSize)
302✔
123
        default:
3✔
124
                err = mdb.driver.SelectContext(ctx, filter.ShardID, &rows, getTaskMinQry, filter.DomainID,
3✔
125
                        filter.TaskListName, filter.TaskType, *filter.MinTaskID, *filter.PageSize)
3✔
126
        }
127
        if err != nil {
305✔
128
                return nil, err
×
129
        }
×
130
        return rows, err
305✔
131
}
132

133
// DeleteFromTasks deletes one or more rows from tasks table
134
func (mdb *db) DeleteFromTasks(ctx context.Context, filter *sqlplugin.TasksFilter) (sql.Result, error) {
169✔
135
        if filter.TaskIDLessThanEquals != nil {
333✔
136
                if filter.Limit == nil || *filter.Limit == 0 {
164✔
137
                        return nil, fmt.Errorf("missing limit parameter")
×
138
                }
×
139
                return mdb.driver.ExecContext(ctx, filter.ShardID, rangeDeleteTaskQry,
164✔
140
                        filter.DomainID, filter.TaskListName, filter.TaskType, *filter.TaskIDLessThanEquals, *filter.Limit)
164✔
141
        }
142
        return mdb.driver.ExecContext(ctx, filter.ShardID, deleteTaskQry, filter.DomainID, filter.TaskListName, filter.TaskType, *filter.TaskID)
5✔
143
}
144

145
func (mdb *db) GetOrphanTasks(ctx context.Context, filter *sqlplugin.OrphanTasksFilter) ([]sqlplugin.TaskKeyRow, error) {
3✔
146
        if filter.Limit == nil || *filter.Limit == 0 {
3✔
147
                return nil, fmt.Errorf("missing limit parameter")
×
148
        }
×
149
        var rows []sqlplugin.TaskKeyRow
3✔
150

3✔
151
        err := mdb.driver.SelectContext(ctx, sqlplugin.DbAllShards, &rows, getOrphanTaskQry, *filter.Limit)
3✔
152
        if err != nil {
3✔
153
                return nil, err
×
154
        }
×
155
        return rows, nil
3✔
156
}
157

158
// InsertIntoTaskLists inserts one or more rows into task_lists table
159
func (mdb *db) InsertIntoTaskLists(ctx context.Context, row *sqlplugin.TaskListsRow) (sql.Result, error) {
456✔
160
        return mdb.driver.NamedExecContext(ctx, row.ShardID, createTaskListQry, row)
456✔
161
}
456✔
162

163
// UpdateTaskLists updates a row in task_lists table
164
func (mdb *db) UpdateTaskLists(ctx context.Context, row *sqlplugin.TaskListsRow) (sql.Result, error) {
2,146✔
165
        return mdb.driver.NamedExecContext(ctx, row.ShardID, updateTaskListQry, row)
2,146✔
166
}
2,146✔
167

168
// SelectFromTaskLists reads one or more rows from task_lists table
169
func (mdb *db) SelectFromTaskLists(ctx context.Context, filter *sqlplugin.TaskListsFilter) ([]sqlplugin.TaskListsRow, error) {
646✔
170
        switch {
646✔
171
        case filter.DomainID != nil && filter.Name != nil && filter.TaskType != nil:
475✔
172
                return mdb.selectFromTaskLists(ctx, filter)
475✔
173
        case filter.DomainIDGreaterThan != nil && filter.NameGreaterThan != nil && filter.TaskTypeGreaterThan != nil && filter.PageSize != nil:
171✔
174
                return mdb.rangeSelectFromTaskLists(ctx, filter)
171✔
175
        default:
×
176
                return nil, fmt.Errorf("invalid set of query filter params")
×
177
        }
178
}
179

180
func (mdb *db) selectFromTaskLists(ctx context.Context, filter *sqlplugin.TaskListsFilter) ([]sqlplugin.TaskListsRow, error) {
475✔
181
        var err error
475✔
182
        var row sqlplugin.TaskListsRow
475✔
183
        err = mdb.driver.GetContext(ctx, filter.ShardID, &row, getTaskListQry, filter.ShardID, *filter.DomainID, *filter.Name, *filter.TaskType)
475✔
184
        if err != nil {
931✔
185
                return nil, err
456✔
186
        }
456✔
187
        return []sqlplugin.TaskListsRow{row}, err
19✔
188
}
189

190
func (mdb *db) rangeSelectFromTaskLists(ctx context.Context, filter *sqlplugin.TaskListsFilter) ([]sqlplugin.TaskListsRow, error) {
171✔
191
        var err error
171✔
192
        var rows []sqlplugin.TaskListsRow
171✔
193
        err = mdb.driver.SelectContext(ctx, filter.ShardID, &rows, listTaskListQry,
171✔
194
                filter.ShardID, *filter.DomainIDGreaterThan, *filter.NameGreaterThan, *filter.TaskTypeGreaterThan, *filter.DomainIDGreaterThan, *filter.NameGreaterThan, *filter.DomainIDGreaterThan, *filter.PageSize)
171✔
195
        if err != nil {
171✔
196
                return nil, err
×
197
        }
×
198
        for i := range rows {
282✔
199
                rows[i].ShardID = filter.ShardID
111✔
200
        }
111✔
201
        return rows, nil
171✔
202
}
203

204
// DeleteFromTaskLists deletes a row from task_lists table
205
func (mdb *db) DeleteFromTaskLists(ctx context.Context, filter *sqlplugin.TaskListsFilter) (sql.Result, error) {
26✔
206
        return mdb.driver.ExecContext(ctx, filter.ShardID, deleteTaskListQry, filter.ShardID, *filter.DomainID, *filter.Name, *filter.TaskType, *filter.RangeID)
26✔
207
}
26✔
208

209
// LockTaskLists locks a row in task_lists table
210
func (mdb *db) LockTaskLists(ctx context.Context, filter *sqlplugin.TaskListsFilter) (int64, error) {
2,455✔
211
        var rangeID int64
2,455✔
212
        err := mdb.driver.GetContext(ctx, filter.ShardID, &rangeID, lockTaskListQry, filter.ShardID, *filter.DomainID, *filter.Name, *filter.TaskType)
2,455✔
213
        return rangeID, err
2,455✔
214
}
2,455✔
215

216
func (mdb *db) GetTasksCount(ctx context.Context, filter *sqlplugin.TasksFilter) (int64, error) {
1,230✔
217
        var size []int64
1,230✔
218
        if err := mdb.driver.SelectContext(ctx, filter.ShardID, &size, getTasksCountQry, filter.DomainID, filter.TaskListName, filter.TaskType, *filter.MinTaskID); err != nil {
1,230✔
UNCOV
219
                return 0, err
×
UNCOV
220
        }
×
221
        return size[0], nil
1,230✔
222
}
223

224
// InsertIntoTasksWithTTL is not supported in MySQL
225
func (mdb *db) InsertIntoTasksWithTTL(_ context.Context, _ []sqlplugin.TasksRowWithTTL) (sql.Result, error) {
×
226
        return nil, sqlplugin.ErrTTLNotSupported
×
227
}
×
228

229
// InsertIntoTaskListsWithTTL is not supported in MySQL
230
func (mdb *db) InsertIntoTaskListsWithTTL(_ context.Context, _ *sqlplugin.TaskListsRowWithTTL) (sql.Result, error) {
×
231
        return nil, sqlplugin.ErrTTLNotSupported
×
232
}
×
233

234
// UpdateTaskListsWithTTL is not supported in MySQL
235
func (mdb *db) UpdateTaskListsWithTTL(_ context.Context, _ *sqlplugin.TaskListsRowWithTTL) (sql.Result, error) {
×
236
        return nil, sqlplugin.ErrTTLNotSupported
×
237
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc