• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

uber / cadence / 018d9fa8-75f8-405b-9b1e-38f93e6b0a11

12 Feb 2024 11:30PM UTC coverage: 62.748% (+0.05%) from 62.701%
018d9fa8-75f8-405b-9b1e-38f93e6b0a11

Pull #5657

buildkite

Shaddoll
Implement SignalWithStartWorkflowExecutionAsync API
Pull Request #5657: Implement SignalWithStartWorkflowExecutionAsync API

96 of 142 new or added lines in 5 files covered. (67.61%)

60 existing lines in 8 files now uncovered.

92596 of 147569 relevant lines covered (62.75%)

2318.9 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

73.4
/common/persistence/sql/sqlplugin/postgres/task.go
1
// Copyright (c) 2019 Uber Technologies, Inc.
2
//
3
// Permission is hereby granted, free of charge, to any person obtaining a copy
4
// of this software and associated documentation files (the "Software"), to deal
5
// in the Software without restriction, including without limitation the rights
6
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
7
// copies of the Software, and to permit persons to whom the Software is
8
// furnished to do so, subject to the following conditions:
9
//
10
// The above copyright notice and this permission notice shall be included in
11
// all copies or substantial portions of the Software.
12
//
13
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
14
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
18
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
19
// THE SOFTWARE.
20

21
package postgres
22

23
import (
24
        "context"
25
        "database/sql"
26
        "fmt"
27

28
        "github.com/uber/cadence/common/persistence/sql/sqlplugin"
29
)
30

31
const (
32
        taskListCreatePart = `INTO task_lists(shard_id, domain_id, name, task_type, range_id, data, data_encoding) ` +
33
                `VALUES (:shard_id, :domain_id, :name, :task_type, :range_id, :data, :data_encoding)`
34

35
        // (default range ID: initialRangeID == 1)
36
        createTaskListQry = `INSERT ` + taskListCreatePart
37

38
        updateTaskListQry = `UPDATE task_lists SET
39
range_id = :range_id,
40
data = :data,
41
data_encoding = :data_encoding
42
WHERE
43
shard_id = :shard_id AND
44
domain_id = :domain_id AND
45
name = :name AND
46
task_type = :task_type
47
`
48

49
        // This query uses pagination that is best understood by analogy to simple numbers.
50
        // Given a list of numbers
51
        //         111
52
        //        112
53
        //        121
54
        //        211
55
        // where the hundreds digit corresponds to domain_id, the tens digit
56
        // corresponds to name, and the ones digit corresponds to task_type,
57
        // Imagine recurring queries with a limit of 1.
58
        // For the second query to skip the first result and return 112, it must allow equal values in hundreds & tens, but it's OK because the ones digit is higher.
59
        // For the third query, the ones digit is now lower but that's irrelevant because the tens digit is greater.
60
        // For the fourth query, the tens digit is now lower but that's again irrelevant because now the hundreds digit is higher.
61
        // This technique is useful since the size of the table can easily change between calls, making SKIP an unreliable method, while other db-specific things like rowids are not portable
62
        listTaskListQry = `SELECT domain_id, range_id, name, task_type, data, data_encoding ` +
63
                `FROM task_lists ` +
64
                `WHERE shard_id = $1 AND ((domain_id = $2 AND name = $3 AND task_type > $4) OR (domain_id=$2 AND name > $3) OR (domain_id > $2)) ` +
65
                `ORDER BY domain_id,name,task_type LIMIT $5`
66

67
        getTaskListQry = `SELECT domain_id, range_id, name, task_type, data, data_encoding ` +
68
                `FROM task_lists ` +
69
                `WHERE shard_id = $1 AND domain_id = $2 AND name = $3 AND task_type = $4`
70

71
        deleteTaskListQry = `DELETE FROM task_lists WHERE shard_id=$1 AND domain_id=$2 AND name=$3 AND task_type=$4 AND range_id=$5`
72

73
        lockTaskListQry = `SELECT range_id FROM task_lists ` +
74
                `WHERE shard_id = $1 AND domain_id = $2 AND name = $3 AND task_type = $4 FOR UPDATE`
75

76
        getTaskMinMaxQry = `SELECT task_id, data, data_encoding ` +
77
                `FROM tasks ` +
78
                `WHERE domain_id = $1 AND task_list_name = $2 AND task_type = $3 AND task_id > $4 AND task_id <= $5 ` +
79
                ` ORDER BY task_id LIMIT $6`
80

81
        getTaskMinQry = `SELECT task_id, data, data_encoding ` +
82
                `FROM tasks ` +
83
                `WHERE domain_id = $1 AND task_list_name = $2 AND task_type = $3 AND task_id > $4 ORDER BY task_id LIMIT $5`
84

85
        getTasksCountQry = `SELECT count(1) as count ` +
86
                `FROM tasks ` +
87
                `WHERE domain_id = $1 AND task_list_name = $2 AND task_type = $3 AND task_id > $4`
88

89
        createTaskQry = `INSERT INTO ` +
90
                `tasks(domain_id, task_list_name, task_type, task_id, data, data_encoding) ` +
91
                `VALUES(:domain_id, :task_list_name, :task_type, :task_id, :data, :data_encoding)`
92

93
        deleteTaskQry = `DELETE FROM tasks ` +
94
                `WHERE domain_id = $1 AND task_list_name = $2 AND task_type = $3 AND task_id = $4`
95

96
        rangeDeleteTaskQry = `DELETE FROM tasks ` +
97
                `WHERE domain_id = $1 AND task_list_name = $2 AND task_type = $3 AND task_id IN (SELECT task_id FROM
98
                 tasks WHERE domain_id = $1 AND task_list_name = $2 AND task_type = $3 AND task_id <= $4 ` +
99
                `ORDER BY domain_id,task_list_name,task_type,task_id LIMIT $5 )`
100

101
        getOrphanTaskQry = `SELECT task_id, domain_id, task_list_name, task_type FROM tasks AS t ` +
102
                `WHERE NOT EXISTS ( ` +
103
                `        SELECT domain_id, name, task_type FROM task_lists AS tl ` +
104
                `        WHERE t.domain_id=tl.domain_id and t.task_list_name=tl.name and t.task_type=tl.task_type ` +
105
                `) LIMIT $1;`
106
)
107

108
// InsertIntoTasks inserts one or more rows into tasks table
109
func (pdb *db) InsertIntoTasks(ctx context.Context, rows []sqlplugin.TasksRow) (sql.Result, error) {
297✔
110
        if len(rows) == 0 {
297✔
111
                return nil, nil
×
112
        }
×
113
        return pdb.driver.NamedExecContext(ctx, rows[0].ShardID, createTaskQry, rows)
297✔
114
}
115

116
// SelectFromTasks reads one or more rows from tasks table
117
func (pdb *db) SelectFromTasks(ctx context.Context, filter *sqlplugin.TasksFilter) ([]sqlplugin.TasksRow, error) {
290✔
118
        var err error
290✔
119
        var rows []sqlplugin.TasksRow
290✔
120
        switch {
290✔
121
        case filter.MaxTaskID != nil:
287✔
122
                err = pdb.driver.SelectContext(ctx, filter.ShardID, &rows, getTaskMinMaxQry, filter.DomainID,
287✔
123
                        filter.TaskListName, filter.TaskType, *filter.MinTaskID, *filter.MaxTaskID, *filter.PageSize)
287✔
124
        default:
3✔
125
                err = pdb.driver.SelectContext(ctx, filter.ShardID, &rows, getTaskMinQry, filter.DomainID,
3✔
126
                        filter.TaskListName, filter.TaskType, *filter.MinTaskID, *filter.PageSize)
3✔
127
        }
128
        if err != nil {
290✔
129
                return nil, err
×
130
        }
×
131
        return rows, err
290✔
132
}
133

134
// DeleteFromTasks deletes one or more rows from tasks table
135
func (pdb *db) DeleteFromTasks(ctx context.Context, filter *sqlplugin.TasksFilter) (sql.Result, error) {
146✔
136
        if filter.TaskIDLessThanEquals != nil {
287✔
137
                if filter.Limit == nil || *filter.Limit == 0 {
141✔
138
                        return nil, fmt.Errorf("missing limit parameter")
×
139
                }
×
140
                return pdb.driver.ExecContext(ctx, filter.ShardID, rangeDeleteTaskQry,
141✔
141
                        filter.DomainID, filter.TaskListName, filter.TaskType, *filter.TaskIDLessThanEquals, *filter.Limit)
141✔
142
        }
143
        return pdb.driver.ExecContext(ctx, filter.ShardID, deleteTaskQry, filter.DomainID, filter.TaskListName, filter.TaskType, *filter.TaskID)
5✔
144
}
145

146
func (pdb *db) GetTasksCount(ctx context.Context, filter *sqlplugin.TasksFilter) (int64, error) {
1,010✔
147
        var size []int64
1,010✔
148
        if err := pdb.driver.SelectContext(ctx, filter.ShardID, &size, getTasksCountQry, filter.DomainID, filter.TaskListName, filter.TaskType, *filter.MinTaskID); err != nil {
1,010✔
UNCOV
149
                return 0, err
×
UNCOV
150
        }
×
151
        return size[0], nil
1,010✔
152
}
153

154
func (pdb *db) GetOrphanTasks(ctx context.Context, filter *sqlplugin.OrphanTasksFilter) ([]sqlplugin.TaskKeyRow, error) {
3✔
155
        if filter.Limit == nil || *filter.Limit == 0 {
3✔
156
                return nil, fmt.Errorf("missing limit parameter")
×
157
        }
×
158
        var rows []sqlplugin.TaskKeyRow
3✔
159
        err := pdb.driver.SelectContext(ctx, sqlplugin.DbAllShards, &rows, getOrphanTaskQry, *filter.Limit)
3✔
160
        if err != nil {
3✔
161
                return nil, err
×
162
        }
×
163
        return rows, nil
3✔
164
}
165

166
// InsertIntoTaskLists inserts one or more rows into task_lists table
167
func (pdb *db) InsertIntoTaskLists(ctx context.Context, row *sqlplugin.TaskListsRow) (sql.Result, error) {
461✔
168
        return pdb.driver.NamedExecContext(ctx, row.ShardID, createTaskListQry, row)
461✔
169
}
461✔
170

171
// UpdateTaskLists updates a row in task_lists table
172
func (pdb *db) UpdateTaskLists(ctx context.Context, row *sqlplugin.TaskListsRow) (sql.Result, error) {
1,933✔
173
        return pdb.driver.NamedExecContext(ctx, row.ShardID, updateTaskListQry, row)
1,933✔
174
}
1,933✔
175

176
// SelectFromTaskLists reads one or more rows from task_lists table
177
func (pdb *db) SelectFromTaskLists(ctx context.Context, filter *sqlplugin.TaskListsFilter) ([]sqlplugin.TaskListsRow, error) {
668✔
178
        switch {
668✔
179
        case filter.DomainID != nil && filter.Name != nil && filter.TaskType != nil:
477✔
180
                return pdb.selectFromTaskLists(ctx, filter)
477✔
181
        case filter.DomainIDGreaterThan != nil && filter.NameGreaterThan != nil && filter.TaskTypeGreaterThan != nil && filter.PageSize != nil:
191✔
182
                return pdb.rangeSelectFromTaskLists(ctx, filter)
191✔
183
        default:
×
184
                return nil, fmt.Errorf("invalid set of query filter params")
×
185
        }
186
}
187

188
func (pdb *db) selectFromTaskLists(ctx context.Context, filter *sqlplugin.TaskListsFilter) ([]sqlplugin.TaskListsRow, error) {
477✔
189
        var err error
477✔
190
        var row sqlplugin.TaskListsRow
477✔
191
        err = pdb.driver.GetContext(ctx, filter.ShardID, &row, getTaskListQry, filter.ShardID, *filter.DomainID, *filter.Name, *filter.TaskType)
477✔
192
        if err != nil {
938✔
193
                return nil, err
461✔
194
        }
461✔
195
        return []sqlplugin.TaskListsRow{row}, err
16✔
196
}
197

198
func (pdb *db) rangeSelectFromTaskLists(ctx context.Context, filter *sqlplugin.TaskListsFilter) ([]sqlplugin.TaskListsRow, error) {
191✔
199
        var err error
191✔
200
        var rows []sqlplugin.TaskListsRow
191✔
201
        err = pdb.driver.SelectContext(ctx, filter.ShardID, &rows, listTaskListQry,
191✔
202
                filter.ShardID, *filter.DomainIDGreaterThan, *filter.NameGreaterThan, *filter.TaskTypeGreaterThan, *filter.PageSize)
191✔
203
        if err != nil {
191✔
204
                return nil, err
×
205
        }
×
206
        for i := range rows {
302✔
207
                rows[i].ShardID = filter.ShardID
111✔
208
        }
111✔
209
        return rows, nil
191✔
210
}
211

212
// DeleteFromTaskLists deletes a row from task_lists table
213
func (pdb *db) DeleteFromTaskLists(ctx context.Context, filter *sqlplugin.TaskListsFilter) (sql.Result, error) {
26✔
214
        return pdb.driver.ExecContext(ctx, filter.ShardID, deleteTaskListQry, filter.ShardID, *filter.DomainID, *filter.Name, *filter.TaskType, *filter.RangeID)
26✔
215
}
26✔
216

217
// LockTaskLists locks a row in task_lists table
218
func (pdb *db) LockTaskLists(ctx context.Context, filter *sqlplugin.TaskListsFilter) (int64, error) {
2,231✔
219
        var rangeID int64
2,231✔
220
        err := pdb.driver.GetContext(ctx, filter.ShardID, &rangeID, lockTaskListQry, filter.ShardID, *filter.DomainID, *filter.Name, *filter.TaskType)
2,231✔
221
        return rangeID, err
2,231✔
222
}
2,231✔
223

224
// InsertIntoTasksWithTTL is not supported in Postgres
225
func (pdb *db) InsertIntoTasksWithTTL(_ context.Context, _ []sqlplugin.TasksRowWithTTL) (sql.Result, error) {
×
226
        return nil, sqlplugin.ErrTTLNotSupported
×
227
}
×
228

229
// InsertIntoTaskListsWithTTL is not supported in Postgres
230
func (pdb *db) InsertIntoTaskListsWithTTL(_ context.Context, _ *sqlplugin.TaskListsRowWithTTL) (sql.Result, error) {
×
231
        return nil, sqlplugin.ErrTTLNotSupported
×
232
}
×
233

234
// UpdateTaskListsWithTTL is not supported in Postgres
235
func (pdb *db) UpdateTaskListsWithTTL(_ context.Context, _ *sqlplugin.TaskListsRowWithTTL) (sql.Result, error) {
×
236
        return nil, sqlplugin.ErrTTLNotSupported
×
237
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc