• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

uber / cadence / 0190573d-ff12-4850-94f0-8c77deb099df

27 Jun 2024 01:09AM UTC coverage: 71.434% (-0.1%) from 71.557%
0190573d-ff12-4850-94f0-8c77deb099df

push

buildkite

web-flow
Refactor/removing cross cluster feature (#6121)

## What changed?

This mostly* removes the cross-cluster feature.

## Background
The Cross-cluster feature was the ability to launch and interact with child workflows in another domain. It included the ability to start child workflows and signal them. The feature allowed child workflows to be launched in the target domain even if it was active in another region.

## Problems
The feature itself was something that very few of our customers needed: very few were interested in the problem of launching child workflows in another cluster, and none were unable to simply use an activity to make an RPC call to the other domain, as one would with any normal workflow.
The feature itself was quite resource-intensive: it was pull-based, spinning up a polling stack which polled the other cluster for work, similar to the replication stack. This polling behaviour made the latency characteristics fairly unpredictable and used considerable DB resources, to the point that we just turned it off. The Uber/Cadence team resolved that, were there sufficient demand for the feature in the future, a push-based mechanism would probably be significantly preferable.
The feature itself added a nontrivial amount of complexity to the codebase in a few areas, such as task processing and domain error handling, which introduced difficult-to-understand bugs such as the child-workflow dropping error #5919.

Decision to deprecate and alternatives
As of the June 2024 releases, the feature will be removed. The Cadence team is not aware of any users of the feature outside Uber (as it was broken until mid-2021 anyway), but as an FYI, it will cease to be available.

If this behaviour is desirable, an easy workaround is as previously mentioned: Use an activity to launch or signal the workflows in the other domain and block as needed.

PR details
This is a fairly high-risk refactor so it'll take some time to ... (continued)

118 of 134 new or added lines in 9 files covered. (88.06%)

324 existing lines in 28 files now uncovered.

104678 of 146539 relevant lines covered (71.43%)

2642.89 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

77.95
/common/persistence/sql/sqlplugin/postgres/execution.go
1
// Copyright (c) 2019 Uber Technologies, Inc.
2
//
3
// Permission is hereby granted, free of charge, to any person obtaining a copy
4
// of this software and associated documentation files (the "Software"), to deal
5
// in the Software without restriction, including without limitation the rights
6
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
7
// copies of the Software, and to permit persons to whom the Software is
8
// furnished to do so, subject to the following conditions:
9
//
10
// The above copyright notice and this permission notice shall be included in
11
// all copies or substantial portions of the Software.
12
//
13
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
14
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
18
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
19
// THE SOFTWARE.
20

21
package postgres
22

23
import (
24
        "context"
25
        "database/sql"
26

27
        "github.com/uber/cadence/common/persistence/sql/sqlplugin"
28
)
29

30
// SQL statements used by the postgres execution store. Write statements use
// sqlx named parameters (:name); read/delete statements use positional
// postgres placeholders ($1, $2, ...).
const (
	executionsColumns = `shard_id, domain_id, workflow_id, run_id, next_event_id, last_write_version, data, data_encoding`

	createExecutionQuery = `INSERT INTO executions(` + executionsColumns + `)
 VALUES(:shard_id, :domain_id, :workflow_id, :run_id, :next_event_id, :last_write_version, :data, :data_encoding)`

	updateExecutionQuery = `UPDATE executions SET
 next_event_id = :next_event_id, last_write_version = :last_write_version, data = :data, data_encoding = :data_encoding
 WHERE shard_id = :shard_id AND domain_id = :domain_id AND workflow_id = :workflow_id AND run_id = :run_id`

	getExecutionQuery = `SELECT ` + executionsColumns + ` FROM executions
 WHERE shard_id = $1 AND domain_id = $2 AND workflow_id = $3 AND run_id = $4`

	listExecutionQuery = `SELECT ` + executionsColumns + ` FROM executions
 WHERE shard_id = $1 AND workflow_id > $2 ORDER BY workflow_id LIMIT $3`

	deleteExecutionQuery = `DELETE FROM executions
 WHERE shard_id = $1 AND domain_id = $2 AND workflow_id = $3 AND run_id = $4`

	lockExecutionQueryBase = `SELECT next_event_id FROM executions
 WHERE shard_id = $1 AND domain_id = $2 AND workflow_id = $3 AND run_id = $4`

	// FOR UPDATE takes an exclusive row lock; FOR SHARE takes a shared one.
	writeLockExecutionQuery = lockExecutionQueryBase + ` FOR UPDATE`
	readLockExecutionQuery  = lockExecutionQueryBase + ` FOR SHARE`

	createCurrentExecutionQuery = `INSERT INTO current_executions
(shard_id, domain_id, workflow_id, run_id, create_request_id, state, close_status, start_version, last_write_version) VALUES
(:shard_id, :domain_id, :workflow_id, :run_id, :create_request_id, :state, :close_status, :start_version, :last_write_version)`

	deleteCurrentExecutionQuery = "DELETE FROM current_executions WHERE shard_id = $1 AND domain_id = $2 AND workflow_id = $3 AND run_id = $4"

	getCurrentExecutionQuery = `SELECT
shard_id, domain_id, workflow_id, run_id, create_request_id, state, close_status, start_version, last_write_version
FROM current_executions WHERE shard_id = $1 AND domain_id = $2 AND workflow_id = $3`

	lockCurrentExecutionJoinExecutionsQuery = `SELECT
ce.shard_id, ce.domain_id, ce.workflow_id, ce.run_id, ce.create_request_id, ce.state, ce.close_status, ce.start_version, e.last_write_version
FROM current_executions ce
INNER JOIN executions e ON e.shard_id = ce.shard_id AND e.domain_id = ce.domain_id AND e.workflow_id = ce.workflow_id AND e.run_id = ce.run_id
WHERE ce.shard_id = $1 AND ce.domain_id = $2 AND ce.workflow_id = $3 FOR UPDATE`

	lockCurrentExecutionQuery = getCurrentExecutionQuery + ` FOR UPDATE`

	updateCurrentExecutionsQuery = `UPDATE current_executions SET
run_id = :run_id,
create_request_id = :create_request_id,
state = :state,
close_status = :close_status,
start_version = :start_version,
last_write_version = :last_write_version
WHERE
shard_id = :shard_id AND
domain_id = :domain_id AND
workflow_id = :workflow_id
`

	getTransferTasksQuery = `SELECT task_id, data, data_encoding
 FROM transfer_tasks WHERE shard_id = $1 AND task_id > $2 AND task_id <= $3 ORDER BY shard_id, task_id LIMIT $4`

	createTransferTasksQuery = `INSERT INTO transfer_tasks(shard_id, task_id, data, data_encoding)
 VALUES(:shard_id, :task_id, :data, :data_encoding)`

	deleteTransferTaskQuery             = `DELETE FROM transfer_tasks WHERE shard_id = $1 AND task_id = $2`
	rangeDeleteTransferTaskQuery        = `DELETE FROM transfer_tasks WHERE shard_id = $1 AND task_id > $2 AND task_id <= $3`
	rangeDeleteTransferTaskByBatchQuery = `DELETE FROM transfer_tasks WHERE shard_id = $1 AND task_id IN (SELECT task_id FROM
		transfer_tasks WHERE shard_id = $1 AND task_id > $2 AND task_id <= $3 ORDER BY task_id LIMIT $4)`

	getCrossClusterTasksQuery = `SELECT task_id, data, data_encoding
 FROM cross_cluster_tasks WHERE target_cluster = $1 AND shard_id = $2 AND task_id > $3 AND task_id <= $4 ORDER BY task_id LIMIT $5`

	createCrossClusterTasksQuery = `INSERT INTO cross_cluster_tasks(target_cluster, shard_id, task_id, data, data_encoding)
 VALUES(:target_cluster, :shard_id, :task_id, :data, :data_encoding)`

	deleteCrossClusterTaskQuery             = `DELETE FROM cross_cluster_tasks WHERE target_cluster = $1 AND shard_id = $2 AND task_id = $3`
	rangeDeleteCrossClusterTaskQuery        = `DELETE FROM cross_cluster_tasks WHERE target_cluster = $1 AND shard_id = $2 AND task_id > $3 AND task_id <= $4`
	rangeDeleteCrossClusterTaskByBatchQuery = `DELETE FROM cross_cluster_tasks WHERE target_cluster = $1 AND shard_id = $2 AND task_id IN (SELECT task_id FROM
		cross_cluster_tasks WHERE target_cluster = $1 AND shard_id = $2 AND task_id > $3 AND task_id <= $4 ORDER BY task_id LIMIT $5)`

	createTimerTasksQuery = `INSERT INTO timer_tasks (shard_id, visibility_timestamp, task_id, data, data_encoding)
  VALUES (:shard_id, :visibility_timestamp, :task_id, :data, :data_encoding)`

	getTimerTasksQuery = `SELECT visibility_timestamp, task_id, data, data_encoding FROM timer_tasks
  WHERE shard_id = $1
  AND ((visibility_timestamp >= $2 AND task_id >= $3) OR visibility_timestamp > $4)
  AND visibility_timestamp < $5
  ORDER BY visibility_timestamp,task_id LIMIT $6`

	deleteTimerTaskQuery             = `DELETE FROM timer_tasks WHERE shard_id = $1 AND visibility_timestamp = $2 AND task_id = $3`
	rangeDeleteTimerTaskQuery        = `DELETE FROM timer_tasks WHERE shard_id = $1 AND visibility_timestamp >= $2 AND visibility_timestamp < $3`
	rangeDeleteTimerTaskByBatchQuery = `DELETE FROM timer_tasks WHERE shard_id = $1 AND (visibility_timestamp,task_id) IN (SELECT visibility_timestamp,task_id FROM
		timer_tasks WHERE shard_id = $1 AND visibility_timestamp >= $2 AND visibility_timestamp < $3 ORDER BY visibility_timestamp,task_id LIMIT $4)`

	createReplicationTasksQuery = `INSERT INTO replication_tasks (shard_id, task_id, data, data_encoding)
  VALUES(:shard_id, :task_id, :data, :data_encoding)`

	getReplicationTasksQuery = `SELECT task_id, data, data_encoding FROM replication_tasks WHERE
shard_id = $1 AND
task_id > $2 AND
task_id <= $3
ORDER BY task_id LIMIT $4`

	deleteReplicationTaskQuery      = `DELETE FROM replication_tasks WHERE shard_id = $1 AND task_id = $2`
	rangeDeleteReplicationTaskQuery = `DELETE FROM replication_tasks WHERE shard_id = $1 AND task_id <= $2`
	// NOTE: the inner subquery must filter by shard_id as well (matching the
	// transfer/cross-cluster/timer batch queries above); without it the LIMIT
	// could be consumed by task IDs belonging to other shards, deleting fewer
	// rows than intended per batch.
	rangeDeleteReplicationTaskByBatchQuery = `DELETE FROM replication_tasks WHERE shard_id = $1 AND task_id IN (SELECT task_id FROM
		replication_tasks WHERE shard_id = $1 AND task_id <= $2 ORDER BY task_id LIMIT $3)`

	getReplicationTasksDLQQuery = `SELECT task_id, data, data_encoding FROM replication_tasks_dlq WHERE
source_cluster_name = $1 AND
shard_id = $2 AND
task_id > $3 AND
task_id <= $4
ORDER BY task_id LIMIT $5`
	getReplicationTaskDLQQuery = `SELECT count(1) as count FROM replication_tasks_dlq WHERE
source_cluster_name = $1 AND
shard_id = $2`

	bufferedEventsColumns     = `shard_id, domain_id, workflow_id, run_id, data, data_encoding`
	createBufferedEventsQuery = `INSERT INTO buffered_events(` + bufferedEventsColumns + `)
VALUES (:shard_id, :domain_id, :workflow_id, :run_id, :data, :data_encoding)`

	deleteBufferedEventsQuery = `DELETE FROM buffered_events WHERE shard_id = $1 AND domain_id = $2 AND workflow_id = $3 AND run_id = $4`
	getBufferedEventsQuery    = `SELECT data, data_encoding FROM buffered_events WHERE shard_id = $1 AND domain_id = $2 AND workflow_id = $3 AND run_id = $4`

	insertReplicationTaskDLQQuery = `
INSERT INTO replication_tasks_dlq
            (source_cluster_name,
             shard_id,
             task_id,
             data,
             data_encoding)
VALUES     (:source_cluster_name,
            :shard_id,
            :task_id,
            :data,
            :data_encoding)
`
	deleteReplicationTaskFromDLQQuery = `
	DELETE FROM replication_tasks_dlq
		WHERE source_cluster_name = $1
		AND shard_id = $2
		AND task_id = $3`

	rangeDeleteReplicationTaskFromDLQQuery = `
	DELETE FROM replication_tasks_dlq
		WHERE source_cluster_name = $1
		AND shard_id = $2
		AND task_id > $3
		AND task_id <= $4`
	rangeDeleteReplicationTaskFromDLQByBatchQuery = `DELETE FROM replication_tasks_dlq WHERE source_cluster_name = $1 AND shard_id = $2 AND task_id IN (SELECT task_id FROM
		replication_tasks_dlq WHERE source_cluster_name = $1 AND shard_id = $2 AND task_id > $3 AND task_id <= $4 ORDER BY task_id LIMIT $5)`
)
181

182
// InsertIntoExecutions inserts a row into executions table
183
func (pdb *db) InsertIntoExecutions(ctx context.Context, row *sqlplugin.ExecutionsRow) (sql.Result, error) {
279✔
184
        dbShardID := sqlplugin.GetDBShardIDFromHistoryShardID(int(row.ShardID), pdb.GetTotalNumDBShards())
279✔
185
        return pdb.driver.NamedExecContext(ctx, dbShardID, createExecutionQuery, row)
279✔
186
}
279✔
187

188
// UpdateExecutions updates a single row in executions table
189
func (pdb *db) UpdateExecutions(ctx context.Context, row *sqlplugin.ExecutionsRow) (sql.Result, error) {
1,552✔
190
        dbShardID := sqlplugin.GetDBShardIDFromHistoryShardID(int(row.ShardID), pdb.GetTotalNumDBShards())
1,552✔
191
        return pdb.driver.NamedExecContext(ctx, dbShardID, updateExecutionQuery, row)
1,552✔
192
}
1,552✔
193

194
// SelectFromExecutions reads a single row from executions table
195
// The list execution query result is order by workflow ID only. It may returns duplicate record with pagination.
196
func (pdb *db) SelectFromExecutions(ctx context.Context, filter *sqlplugin.ExecutionsFilter) ([]sqlplugin.ExecutionsRow, error) {
498✔
197
        dbShardID := sqlplugin.GetDBShardIDFromHistoryShardID(int(filter.ShardID), pdb.GetTotalNumDBShards())
498✔
198
        var rows []sqlplugin.ExecutionsRow
498✔
199
        var err error
498✔
200
        if len(filter.DomainID) == 0 && filter.Size > 0 {
498✔
201
                err = pdb.driver.SelectContext(ctx, dbShardID, &rows, listExecutionQuery, filter.ShardID, filter.WorkflowID, filter.Size)
×
202
                if err != nil {
×
203
                        return nil, err
×
204
                }
×
205
        } else {
498✔
206
                var row sqlplugin.ExecutionsRow
498✔
207
                err = pdb.driver.GetContext(ctx, dbShardID, &row, getExecutionQuery, filter.ShardID, filter.DomainID, filter.WorkflowID, filter.RunID)
498✔
208
                if err != nil {
640✔
209
                        return nil, err
142✔
210
                }
142✔
211
                rows = append(rows, row)
357✔
212
        }
213

214
        return rows, err
357✔
215
}
216

217
// DeleteFromExecutions deletes a single row from executions table
218
func (pdb *db) DeleteFromExecutions(ctx context.Context, filter *sqlplugin.ExecutionsFilter) (sql.Result, error) {
23✔
219
        dbShardID := sqlplugin.GetDBShardIDFromHistoryShardID(int(filter.ShardID), pdb.GetTotalNumDBShards())
23✔
220
        return pdb.driver.ExecContext(ctx, dbShardID, deleteExecutionQuery, filter.ShardID, filter.DomainID, filter.WorkflowID, filter.RunID)
23✔
221
}
23✔
222

223
// ReadLockExecutions acquires a write lock on a single row in executions table
224
func (pdb *db) ReadLockExecutions(ctx context.Context, filter *sqlplugin.ExecutionsFilter) (int, error) {
×
225
        dbShardID := sqlplugin.GetDBShardIDFromHistoryShardID(int(filter.ShardID), pdb.GetTotalNumDBShards())
×
226
        var nextEventID int
×
227
        err := pdb.driver.GetContext(ctx, dbShardID, &nextEventID, readLockExecutionQuery, filter.ShardID, filter.DomainID, filter.WorkflowID, filter.RunID)
×
228
        return nextEventID, err
×
229
}
×
230

231
// WriteLockExecutions acquires a write lock on a single row in executions table
232
func (pdb *db) WriteLockExecutions(ctx context.Context, filter *sqlplugin.ExecutionsFilter) (int, error) {
1,566✔
233
        dbShardID := sqlplugin.GetDBShardIDFromHistoryShardID(int(filter.ShardID), pdb.GetTotalNumDBShards())
1,566✔
234
        var nextEventID int
1,566✔
235
        err := pdb.driver.GetContext(ctx, dbShardID, &nextEventID, writeLockExecutionQuery, filter.ShardID, filter.DomainID, filter.WorkflowID, filter.RunID)
1,566✔
236
        return nextEventID, err
1,566✔
237
}
1,566✔
238

239
// InsertIntoCurrentExecutions inserts a single row into current_executions table
240
func (pdb *db) InsertIntoCurrentExecutions(ctx context.Context, row *sqlplugin.CurrentExecutionsRow) (sql.Result, error) {
206✔
241
        dbShardID := sqlplugin.GetDBShardIDFromHistoryShardID(int(row.ShardID), pdb.GetTotalNumDBShards())
206✔
242
        return pdb.driver.NamedExecContext(ctx, dbShardID, createCurrentExecutionQuery, row)
206✔
243
}
206✔
244

245
// UpdateCurrentExecutions updates a single row in current_executions table
246
func (pdb *db) UpdateCurrentExecutions(ctx context.Context, row *sqlplugin.CurrentExecutionsRow) (sql.Result, error) {
1,566✔
247
        dbShardID := sqlplugin.GetDBShardIDFromHistoryShardID(int(row.ShardID), pdb.GetTotalNumDBShards())
1,566✔
248
        return pdb.driver.NamedExecContext(ctx, dbShardID, updateCurrentExecutionsQuery, row)
1,566✔
249
}
1,566✔
250

251
// SelectFromCurrentExecutions reads one or more rows from current_executions table
252
func (pdb *db) SelectFromCurrentExecutions(ctx context.Context, filter *sqlplugin.CurrentExecutionsFilter) (*sqlplugin.CurrentExecutionsRow, error) {
92✔
253
        dbShardID := sqlplugin.GetDBShardIDFromHistoryShardID(int(filter.ShardID), pdb.GetTotalNumDBShards())
92✔
254
        var row sqlplugin.CurrentExecutionsRow
92✔
255
        err := pdb.driver.GetContext(ctx, dbShardID, &row, getCurrentExecutionQuery, filter.ShardID, filter.DomainID, filter.WorkflowID)
92✔
256
        return &row, err
92✔
257
}
92✔
258

259
// DeleteFromCurrentExecutions deletes a single row in current_executions table
260
func (pdb *db) DeleteFromCurrentExecutions(ctx context.Context, filter *sqlplugin.CurrentExecutionsFilter) (sql.Result, error) {
20✔
261
        dbShardID := sqlplugin.GetDBShardIDFromHistoryShardID(int(filter.ShardID), pdb.GetTotalNumDBShards())
20✔
262
        return pdb.driver.ExecContext(ctx, dbShardID, deleteCurrentExecutionQuery, filter.ShardID, filter.DomainID, filter.WorkflowID, filter.RunID)
20✔
263
}
20✔
264

265
// LockCurrentExecutions acquires a write lock on a single row in current_executions table
266
func (pdb *db) LockCurrentExecutions(ctx context.Context, filter *sqlplugin.CurrentExecutionsFilter) (*sqlplugin.CurrentExecutionsRow, error) {
1,569✔
267
        dbShardID := sqlplugin.GetDBShardIDFromHistoryShardID(int(filter.ShardID), pdb.GetTotalNumDBShards())
1,569✔
268
        var row sqlplugin.CurrentExecutionsRow
1,569✔
269
        err := pdb.driver.GetContext(ctx, dbShardID, &row, lockCurrentExecutionQuery, filter.ShardID, filter.DomainID, filter.WorkflowID)
1,569✔
270
        return &row, err
1,569✔
271
}
1,569✔
272

273
// LockCurrentExecutionsJoinExecutions joins a row in current_executions with executions table and acquires a
274
// write lock on the result
275
func (pdb *db) LockCurrentExecutionsJoinExecutions(ctx context.Context, filter *sqlplugin.CurrentExecutionsFilter) ([]sqlplugin.CurrentExecutionsRow, error) {
252✔
276
        dbShardID := sqlplugin.GetDBShardIDFromHistoryShardID(int(filter.ShardID), pdb.GetTotalNumDBShards())
252✔
277
        var rows []sqlplugin.CurrentExecutionsRow
252✔
278
        err := pdb.driver.SelectContext(ctx, dbShardID, &rows, lockCurrentExecutionJoinExecutionsQuery, filter.ShardID, filter.DomainID, filter.WorkflowID)
252✔
279
        return rows, err
252✔
280
}
252✔
281

282
// InsertIntoTransferTasks inserts one or more rows into transfer_tasks table
283
func (pdb *db) InsertIntoTransferTasks(ctx context.Context, rows []sqlplugin.TransferTasksRow) (sql.Result, error) {
832✔
284
        if len(rows) == 0 {
832✔
285
                return nil, nil
×
286
        }
×
287
        dbShardID := sqlplugin.GetDBShardIDFromHistoryShardID(rows[0].ShardID, pdb.GetTotalNumDBShards())
832✔
288
        return pdb.driver.NamedExecContext(ctx, dbShardID, createTransferTasksQuery, rows)
832✔
289
}
290

291
// SelectFromTransferTasks reads one or more rows from transfer_tasks table
292
func (pdb *db) SelectFromTransferTasks(ctx context.Context, filter *sqlplugin.TransferTasksFilter) ([]sqlplugin.TransferTasksRow, error) {
906✔
293
        dbShardID := sqlplugin.GetDBShardIDFromHistoryShardID(int(filter.ShardID), pdb.GetTotalNumDBShards())
906✔
294
        var rows []sqlplugin.TransferTasksRow
906✔
295
        err := pdb.driver.SelectContext(ctx, dbShardID, &rows, getTransferTasksQuery, filter.ShardID, filter.MinTaskID, filter.MaxTaskID, filter.PageSize)
906✔
296
        if err != nil {
906✔
297
                return nil, err
×
298
        }
×
299
        return rows, err
906✔
300
}
301

302
// DeleteFromTransferTasks deletes one or more rows from transfer_tasks table
303
func (pdb *db) DeleteFromTransferTasks(ctx context.Context, filter *sqlplugin.TransferTasksFilter) (sql.Result, error) {
70✔
304
        dbShardID := sqlplugin.GetDBShardIDFromHistoryShardID(int(filter.ShardID), pdb.GetTotalNumDBShards())
70✔
305
        return pdb.driver.ExecContext(ctx, dbShardID, deleteTransferTaskQuery, filter.ShardID, filter.TaskID)
70✔
306
}
70✔
307

308
// RangeDeleteFromTransferTasks deletes multi rows from transfer_tasks table
309
func (pdb *db) RangeDeleteFromTransferTasks(ctx context.Context, filter *sqlplugin.TransferTasksFilter) (sql.Result, error) {
41✔
310
        dbShardID := sqlplugin.GetDBShardIDFromHistoryShardID(int(filter.ShardID), pdb.GetTotalNumDBShards())
41✔
311
        if filter.PageSize > 0 {
82✔
312
                return pdb.driver.ExecContext(ctx, dbShardID, rangeDeleteTransferTaskByBatchQuery, filter.ShardID, filter.MinTaskID, filter.MaxTaskID, filter.PageSize)
41✔
313
        }
41✔
314
        return pdb.driver.ExecContext(ctx, dbShardID, rangeDeleteTransferTaskQuery, filter.ShardID, filter.MinTaskID, filter.MaxTaskID)
×
315
}
316

317
// InsertIntoCrossClusterTasks inserts one or more rows into cross_cluster_tasks table
318
func (pdb *db) InsertIntoCrossClusterTasks(ctx context.Context, rows []sqlplugin.CrossClusterTasksRow) (sql.Result, error) {
1✔
319
        if len(rows) == 0 {
1✔
320
                return nil, nil
×
321
        }
×
322
        dbShardID := sqlplugin.GetDBShardIDFromHistoryShardID(rows[0].ShardID, pdb.GetTotalNumDBShards())
1✔
323
        return pdb.driver.NamedExecContext(ctx, dbShardID, createCrossClusterTasksQuery, rows)
1✔
324
}
325

326
// SelectFromCrossClusterTasks reads one or more rows from cross_cluster_tasks table
UNCOV
327
func (pdb *db) SelectFromCrossClusterTasks(ctx context.Context, filter *sqlplugin.CrossClusterTasksFilter) ([]sqlplugin.CrossClusterTasksRow, error) {
×
UNCOV
328
        dbShardID := sqlplugin.GetDBShardIDFromHistoryShardID(int(filter.ShardID), pdb.GetTotalNumDBShards())
×
UNCOV
329
        var rows []sqlplugin.CrossClusterTasksRow
×
UNCOV
330
        err := pdb.driver.SelectContext(ctx, dbShardID, &rows, getCrossClusterTasksQuery, filter.TargetCluster, filter.ShardID, filter.MinTaskID, filter.MaxTaskID, filter.PageSize)
×
UNCOV
331
        if err != nil {
×
332
                return nil, err
×
333
        }
×
UNCOV
334
        return rows, err
×
335
}
336

337
// DeleteFromCrossClusterTasks deletes one or more rows from cross_cluster_tasks table
UNCOV
338
func (pdb *db) DeleteFromCrossClusterTasks(ctx context.Context, filter *sqlplugin.CrossClusterTasksFilter) (sql.Result, error) {
×
UNCOV
339
        dbShardID := sqlplugin.GetDBShardIDFromHistoryShardID(int(filter.ShardID), pdb.GetTotalNumDBShards())
×
UNCOV
340
        return pdb.driver.ExecContext(ctx, dbShardID, deleteCrossClusterTaskQuery, filter.TargetCluster, filter.ShardID, filter.TaskID)
×
UNCOV
341
}
×
342

343
// RangeDeleteFromCrossClusterTasks deletes multi rows from cross_cluster_tasks table
UNCOV
344
func (pdb *db) RangeDeleteFromCrossClusterTasks(ctx context.Context, filter *sqlplugin.CrossClusterTasksFilter) (sql.Result, error) {
×
UNCOV
345
        dbShardID := sqlplugin.GetDBShardIDFromHistoryShardID(int(filter.ShardID), pdb.GetTotalNumDBShards())
×
UNCOV
346
        if filter.PageSize > 0 {
×
UNCOV
347
                return pdb.driver.ExecContext(ctx, dbShardID, rangeDeleteCrossClusterTaskByBatchQuery, filter.TargetCluster, filter.ShardID, filter.MinTaskID, filter.MaxTaskID, filter.PageSize)
×
UNCOV
348
        }
×
349
        return pdb.driver.ExecContext(ctx, dbShardID, rangeDeleteCrossClusterTaskQuery, filter.TargetCluster, filter.ShardID, filter.MinTaskID, filter.MaxTaskID)
×
350
}
351

352
// InsertIntoTimerTasks inserts one or more rows into timer_tasks table
353
func (pdb *db) InsertIntoTimerTasks(ctx context.Context, rows []sqlplugin.TimerTasksRow) (sql.Result, error) {
1,034✔
354
        if len(rows) == 0 {
1,034✔
355
                return nil, nil
×
356
        }
×
357
        dbShardID := sqlplugin.GetDBShardIDFromHistoryShardID(rows[0].ShardID, pdb.GetTotalNumDBShards())
1,034✔
358
        for i := range rows {
2,106✔
359
                rows[i].VisibilityTimestamp = pdb.converter.ToPostgresDateTime(rows[i].VisibilityTimestamp)
1,072✔
360
        }
1,072✔
361
        return pdb.driver.NamedExecContext(ctx, dbShardID, createTimerTasksQuery, rows)
1,034✔
362
}
363

364
// SelectFromTimerTasks reads one or more rows from timer_tasks table
365
func (pdb *db) SelectFromTimerTasks(ctx context.Context, filter *sqlplugin.TimerTasksFilter) ([]sqlplugin.TimerTasksRow, error) {
1,091✔
366
        dbShardID := sqlplugin.GetDBShardIDFromHistoryShardID(int(filter.ShardID), pdb.GetTotalNumDBShards())
1,091✔
367
        var rows []sqlplugin.TimerTasksRow
1,091✔
368
        filter.MinVisibilityTimestamp = pdb.converter.ToPostgresDateTime(filter.MinVisibilityTimestamp)
1,091✔
369
        filter.MaxVisibilityTimestamp = pdb.converter.ToPostgresDateTime(filter.MaxVisibilityTimestamp)
1,091✔
370
        err := pdb.driver.SelectContext(ctx, dbShardID, &rows, getTimerTasksQuery, filter.ShardID, filter.MinVisibilityTimestamp,
1,091✔
371
                filter.TaskID, filter.MinVisibilityTimestamp, filter.MaxVisibilityTimestamp, filter.PageSize)
1,091✔
372
        if err != nil {
1,091✔
373
                return nil, err
×
374
        }
×
375
        for i := range rows {
4,849✔
376
                rows[i].VisibilityTimestamp = pdb.converter.FromPostgresDateTime(rows[i].VisibilityTimestamp)
3,758✔
377
        }
3,758✔
378
        return rows, err
1,091✔
379
}
380

381
// DeleteFromTimerTasks deletes one or more rows from timer_tasks table
382
func (pdb *db) DeleteFromTimerTasks(ctx context.Context, filter *sqlplugin.TimerTasksFilter) (sql.Result, error) {
5✔
383
        dbShardID := sqlplugin.GetDBShardIDFromHistoryShardID(int(filter.ShardID), pdb.GetTotalNumDBShards())
5✔
384
        filter.VisibilityTimestamp = pdb.converter.ToPostgresDateTime(filter.VisibilityTimestamp)
5✔
385
        return pdb.driver.ExecContext(ctx, dbShardID, deleteTimerTaskQuery, filter.ShardID, filter.VisibilityTimestamp, filter.TaskID)
5✔
386
}
5✔
387

388
// RangeDeleteFromTimerTasks deletes multi rows from timer_tasks table
389
func (pdb *db) RangeDeleteFromTimerTasks(ctx context.Context, filter *sqlplugin.TimerTasksFilter) (sql.Result, error) {
18✔
390
        dbShardID := sqlplugin.GetDBShardIDFromHistoryShardID(int(filter.ShardID), pdb.GetTotalNumDBShards())
18✔
391
        filter.MinVisibilityTimestamp = pdb.converter.ToPostgresDateTime(filter.MinVisibilityTimestamp)
18✔
392
        filter.MaxVisibilityTimestamp = pdb.converter.ToPostgresDateTime(filter.MaxVisibilityTimestamp)
18✔
393
        if filter.PageSize > 0 {
36✔
394
                return pdb.driver.ExecContext(ctx, dbShardID, rangeDeleteTimerTaskByBatchQuery, filter.ShardID, filter.MinVisibilityTimestamp, filter.MaxVisibilityTimestamp, filter.PageSize)
18✔
395
        }
18✔
396
        return pdb.driver.ExecContext(ctx, dbShardID, rangeDeleteTimerTaskQuery, filter.ShardID, filter.MinVisibilityTimestamp, filter.MaxVisibilityTimestamp)
×
397
}
398

399
// InsertIntoBufferedEvents inserts one or more rows into buffered_events table
400
func (pdb *db) InsertIntoBufferedEvents(ctx context.Context, rows []sqlplugin.BufferedEventsRow) (sql.Result, error) {
218✔
401
        if len(rows) == 0 {
218✔
402
                return nil, nil
×
403
        }
×
404
        dbShardID := sqlplugin.GetDBShardIDFromHistoryShardID(rows[0].ShardID, pdb.GetTotalNumDBShards())
218✔
405
        return pdb.driver.NamedExecContext(ctx, dbShardID, createBufferedEventsQuery, rows)
218✔
406
}
407

408
// SelectFromBufferedEvents reads one or more rows from buffered_events table
409
func (pdb *db) SelectFromBufferedEvents(ctx context.Context, filter *sqlplugin.BufferedEventsFilter) ([]sqlplugin.BufferedEventsRow, error) {
498✔
410
        dbShardID := sqlplugin.GetDBShardIDFromHistoryShardID(int(filter.ShardID), pdb.GetTotalNumDBShards())
498✔
411
        var rows []sqlplugin.BufferedEventsRow
498✔
412
        err := pdb.driver.SelectContext(ctx, dbShardID, &rows, getBufferedEventsQuery, filter.ShardID, filter.DomainID, filter.WorkflowID, filter.RunID)
498✔
413
        for i := 0; i < len(rows); i++ {
505✔
414
                rows[i].DomainID = filter.DomainID
7✔
415
                rows[i].WorkflowID = filter.WorkflowID
7✔
416
                rows[i].RunID = filter.RunID
7✔
417
                rows[i].ShardID = filter.ShardID
7✔
418
        }
7✔
419
        return rows, err
498✔
420
}
421

422
// DeleteFromBufferedEvents deletes one or more rows from buffered_events table
423
func (pdb *db) DeleteFromBufferedEvents(ctx context.Context, filter *sqlplugin.BufferedEventsFilter) (sql.Result, error) {
44✔
424
        dbShardID := sqlplugin.GetDBShardIDFromHistoryShardID(int(filter.ShardID), pdb.GetTotalNumDBShards())
44✔
425
        return pdb.driver.ExecContext(ctx, dbShardID, deleteBufferedEventsQuery, filter.ShardID, filter.DomainID, filter.WorkflowID, filter.RunID)
44✔
426
}
44✔
427

428
// InsertIntoReplicationTasks inserts one or more rows into replication_tasks table
429
func (pdb *db) InsertIntoReplicationTasks(ctx context.Context, rows []sqlplugin.ReplicationTasksRow) (sql.Result, error) {
4✔
430
        if len(rows) == 0 {
4✔
431
                return nil, nil
×
432
        }
×
433
        dbShardID := sqlplugin.GetDBShardIDFromHistoryShardID(rows[0].ShardID, pdb.GetTotalNumDBShards())
4✔
434
        return pdb.driver.NamedExecContext(ctx, dbShardID, createReplicationTasksQuery, rows)
4✔
435
}
436

437
// SelectFromReplicationTasks reads one or more rows from replication_tasks table
438
func (pdb *db) SelectFromReplicationTasks(ctx context.Context, filter *sqlplugin.ReplicationTasksFilter) ([]sqlplugin.ReplicationTasksRow, error) {
99✔
439
        dbShardID := sqlplugin.GetDBShardIDFromHistoryShardID(int(filter.ShardID), pdb.GetTotalNumDBShards())
99✔
440
        var rows []sqlplugin.ReplicationTasksRow
99✔
441
        err := pdb.driver.SelectContext(ctx, dbShardID, &rows, getReplicationTasksQuery, filter.ShardID, filter.MinTaskID, filter.MaxTaskID, filter.PageSize)
99✔
442
        return rows, err
99✔
443
}
99✔
444

445
// DeleteFromReplicationTasks deletes one rows from replication_tasks table
446
func (pdb *db) DeleteFromReplicationTasks(ctx context.Context, filter *sqlplugin.ReplicationTasksFilter) (sql.Result, error) {
5✔
447
        dbShardID := sqlplugin.GetDBShardIDFromHistoryShardID(int(filter.ShardID), pdb.GetTotalNumDBShards())
5✔
448
        return pdb.driver.ExecContext(ctx, dbShardID, deleteReplicationTaskQuery, filter.ShardID, filter.TaskID)
5✔
449
}
5✔
450

451
// RangeDeleteFromReplicationTasks deletes multi rows from replication_tasks table
452
func (pdb *db) RangeDeleteFromReplicationTasks(ctx context.Context, filter *sqlplugin.ReplicationTasksFilter) (sql.Result, error) {
39✔
453
        dbShardID := sqlplugin.GetDBShardIDFromHistoryShardID(int(filter.ShardID), pdb.GetTotalNumDBShards())
39✔
454
        if filter.PageSize > 0 {
78✔
455
                return pdb.driver.ExecContext(ctx, dbShardID, rangeDeleteReplicationTaskByBatchQuery, filter.ShardID, filter.InclusiveEndTaskID, filter.PageSize)
39✔
456
        }
39✔
457
        return pdb.driver.ExecContext(ctx, dbShardID, rangeDeleteReplicationTaskQuery, filter.ShardID, filter.InclusiveEndTaskID)
×
458
}
459

460
// InsertIntoReplicationTasksDLQ inserts one or more rows into replication_tasks_dlq table
461
func (pdb *db) InsertIntoReplicationTasksDLQ(ctx context.Context, row *sqlplugin.ReplicationTaskDLQRow) (sql.Result, error) {
4✔
462
        dbShardID := sqlplugin.GetDBShardIDFromHistoryShardID(int(row.ShardID), pdb.GetTotalNumDBShards())
4✔
463
        return pdb.driver.NamedExecContext(ctx, dbShardID, insertReplicationTaskDLQQuery, row)
4✔
464
}
4✔
465

466
// SelectFromReplicationTasksDLQ reads one or more rows from replication_tasks_dlq table
467
func (pdb *db) SelectFromReplicationTasksDLQ(ctx context.Context, filter *sqlplugin.ReplicationTasksDLQFilter) ([]sqlplugin.ReplicationTasksRow, error) {
5✔
468
        dbShardID := sqlplugin.GetDBShardIDFromHistoryShardID(int(filter.ShardID), pdb.GetTotalNumDBShards())
5✔
469
        var rows []sqlplugin.ReplicationTasksRow
5✔
470
        err := pdb.driver.SelectContext(
5✔
471
                ctx,
5✔
472
                dbShardID,
5✔
473
                &rows, getReplicationTasksDLQQuery,
5✔
474
                filter.SourceClusterName,
5✔
475
                filter.ShardID,
5✔
476
                filter.MinTaskID,
5✔
477
                filter.MaxTaskID,
5✔
478
                filter.PageSize)
5✔
479
        return rows, err
5✔
480
}
5✔
481

482
// SelectFromReplicationDLQ reads one row from replication_tasks_dlq table
483
func (pdb *db) SelectFromReplicationDLQ(ctx context.Context, filter *sqlplugin.ReplicationTaskDLQFilter) (int64, error) {
5✔
484
        dbShardID := sqlplugin.GetDBShardIDFromHistoryShardID(int(filter.ShardID), pdb.GetTotalNumDBShards())
5✔
485
        var size []int64
5✔
486
        if err := pdb.driver.SelectContext(
5✔
487
                ctx,
5✔
488
                dbShardID,
5✔
489
                &size, getReplicationTaskDLQQuery,
5✔
490
                filter.SourceClusterName,
5✔
491
                filter.ShardID,
5✔
492
        ); err != nil {
5✔
493
                return 0, err
×
494
        }
×
495
        return size[0], nil
5✔
496
}
497

498
// DeleteMessageFromReplicationTasksDLQ deletes one row from replication_tasks_dlq table
499
func (pdb *db) DeleteMessageFromReplicationTasksDLQ(
500
        ctx context.Context,
501
        filter *sqlplugin.ReplicationTasksDLQFilter,
502
) (sql.Result, error) {
1✔
503
        dbShardID := sqlplugin.GetDBShardIDFromHistoryShardID(int(filter.ShardID), pdb.GetTotalNumDBShards())
1✔
504
        return pdb.driver.ExecContext(
1✔
505
                ctx,
1✔
506
                dbShardID,
1✔
507
                deleteReplicationTaskFromDLQQuery,
1✔
508
                filter.SourceClusterName,
1✔
509
                filter.ShardID,
1✔
510
                filter.TaskID,
1✔
511
        )
1✔
512
}
1✔
513

514
// DeleteMessageFromReplicationTasksDLQ deletes one or more rows from replication_tasks_dlq table
515
func (pdb *db) RangeDeleteMessageFromReplicationTasksDLQ(
516
        ctx context.Context,
517
        filter *sqlplugin.ReplicationTasksDLQFilter,
518
) (sql.Result, error) {
1✔
519
        dbShardID := sqlplugin.GetDBShardIDFromHistoryShardID(int(filter.ShardID), pdb.GetTotalNumDBShards())
1✔
520
        if filter.PageSize > 0 {
1✔
521
                return pdb.driver.ExecContext(
×
522
                        ctx,
×
523
                        dbShardID,
×
524
                        rangeDeleteReplicationTaskFromDLQByBatchQuery,
×
525
                        filter.SourceClusterName,
×
526
                        filter.ShardID,
×
527
                        filter.TaskID,
×
528
                        filter.InclusiveEndTaskID,
×
529
                        filter.PageSize,
×
530
                )
×
531
        }
×
532

533
        return pdb.driver.ExecContext(
1✔
534
                ctx,
1✔
535
                dbShardID,
1✔
536
                rangeDeleteReplicationTaskFromDLQQuery,
1✔
537
                filter.SourceClusterName,
1✔
538
                filter.ShardID,
1✔
539
                filter.TaskID,
1✔
540
                filter.InclusiveEndTaskID,
1✔
541
        )
1✔
542
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc