• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

uber / cadence / 01909e15-00cc-4a94-bf01-3784d6403d00

27 Jun 2024 01:09AM UTC coverage: 71.431% (-0.1%) from 71.557%
01909e15-00cc-4a94-bf01-3784d6403d00

push

buildkite

web-flow
Refactor/removing cross cluster feature (#6121)

## What changed?

This mostly* removes the cross-cluster feature.

## Background
The Cross-cluster feature was the ability to launch and interact with child workflows in another domain. It included the ability to start child workflows and signal them. The feature allowed child workflows to be launched in the target domain even if it was active in another region.

## Problems
The feature itself was something that very few of our customers apparently needed: very few were interested in launching child workflows in another cluster, and none were unable to simply use an activity to make an RPC call to the other domain, as one would with any normal workflow.
The feature itself was quite resource-intensive: it was pull-based, spinning up a polling stack which polled the other cluster for work, similar to the replication stack. This polling behaviour made the latency characteristics fairly unpredictable and used considerable DB resources, to the point that we just turned it off. The Uber/Cadence team resolved that, were there sufficient demand for the feature in the future, a push-based mechanism would probably be significantly preferable.
The feature itself added a nontrivial amount of complexity to the codebase in a few areas, such as task processing and domain error handling, which introduced difficult-to-understand bugs such as the child workflow dropping error #5919.

Decision to deprecate and alternatives
As of the June 2024 releases, the feature will be removed. The Cadence team is not aware of any users of the feature outside Uber (as it was broken until mid-2021 anyway), but as an FYI, it will cease to be available.

If this behaviour is desirable, an easy workaround is as previously mentioned: Use an activity to launch or signal the workflows in the other domain and block as needed.

PR details
This is a fairly high-risk refactor so it'll take some time to ... (continued)

118 of 134 new or added lines in 9 files covered. (88.06%)

330 existing lines in 30 files now uncovered.

104674 of 146539 relevant lines covered (71.43%)

2619.56 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

78.2
/common/persistence/sql/sqlplugin/mysql/execution.go
1
// Copyright (c) 2017 Uber Technologies, Inc.
2
//
3
// Permission is hereby granted, free of charge, to any person obtaining a copy
4
// of this software and associated documentation files (the "Software"), to deal
5
// in the Software without restriction, including without limitation the rights
6
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
7
// copies of the Software, and to permit persons to whom the Software is
8
// furnished to do so, subject to the following conditions:
9
//
10
// The above copyright notice and this permission notice shall be included in
11
// all copies or substantial portions of the Software.
12
//
13
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
14
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
18
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
19
// THE SOFTWARE.
20

21
package mysql
22

23
import (
24
        "context"
25
        "database/sql"
26

27
        "github.com/uber/cadence/common/persistence/sql/sqlplugin"
28
)
29

30
const (
31
        executionsColumns = `shard_id, domain_id, workflow_id, run_id, next_event_id, last_write_version, data, data_encoding`
32

33
        createExecutionQuery = `INSERT INTO executions(` + executionsColumns + `)
34
 VALUES(:shard_id, :domain_id, :workflow_id, :run_id, :next_event_id, :last_write_version, :data, :data_encoding)`
35

36
        updateExecutionQuery = `UPDATE executions SET
37
 next_event_id = :next_event_id, last_write_version = :last_write_version, data = :data, data_encoding = :data_encoding
38
 WHERE shard_id = :shard_id AND domain_id = :domain_id AND workflow_id = :workflow_id AND run_id = :run_id`
39

40
        getExecutionQuery = `SELECT ` + executionsColumns + ` FROM executions
41
 WHERE shard_id = ? AND domain_id = ? AND workflow_id = ? AND run_id = ?`
42

43
        listExecutionQuery = `SELECT ` + executionsColumns + ` FROM executions
44
 WHERE shard_id = ? AND workflow_id > ? ORDER BY workflow_id LIMIT ?`
45

46
        deleteExecutionQuery = `DELETE FROM executions
47
 WHERE shard_id = ? AND domain_id = ? AND workflow_id = ? AND run_id = ?`
48

49
        lockExecutionQueryBase = `SELECT next_event_id FROM executions
50
 WHERE shard_id = ? AND domain_id = ? AND workflow_id = ? AND run_id = ?`
51

52
        writeLockExecutionQuery = lockExecutionQueryBase + ` FOR UPDATE`
53
        readLockExecutionQuery  = lockExecutionQueryBase + ` LOCK IN SHARE MODE`
54

55
        createCurrentExecutionQuery = `INSERT INTO current_executions
56
(shard_id, domain_id, workflow_id, run_id, create_request_id, state, close_status, start_version, last_write_version) VALUES
57
(:shard_id, :domain_id, :workflow_id, :run_id, :create_request_id, :state, :close_status, :start_version, :last_write_version)`
58

59
        deleteCurrentExecutionQuery = "DELETE FROM current_executions WHERE shard_id=? AND domain_id=? AND workflow_id=? AND run_id=?"
60

61
        getCurrentExecutionQuery = `SELECT
62
shard_id, domain_id, workflow_id, run_id, create_request_id, state, close_status, start_version, last_write_version
63
FROM current_executions WHERE shard_id = ? AND domain_id = ? AND workflow_id = ?`
64

65
        lockCurrentExecutionJoinExecutionsQuery = `SELECT
66
ce.shard_id, ce.domain_id, ce.workflow_id, ce.run_id, ce.create_request_id, ce.state, ce.close_status, ce.start_version, e.last_write_version
67
FROM current_executions ce
68
INNER JOIN executions e ON e.shard_id = ce.shard_id AND e.domain_id = ce.domain_id AND e.workflow_id = ce.workflow_id AND e.run_id = ce.run_id
69
WHERE ce.shard_id = ? AND ce.domain_id = ? AND ce.workflow_id = ? FOR UPDATE`
70

71
        lockCurrentExecutionQuery = getCurrentExecutionQuery + ` FOR UPDATE`
72

73
        updateCurrentExecutionsQuery = `UPDATE current_executions SET
74
run_id = :run_id,
75
create_request_id = :create_request_id,
76
state = :state,
77
close_status = :close_status,
78
start_version = :start_version,
79
last_write_version = :last_write_version
80
WHERE
81
shard_id = :shard_id AND
82
domain_id = :domain_id AND
83
workflow_id = :workflow_id
84
`
85

86
        getTransferTasksQuery = `SELECT task_id, data, data_encoding
87
FROM transfer_tasks WHERE shard_id = ? AND task_id > ? AND task_id <= ? ORDER BY shard_id, task_id LIMIT ?`
88

89
        createTransferTasksQuery = `INSERT INTO transfer_tasks(shard_id, task_id, data, data_encoding)
90
 VALUES(:shard_id, :task_id, :data, :data_encoding)`
91

92
        deleteTransferTaskQuery             = `DELETE FROM transfer_tasks WHERE shard_id = ? AND task_id = ?`
93
        rangeDeleteTransferTaskQuery        = `DELETE FROM transfer_tasks WHERE shard_id = ? AND task_id > ? AND task_id <= ?`
94
        rangeDeleteTransferTaskByBatchQuery = rangeDeleteTransferTaskQuery + ` ORDER BY task_id LIMIT ?`
95

96
        getCrossClusterTasksQuery = `SELECT task_id, data, data_encoding
97
FROM cross_cluster_tasks WHERE target_cluster = ? AND shard_id = ? AND task_id > ? AND task_id <= ? ORDER BY task_id LIMIT ?`
98

99
        createCrossClusterTasksQuery = `INSERT INTO cross_cluster_tasks(target_cluster, shard_id, task_id, data, data_encoding)
100
VALUES(:target_cluster, :shard_id, :task_id, :data, :data_encoding)`
101

102
        deleteCrossClusterTaskQuery             = `DELETE FROM cross_cluster_tasks WHERE target_cluster = ? AND shard_id = ? AND task_id = ?`
103
        rangeDeleteCrossClusterTaskQuery        = `DELETE FROM cross_cluster_tasks WHERE target_cluster = ? AND shard_id = ? AND task_id > ? AND task_id <= ?`
104
        rangeDeleteCrossClusterTaskByBatchQuery = rangeDeleteCrossClusterTaskQuery + ` ORDER BY task_id LIMIT ?`
105

106
        createTimerTasksQuery = `INSERT INTO timer_tasks (shard_id, visibility_timestamp, task_id, data, data_encoding)
107
  VALUES (:shard_id, :visibility_timestamp, :task_id, :data, :data_encoding)`
108

109
        getTimerTasksQuery = `SELECT visibility_timestamp, task_id, data, data_encoding FROM timer_tasks
110
  WHERE shard_id = ?
111
  AND ((visibility_timestamp >= ? AND task_id >= ?) OR visibility_timestamp > ?)
112
  AND visibility_timestamp < ?
113
  ORDER BY visibility_timestamp,task_id LIMIT ?`
114

115
        deleteTimerTaskQuery             = `DELETE FROM timer_tasks WHERE shard_id = ? AND visibility_timestamp = ? AND task_id = ?`
116
        rangeDeleteTimerTaskQuery        = `DELETE FROM timer_tasks WHERE shard_id = ? AND visibility_timestamp >= ? AND visibility_timestamp < ?`
117
        rangeDeleteTimerTaskByBatchQuery = rangeDeleteTimerTaskQuery + ` ORDER BY visibility_timestamp,task_id LIMIT ?`
118

119
        createReplicationTasksQuery = `INSERT INTO replication_tasks (shard_id, task_id, data, data_encoding)
120
  VALUES(:shard_id, :task_id, :data, :data_encoding)`
121

122
        getReplicationTasksQuery = `SELECT task_id, data, data_encoding FROM replication_tasks WHERE
123
shard_id = ? AND
124
task_id > ? AND
125
task_id <= ?
126
ORDER BY task_id LIMIT ?`
127

128
        deleteReplicationTaskQuery             = `DELETE FROM replication_tasks WHERE shard_id = ? AND task_id = ?`
129
        rangeDeleteReplicationTaskQuery        = `DELETE FROM replication_tasks WHERE shard_id = ? AND task_id <= ?`
130
        rangeDeleteReplicationTaskByBatchQuery = rangeDeleteReplicationTaskQuery + ` ORDER BY task_id LIMIT ?`
131

132
        getReplicationTasksDLQQuery = `SELECT task_id, data, data_encoding FROM replication_tasks_dlq WHERE
133
source_cluster_name = ? AND
134
shard_id = ? AND
135
task_id > ? AND
136
task_id <= ?
137
ORDER BY task_id LIMIT ?`
138

139
        getReplicationTaskDLQQuery = `SELECT count(1) as count FROM replication_tasks_dlq WHERE
140
source_cluster_name = ? AND
141
shard_id = ?`
142

143
        bufferedEventsColumns     = `shard_id, domain_id, workflow_id, run_id, data, data_encoding`
144
        createBufferedEventsQuery = `INSERT INTO buffered_events(` + bufferedEventsColumns + `)
145
VALUES (:shard_id, :domain_id, :workflow_id, :run_id, :data, :data_encoding)`
146

147
        deleteBufferedEventsQuery = `DELETE FROM buffered_events WHERE shard_id=? AND domain_id=? AND workflow_id=? AND run_id=?`
148
        getBufferedEventsQuery    = `SELECT data, data_encoding FROM buffered_events WHERE
149
shard_id=? AND domain_id=? AND workflow_id=? AND run_id=?`
150

151
        insertReplicationTaskDLQQuery = `
152
INSERT INTO replication_tasks_dlq
153
            (source_cluster_name,
154
             shard_id,
155
             task_id,
156
             data,
157
             data_encoding)
158
VALUES     (:source_cluster_name,
159
            :shard_id,
160
            :task_id,
161
            :data,
162
            :data_encoding)
163
`
164
        deleteReplicationTaskFromDLQQuery = `
165
        DELETE FROM replication_tasks_dlq
166
                WHERE source_cluster_name = ?
167
                AND shard_id = ?
168
                AND task_id = ?`
169

170
        rangeDeleteReplicationTaskFromDLQQuery = `
171
        DELETE FROM replication_tasks_dlq
172
                WHERE source_cluster_name = ?
173
                AND shard_id = ?
174
                AND task_id > ?
175
                AND task_id <= ?`
176
        rangeDeleteReplicationTaskFromDLQByBatchQuery = rangeDeleteReplicationTaskFromDLQQuery + ` ORDER BY task_id LIMIT ?`
177
)
178

179
// InsertIntoExecutions inserts a row into executions table
180
func (mdb *db) InsertIntoExecutions(ctx context.Context, row *sqlplugin.ExecutionsRow) (sql.Result, error) {
279✔
181
        dbShardID := sqlplugin.GetDBShardIDFromHistoryShardID(row.ShardID, mdb.GetTotalNumDBShards())
279✔
182
        return mdb.driver.NamedExecContext(ctx, dbShardID, createExecutionQuery, row)
279✔
183
}
279✔
184

185
// UpdateExecutions updates a single row in executions table
186
func (mdb *db) UpdateExecutions(ctx context.Context, row *sqlplugin.ExecutionsRow) (sql.Result, error) {
1,561✔
187
        dbShardID := sqlplugin.GetDBShardIDFromHistoryShardID(row.ShardID, mdb.GetTotalNumDBShards())
1,561✔
188
        return mdb.driver.NamedExecContext(ctx, dbShardID, updateExecutionQuery, row)
1,561✔
189
}
1,561✔
190

191
// SelectFromExecutions reads a single row from executions table, or a page of rows when filter.DomainID is empty and filter.Size > 0.
192
// The list execution query result is ordered by workflow ID only, so it may return duplicate records with pagination.
193
func (mdb *db) SelectFromExecutions(ctx context.Context, filter *sqlplugin.ExecutionsFilter) ([]sqlplugin.ExecutionsRow, error) {
493✔
194
        var rows []sqlplugin.ExecutionsRow
493✔
195
        dbShardID := sqlplugin.GetDBShardIDFromHistoryShardID(filter.ShardID, mdb.GetTotalNumDBShards())
493✔
196
        var err error
493✔
197
        if len(filter.DomainID) == 0 && filter.Size > 0 {
493✔
198
                err = mdb.driver.SelectContext(ctx, dbShardID, &rows, listExecutionQuery, filter.ShardID, filter.WorkflowID, filter.Size)
×
199
                if err != nil {
×
200
                        return nil, err
×
201
                }
×
202
        } else {
493✔
203
                var row sqlplugin.ExecutionsRow
493✔
204
                err = mdb.driver.GetContext(ctx, dbShardID, &row, getExecutionQuery, filter.ShardID, filter.DomainID, filter.WorkflowID, filter.RunID)
493✔
205
                if err != nil {
629✔
206
                        return nil, err
136✔
207
                }
136✔
208
                rows = append(rows, row)
358✔
209
        }
210

211
        return rows, err
358✔
212
}
213

214
// DeleteFromExecutions deletes a single row from executions table
215
func (mdb *db) DeleteFromExecutions(ctx context.Context, filter *sqlplugin.ExecutionsFilter) (sql.Result, error) {
23✔
216
        dbShardID := sqlplugin.GetDBShardIDFromHistoryShardID(filter.ShardID, mdb.GetTotalNumDBShards())
23✔
217
        return mdb.driver.ExecContext(ctx, dbShardID, deleteExecutionQuery, filter.ShardID, filter.DomainID, filter.WorkflowID, filter.RunID)
23✔
218
}
23✔
219

220
// ReadLockExecutions acquires a read (shared) lock on a single row in executions table
221
func (mdb *db) ReadLockExecutions(ctx context.Context, filter *sqlplugin.ExecutionsFilter) (int, error) {
×
222
        var nextEventID int
×
223
        dbShardID := sqlplugin.GetDBShardIDFromHistoryShardID(filter.ShardID, mdb.GetTotalNumDBShards())
×
224
        err := mdb.driver.GetContext(ctx, dbShardID, &nextEventID, readLockExecutionQuery, filter.ShardID, filter.DomainID, filter.WorkflowID, filter.RunID)
×
225
        return nextEventID, err
×
226
}
×
227

228
// WriteLockExecutions acquires a write lock on a single row in executions table
229
func (mdb *db) WriteLockExecutions(ctx context.Context, filter *sqlplugin.ExecutionsFilter) (int, error) {
1,575✔
230
        var nextEventID int
1,575✔
231
        dbShardID := sqlplugin.GetDBShardIDFromHistoryShardID(filter.ShardID, mdb.GetTotalNumDBShards())
1,575✔
232
        err := mdb.driver.GetContext(ctx, dbShardID, &nextEventID, writeLockExecutionQuery, filter.ShardID, filter.DomainID, filter.WorkflowID, filter.RunID)
1,575✔
233
        return nextEventID, err
1,575✔
234
}
1,575✔
235

236
// InsertIntoCurrentExecutions inserts a single row into current_executions table
237
func (mdb *db) InsertIntoCurrentExecutions(ctx context.Context, row *sqlplugin.CurrentExecutionsRow) (sql.Result, error) {
206✔
238
        dbShardID := sqlplugin.GetDBShardIDFromHistoryShardID(int(row.ShardID), mdb.GetTotalNumDBShards())
206✔
239
        return mdb.driver.NamedExecContext(ctx, dbShardID, createCurrentExecutionQuery, row)
206✔
240
}
206✔
241

242
// UpdateCurrentExecutions updates a single row in current_executions table
243
func (mdb *db) UpdateCurrentExecutions(ctx context.Context, row *sqlplugin.CurrentExecutionsRow) (sql.Result, error) {
1,575✔
244
        dbShardID := sqlplugin.GetDBShardIDFromHistoryShardID(int(row.ShardID), mdb.GetTotalNumDBShards())
1,575✔
245
        return mdb.driver.NamedExecContext(ctx, dbShardID, updateCurrentExecutionsQuery, row)
1,575✔
246
}
1,575✔
247

248
// SelectFromCurrentExecutions reads a single row from current_executions table
249
func (mdb *db) SelectFromCurrentExecutions(ctx context.Context, filter *sqlplugin.CurrentExecutionsFilter) (*sqlplugin.CurrentExecutionsRow, error) {
92✔
250
        var row sqlplugin.CurrentExecutionsRow
92✔
251
        dbShardID := sqlplugin.GetDBShardIDFromHistoryShardID(int(filter.ShardID), mdb.GetTotalNumDBShards())
92✔
252
        err := mdb.driver.GetContext(ctx, dbShardID, &row, getCurrentExecutionQuery, filter.ShardID, filter.DomainID, filter.WorkflowID)
92✔
253
        return &row, err
92✔
254
}
92✔
255

256
// DeleteFromCurrentExecutions deletes a single row in current_executions table
257
func (mdb *db) DeleteFromCurrentExecutions(ctx context.Context, filter *sqlplugin.CurrentExecutionsFilter) (sql.Result, error) {
20✔
258
        dbShardID := sqlplugin.GetDBShardIDFromHistoryShardID(int(filter.ShardID), mdb.GetTotalNumDBShards())
20✔
259
        return mdb.driver.ExecContext(ctx, dbShardID, deleteCurrentExecutionQuery, filter.ShardID, filter.DomainID, filter.WorkflowID, filter.RunID)
20✔
260
}
20✔
261

262
// LockCurrentExecutions acquires a write lock on a single row in current_executions table
263
func (mdb *db) LockCurrentExecutions(ctx context.Context, filter *sqlplugin.CurrentExecutionsFilter) (*sqlplugin.CurrentExecutionsRow, error) {
1,578✔
264
        var row sqlplugin.CurrentExecutionsRow
1,578✔
265
        dbShardID := sqlplugin.GetDBShardIDFromHistoryShardID(int(filter.ShardID), mdb.GetTotalNumDBShards())
1,578✔
266
        err := mdb.driver.GetContext(ctx, dbShardID, &row, lockCurrentExecutionQuery, filter.ShardID, filter.DomainID, filter.WorkflowID)
1,578✔
267
        return &row, err
1,578✔
268
}
1,578✔
269

270
// LockCurrentExecutionsJoinExecutions joins a row in current_executions with executions table and acquires a
271
// write lock on the result
272
func (mdb *db) LockCurrentExecutionsJoinExecutions(ctx context.Context, filter *sqlplugin.CurrentExecutionsFilter) ([]sqlplugin.CurrentExecutionsRow, error) {
252✔
273
        var rows []sqlplugin.CurrentExecutionsRow
252✔
274
        dbShardID := sqlplugin.GetDBShardIDFromHistoryShardID(int(filter.ShardID), mdb.GetTotalNumDBShards())
252✔
275
        err := mdb.driver.SelectContext(ctx, dbShardID, &rows, lockCurrentExecutionJoinExecutionsQuery, filter.ShardID, filter.DomainID, filter.WorkflowID)
252✔
276
        return rows, err
252✔
277
}
252✔
278

279
// InsertIntoTransferTasks inserts one or more rows into transfer_tasks table
280
func (mdb *db) InsertIntoTransferTasks(ctx context.Context, rows []sqlplugin.TransferTasksRow) (sql.Result, error) {
832✔
281
        if len(rows) == 0 {
832✔
282
                return nil, nil
×
283
        }
×
284
        dbShardID := sqlplugin.GetDBShardIDFromHistoryShardID(rows[0].ShardID, mdb.GetTotalNumDBShards())
832✔
285
        return mdb.driver.NamedExecContext(ctx, dbShardID, createTransferTasksQuery, rows)
832✔
286
}
287

288
// SelectFromTransferTasks reads one or more rows from transfer_tasks table
289
func (mdb *db) SelectFromTransferTasks(ctx context.Context, filter *sqlplugin.TransferTasksFilter) ([]sqlplugin.TransferTasksRow, error) {
905✔
290
        var rows []sqlplugin.TransferTasksRow
905✔
291
        dbShardID := sqlplugin.GetDBShardIDFromHistoryShardID(filter.ShardID, mdb.GetTotalNumDBShards())
905✔
292
        err := mdb.driver.SelectContext(ctx, dbShardID, &rows, getTransferTasksQuery, filter.ShardID, filter.MinTaskID, filter.MaxTaskID, filter.PageSize)
905✔
293
        if err != nil {
905✔
294
                return nil, err
×
295
        }
×
296
        return rows, err
905✔
297
}
298

299
// DeleteFromTransferTasks deletes one row from transfer_tasks table
300
func (mdb *db) DeleteFromTransferTasks(ctx context.Context, filter *sqlplugin.TransferTasksFilter) (sql.Result, error) {
70✔
301
        dbShardID := sqlplugin.GetDBShardIDFromHistoryShardID(filter.ShardID, mdb.GetTotalNumDBShards())
70✔
302
        return mdb.driver.ExecContext(ctx, dbShardID, deleteTransferTaskQuery, filter.ShardID, filter.TaskID)
70✔
303
}
70✔
304

305
// RangeDeleteFromTransferTasks deletes multiple rows from transfer_tasks table
306
func (mdb *db) RangeDeleteFromTransferTasks(ctx context.Context, filter *sqlplugin.TransferTasksFilter) (sql.Result, error) {
42✔
307
        dbShardID := sqlplugin.GetDBShardIDFromHistoryShardID(filter.ShardID, mdb.GetTotalNumDBShards())
42✔
308
        if filter.PageSize > 0 {
84✔
309
                return mdb.driver.ExecContext(ctx, dbShardID, rangeDeleteTransferTaskByBatchQuery, filter.ShardID, filter.MinTaskID, filter.MaxTaskID, filter.PageSize)
42✔
310
        }
42✔
311
        return mdb.driver.ExecContext(ctx, dbShardID, rangeDeleteTransferTaskQuery, filter.ShardID, filter.MinTaskID, filter.MaxTaskID)
×
312
}
313

314
// InsertIntoCrossClusterTasks inserts one or more rows into cross_cluster_tasks table
315
func (mdb *db) InsertIntoCrossClusterTasks(ctx context.Context, rows []sqlplugin.CrossClusterTasksRow) (sql.Result, error) {
1✔
316
        if len(rows) == 0 {
1✔
317
                return nil, nil
×
318
        }
×
319
        dbShardID := sqlplugin.GetDBShardIDFromHistoryShardID(rows[0].ShardID, mdb.GetTotalNumDBShards())
1✔
320
        return mdb.driver.NamedExecContext(ctx, dbShardID, createCrossClusterTasksQuery, rows)
1✔
321
}
322

323
// SelectFromCrossClusterTasks reads one or more rows from cross_cluster_tasks table
UNCOV
324
func (mdb *db) SelectFromCrossClusterTasks(ctx context.Context, filter *sqlplugin.CrossClusterTasksFilter) ([]sqlplugin.CrossClusterTasksRow, error) {
×
UNCOV
325
        var rows []sqlplugin.CrossClusterTasksRow
×
UNCOV
326
        dbShardID := sqlplugin.GetDBShardIDFromHistoryShardID(filter.ShardID, mdb.GetTotalNumDBShards())
×
UNCOV
327
        err := mdb.driver.SelectContext(ctx, dbShardID, &rows, getCrossClusterTasksQuery, filter.TargetCluster, filter.ShardID, filter.MinTaskID, filter.MaxTaskID, filter.PageSize)
×
UNCOV
328
        if err != nil {
×
329
                return nil, err
×
330
        }
×
UNCOV
331
        return rows, err
×
332
}
333

334
// DeleteFromCrossClusterTasks deletes one row from cross_cluster_tasks table
UNCOV
335
func (mdb *db) DeleteFromCrossClusterTasks(ctx context.Context, filter *sqlplugin.CrossClusterTasksFilter) (sql.Result, error) {
×
UNCOV
336
        dbShardID := sqlplugin.GetDBShardIDFromHistoryShardID(filter.ShardID, mdb.GetTotalNumDBShards())
×
UNCOV
337
        return mdb.driver.ExecContext(ctx, dbShardID, deleteCrossClusterTaskQuery, filter.TargetCluster, filter.ShardID, filter.TaskID)
×
UNCOV
338
}
×
339

340
// RangeDeleteFromCrossClusterTasks deletes multiple rows from cross_cluster_tasks table
UNCOV
341
func (mdb *db) RangeDeleteFromCrossClusterTasks(ctx context.Context, filter *sqlplugin.CrossClusterTasksFilter) (sql.Result, error) {
×
UNCOV
342
        dbShardID := sqlplugin.GetDBShardIDFromHistoryShardID(filter.ShardID, mdb.GetTotalNumDBShards())
×
UNCOV
343
        if filter.PageSize > 0 {
×
UNCOV
344
                return mdb.driver.ExecContext(ctx, dbShardID, rangeDeleteCrossClusterTaskByBatchQuery, filter.TargetCluster, filter.ShardID, filter.MinTaskID, filter.MaxTaskID, filter.PageSize)
×
UNCOV
345
        }
×
346
        return mdb.driver.ExecContext(ctx, dbShardID, rangeDeleteCrossClusterTaskQuery, filter.TargetCluster, filter.ShardID, filter.MinTaskID, filter.MaxTaskID)
×
347
}
348

349
// InsertIntoTimerTasks inserts one or more rows into timer_tasks table
350
func (mdb *db) InsertIntoTimerTasks(ctx context.Context, rows []sqlplugin.TimerTasksRow) (sql.Result, error) {
1,044✔
351
        if len(rows) == 0 {
1,044✔
352
                return nil, nil
×
353
        }
×
354
        dbShardID := sqlplugin.GetDBShardIDFromHistoryShardID(rows[0].ShardID, mdb.GetTotalNumDBShards())
1,044✔
355
        for i := range rows {
2,126✔
356
                rows[i].VisibilityTimestamp = mdb.converter.ToMySQLDateTime(rows[i].VisibilityTimestamp)
1,082✔
357
        }
1,082✔
358
        return mdb.driver.NamedExecContext(ctx, dbShardID, createTimerTasksQuery, rows)
1,044✔
359
}
360

361
// SelectFromTimerTasks reads one or more rows from timer_tasks table
362
func (mdb *db) SelectFromTimerTasks(ctx context.Context, filter *sqlplugin.TimerTasksFilter) ([]sqlplugin.TimerTasksRow, error) {
1,104✔
363
        var rows []sqlplugin.TimerTasksRow
1,104✔
364
        dbShardID := sqlplugin.GetDBShardIDFromHistoryShardID(filter.ShardID, mdb.GetTotalNumDBShards())
1,104✔
365
        filter.MinVisibilityTimestamp = mdb.converter.ToMySQLDateTime(filter.MinVisibilityTimestamp)
1,104✔
366
        filter.MaxVisibilityTimestamp = mdb.converter.ToMySQLDateTime(filter.MaxVisibilityTimestamp)
1,104✔
367
        err := mdb.driver.SelectContext(ctx, dbShardID, &rows, getTimerTasksQuery, filter.ShardID, filter.MinVisibilityTimestamp,
1,104✔
368
                filter.TaskID, filter.MinVisibilityTimestamp, filter.MaxVisibilityTimestamp, filter.PageSize)
1,104✔
369
        if err != nil {
1,104✔
370
                return nil, err
×
371
        }
×
372
        for i := range rows {
4,667✔
373
                rows[i].VisibilityTimestamp = mdb.converter.FromMySQLDateTime(rows[i].VisibilityTimestamp)
3,563✔
374
        }
3,563✔
375
        return rows, err
1,104✔
376
}
377

378
// DeleteFromTimerTasks deletes one row from timer_tasks table
379
func (mdb *db) DeleteFromTimerTasks(ctx context.Context, filter *sqlplugin.TimerTasksFilter) (sql.Result, error) {
5✔
380
        filter.VisibilityTimestamp = mdb.converter.ToMySQLDateTime(filter.VisibilityTimestamp)
5✔
381
        dbShardID := sqlplugin.GetDBShardIDFromHistoryShardID(filter.ShardID, mdb.GetTotalNumDBShards())
5✔
382
        return mdb.driver.ExecContext(ctx, dbShardID, deleteTimerTaskQuery, filter.ShardID, filter.VisibilityTimestamp, filter.TaskID)
5✔
383
}
5✔
384

385
// RangeDeleteFromTimerTasks deletes multiple rows from timer_tasks table
386
func (mdb *db) RangeDeleteFromTimerTasks(ctx context.Context, filter *sqlplugin.TimerTasksFilter) (sql.Result, error) {
19✔
387
        filter.MinVisibilityTimestamp = mdb.converter.ToMySQLDateTime(filter.MinVisibilityTimestamp)
19✔
388
        filter.MaxVisibilityTimestamp = mdb.converter.ToMySQLDateTime(filter.MaxVisibilityTimestamp)
19✔
389
        dbShardID := sqlplugin.GetDBShardIDFromHistoryShardID(filter.ShardID, mdb.GetTotalNumDBShards())
19✔
390
        if filter.PageSize > 0 {
38✔
391
                return mdb.driver.ExecContext(ctx, dbShardID, rangeDeleteTimerTaskByBatchQuery, filter.ShardID, filter.MinVisibilityTimestamp, filter.MaxVisibilityTimestamp, filter.PageSize)
19✔
392
        }
19✔
393
        return mdb.driver.ExecContext(ctx, dbShardID, rangeDeleteTimerTaskQuery, filter.ShardID, filter.MinVisibilityTimestamp, filter.MaxVisibilityTimestamp)
×
394
}
395

396
// InsertIntoBufferedEvents inserts one or more rows into buffered_events table
397
func (mdb *db) InsertIntoBufferedEvents(ctx context.Context, rows []sqlplugin.BufferedEventsRow) (sql.Result, error) {
218✔
398
        if len(rows) == 0 {
218✔
399
                return nil, nil
×
400
        }
×
401
        dbShardID := sqlplugin.GetDBShardIDFromHistoryShardID(rows[0].ShardID, mdb.GetTotalNumDBShards())
218✔
402
        return mdb.driver.NamedExecContext(ctx, dbShardID, createBufferedEventsQuery, rows)
218✔
403
}
404

405
// SelectFromBufferedEvents reads one or more rows from buffered_events table
406
func (mdb *db) SelectFromBufferedEvents(ctx context.Context, filter *sqlplugin.BufferedEventsFilter) ([]sqlplugin.BufferedEventsRow, error) {
493✔
407
        dbShardID := sqlplugin.GetDBShardIDFromHistoryShardID(filter.ShardID, mdb.GetTotalNumDBShards())
493✔
408
        var rows []sqlplugin.BufferedEventsRow
493✔
409
        err := mdb.driver.SelectContext(ctx, dbShardID, &rows, getBufferedEventsQuery, filter.ShardID, filter.DomainID, filter.WorkflowID, filter.RunID)
493✔
410
        for i := 0; i < len(rows); i++ {
500✔
411
                rows[i].DomainID = filter.DomainID
7✔
412
                rows[i].WorkflowID = filter.WorkflowID
7✔
413
                rows[i].RunID = filter.RunID
7✔
414
                rows[i].ShardID = filter.ShardID
7✔
415
        }
7✔
416
        return rows, err
493✔
417
}
418

419
// DeleteFromBufferedEvents deletes one or more rows from buffered_events table
420
func (mdb *db) DeleteFromBufferedEvents(ctx context.Context, filter *sqlplugin.BufferedEventsFilter) (sql.Result, error) {
44✔
421
        dbShardID := sqlplugin.GetDBShardIDFromHistoryShardID(filter.ShardID, mdb.GetTotalNumDBShards())
44✔
422
        return mdb.driver.ExecContext(ctx, dbShardID, deleteBufferedEventsQuery, filter.ShardID, filter.DomainID, filter.WorkflowID, filter.RunID)
44✔
423
}
44✔
424

425
// InsertIntoReplicationTasks inserts one or more rows into replication_tasks table
426
func (mdb *db) InsertIntoReplicationTasks(ctx context.Context, rows []sqlplugin.ReplicationTasksRow) (sql.Result, error) {
4✔
427
        if len(rows) == 0 {
4✔
428
                return nil, nil
×
429
        }
×
430
        dbShardID := sqlplugin.GetDBShardIDFromHistoryShardID(rows[0].ShardID, mdb.GetTotalNumDBShards())
4✔
431
        return mdb.driver.NamedExecContext(ctx, dbShardID, createReplicationTasksQuery, rows)
4✔
432
}
433

434
// SelectFromReplicationTasks reads one or more rows from replication_tasks table
435
func (mdb *db) SelectFromReplicationTasks(ctx context.Context, filter *sqlplugin.ReplicationTasksFilter) ([]sqlplugin.ReplicationTasksRow, error) {
100✔
436
        var rows []sqlplugin.ReplicationTasksRow
100✔
437
        dbShardID := sqlplugin.GetDBShardIDFromHistoryShardID(filter.ShardID, mdb.GetTotalNumDBShards())
100✔
438
        err := mdb.driver.SelectContext(ctx, dbShardID, &rows, getReplicationTasksQuery, filter.ShardID, filter.MinTaskID, filter.MaxTaskID, filter.PageSize)
100✔
439
        return rows, err
100✔
440
}
100✔
441

442
// DeleteFromReplicationTasks deletes one row from replication_tasks table
443
func (mdb *db) DeleteFromReplicationTasks(ctx context.Context, filter *sqlplugin.ReplicationTasksFilter) (sql.Result, error) {
5✔
444
        dbShardID := sqlplugin.GetDBShardIDFromHistoryShardID(filter.ShardID, mdb.GetTotalNumDBShards())
5✔
445
        return mdb.driver.ExecContext(ctx, dbShardID, deleteReplicationTaskQuery, filter.ShardID, filter.TaskID)
5✔
446
}
5✔
447

448
// RangeDeleteFromReplicationTasks deletes multiple rows from replication_tasks table
449
func (mdb *db) RangeDeleteFromReplicationTasks(ctx context.Context, filter *sqlplugin.ReplicationTasksFilter) (sql.Result, error) {
41✔
450
        dbShardID := sqlplugin.GetDBShardIDFromHistoryShardID(filter.ShardID, mdb.GetTotalNumDBShards())
41✔
451
        if filter.PageSize > 0 {
82✔
452
                return mdb.driver.ExecContext(ctx, dbShardID, rangeDeleteReplicationTaskByBatchQuery, filter.ShardID, filter.InclusiveEndTaskID, filter.PageSize)
41✔
453
        }
41✔
454
        return mdb.driver.ExecContext(ctx, dbShardID, rangeDeleteReplicationTaskQuery, filter.ShardID, filter.InclusiveEndTaskID)
×
455
}
456

457
// InsertIntoReplicationTasksDLQ inserts one or more rows into replication_tasks_dlq table
458
func (mdb *db) InsertIntoReplicationTasksDLQ(ctx context.Context, row *sqlplugin.ReplicationTaskDLQRow) (sql.Result, error) {
4✔
459
        dbShardID := sqlplugin.GetDBShardIDFromHistoryShardID(row.ShardID, mdb.GetTotalNumDBShards())
4✔
460
        return mdb.driver.NamedExecContext(ctx, dbShardID, insertReplicationTaskDLQQuery, row)
4✔
461
}
4✔
462

463
// SelectFromReplicationTasksDLQ reads one or more rows from replication_tasks_dlq table
464
func (mdb *db) SelectFromReplicationTasksDLQ(ctx context.Context, filter *sqlplugin.ReplicationTasksDLQFilter) ([]sqlplugin.ReplicationTasksRow, error) {
5✔
465
        var rows []sqlplugin.ReplicationTasksRow
5✔
466
        dbShardID := sqlplugin.GetDBShardIDFromHistoryShardID(filter.ShardID, mdb.GetTotalNumDBShards())
5✔
467
        err := mdb.driver.SelectContext(
5✔
468
                ctx,
5✔
469
                dbShardID,
5✔
470
                &rows,
5✔
471
                getReplicationTasksDLQQuery,
5✔
472
                filter.SourceClusterName,
5✔
473
                filter.ShardID,
5✔
474
                filter.MinTaskID,
5✔
475
                filter.MaxTaskID,
5✔
476
                filter.PageSize)
5✔
477
        return rows, err
5✔
478
}
5✔
479

480
// SelectFromReplicationDLQ reads one row from replication_tasks_dlq table
481
func (mdb *db) SelectFromReplicationDLQ(ctx context.Context, filter *sqlplugin.ReplicationTaskDLQFilter) (int64, error) {
5✔
482
        var size []int64
5✔
483
        dbShardID := sqlplugin.GetDBShardIDFromHistoryShardID(filter.ShardID, mdb.GetTotalNumDBShards())
5✔
484
        if err := mdb.driver.SelectContext(
5✔
485
                ctx,
5✔
486
                dbShardID,
5✔
487
                &size,
5✔
488
                getReplicationTaskDLQQuery,
5✔
489
                filter.SourceClusterName,
5✔
490
                filter.ShardID,
5✔
491
        ); err != nil {
5✔
492
                return 0, err
×
493
        }
×
494
        return size[0], nil
5✔
495
}
496

497
// DeleteMessageFromReplicationTasksDLQ deletes one row from replication_tasks_dlq table
498
func (mdb *db) DeleteMessageFromReplicationTasksDLQ(
499
        ctx context.Context,
500
        filter *sqlplugin.ReplicationTasksDLQFilter,
501
) (sql.Result, error) {
1✔
502
        dbShardID := sqlplugin.GetDBShardIDFromHistoryShardID(filter.ShardID, mdb.GetTotalNumDBShards())
1✔
503

1✔
504
        return mdb.driver.ExecContext(
1✔
505
                ctx,
1✔
506
                dbShardID,
1✔
507
                deleteReplicationTaskFromDLQQuery,
1✔
508
                filter.SourceClusterName,
1✔
509
                filter.ShardID,
1✔
510
                filter.TaskID,
1✔
511
        )
1✔
512
}
1✔
513

514
// DeleteMessageFromReplicationTasksDLQ deletes one or more rows from replication_tasks_dlq table
515
func (mdb *db) RangeDeleteMessageFromReplicationTasksDLQ(
516
        ctx context.Context,
517
        filter *sqlplugin.ReplicationTasksDLQFilter,
518
) (sql.Result, error) {
1✔
519
        dbShardID := sqlplugin.GetDBShardIDFromHistoryShardID(filter.ShardID, mdb.GetTotalNumDBShards())
1✔
520
        if filter.PageSize > 0 {
1✔
521
                return mdb.driver.ExecContext(
×
522
                        ctx,
×
523
                        dbShardID,
×
524
                        rangeDeleteReplicationTaskFromDLQByBatchQuery,
×
525
                        filter.SourceClusterName,
×
526
                        filter.ShardID,
×
527
                        filter.TaskID,
×
528
                        filter.InclusiveEndTaskID,
×
529
                        filter.PageSize,
×
530
                )
×
531
        }
×
532

533
        return mdb.driver.ExecContext(
1✔
534
                ctx,
1✔
535
                dbShardID,
1✔
536
                rangeDeleteReplicationTaskFromDLQQuery,
1✔
537
                filter.SourceClusterName,
1✔
538
                filter.ShardID,
1✔
539
                filter.TaskID,
1✔
540
                filter.InclusiveEndTaskID,
1✔
541
        )
1✔
542
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc