• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

uber / cadence / 019056dc-98d1-4fa6-b475-a7aef51f4b90

26 Jun 2024 11:23PM UTC coverage: 71.557% (-0.001%) from 71.558%
019056dc-98d1-4fa6-b475-a7aef51f4b90

push

buildkite

web-flow
Fix encoding bug to index context header in search attributes (#6148)

What changed?

JSON-marshal the raw string bytes before storing them in search attributes.

Why?

The context header stores raw string bytes, but search attributes must store JSON-encoded strings rather than raw string bytes. Otherwise, creating the visibility message fails with an unmarshal error.

How did you test it?

unit test

15 of 19 new or added lines in 3 files covered. (78.95%)

24 existing lines in 7 files now uncovered.

107133 of 149716 relevant lines covered (71.56%)

2587.42 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

73.4
/common/persistence/sql/sqlplugin/postgres/task.go
1
// Copyright (c) 2019 Uber Technologies, Inc.
2
//
3
// Permission is hereby granted, free of charge, to any person obtaining a copy
4
// of this software and associated documentation files (the "Software"), to deal
5
// in the Software without restriction, including without limitation the rights
6
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
7
// copies of the Software, and to permit persons to whom the Software is
8
// furnished to do so, subject to the following conditions:
9
//
10
// The above copyright notice and this permission notice shall be included in
11
// all copies or substantial portions of the Software.
12
//
13
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
14
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
18
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
19
// THE SOFTWARE.
20

21
package postgres
22

23
import (
24
        "context"
25
        "database/sql"
26
        "fmt"
27

28
        "github.com/uber/cadence/common/persistence/sql/sqlplugin"
29
)
30

31
const (
	// taskListCreatePart is the column list plus named-parameter VALUES clause
	// that follows the INSERT keyword when creating a task_lists row.
	taskListCreatePart = `INTO task_lists(shard_id, domain_id, name, task_type, range_id, data, data_encoding) VALUES ` +
		`(:shard_id, :domain_id, :name, :task_type, :range_id, :data, :data_encoding)`

	// createTaskListQry inserts a new task_lists row
	// (default range ID: initialRangeID == 1).
	createTaskListQry = `INSERT ` + taskListCreatePart

	// updateTaskListQry overwrites range_id, data and data_encoding of the row
	// identified by (shard_id, domain_id, name, task_type).
	updateTaskListQry = `UPDATE task_lists SET
range_id = :range_id,
data = :data,
data_encoding = :data_encoding
WHERE
shard_id = :shard_id AND
domain_id = :domain_id AND
name = :name AND
task_type = :task_type
`

	// listTaskListQry pages through the task lists of one shard, ordered by
	// (domain_id, name, task_type), returning only rows strictly after the
	// position given by $2, $3, $4.
	//
	// The pagination is easiest to understand by analogy with plain numbers.
	// Take the sorted list
	//
	//	111
	//	112
	//	121
	//	211
	//
	// where the hundreds digit stands for domain_id, the tens digit for name
	// and the ones digit for task_type, and imagine repeated queries with a
	// limit of 1, each starting from the previous result:
	//   - the second query must skip 111 and return 112: hundreds and tens are
	//     allowed to be equal because the ones digit is higher;
	//   - the third query returns 121: the ones digit is now lower, but that is
	//     irrelevant because the tens digit is greater;
	//   - the fourth query returns 211: the tens digit is now lower, but that
	//     is again irrelevant because the hundreds digit is higher.
	//
	// This technique is useful since the size of the table can easily change
	// between calls, making SKIP an unreliable method, while other db-specific
	// things like rowids are not portable.
	listTaskListQry = `SELECT domain_id, range_id, name, task_type, data, data_encoding FROM task_lists ` +
		`WHERE shard_id = $1 AND (` +
		`(domain_id = $2 AND name = $3 AND task_type > $4) OR ` +
		`(domain_id=$2 AND name > $3) OR ` +
		`(domain_id > $2)` +
		`) ORDER BY domain_id,name,task_type LIMIT $5`

	// getTaskListQry reads the task_lists row matching
	// (shard_id, domain_id, name, task_type).
	getTaskListQry = `SELECT domain_id, range_id, name, task_type, data, data_encoding FROM task_lists ` +
		`WHERE shard_id = $1 AND domain_id = $2 AND name = $3 AND task_type = $4`

	// deleteTaskListQry removes a task_lists row, but only when its range_id
	// equals $5.
	deleteTaskListQry = `DELETE FROM task_lists ` +
		`WHERE shard_id=$1 AND domain_id=$2 AND name=$3 AND task_type=$4 AND range_id=$5`

	// lockTaskListQry reads range_id of a task_lists row with FOR UPDATE.
	lockTaskListQry = `SELECT range_id FROM task_lists WHERE shard_id = $1 AND domain_id = $2 AND name = $3 AND task_type = $4 ` +
		`FOR UPDATE`

	// getTaskMinMaxQry reads up to $6 tasks with $4 < task_id <= $5, ordered by
	// task_id. The doubled space before ORDER BY is kept so the statement text
	// is unchanged.
	getTaskMinMaxQry = `SELECT task_id, data, data_encoding FROM tasks ` +
		`WHERE domain_id = $1 AND task_list_name = $2 AND task_type = $3 ` +
		`AND task_id > $4 AND task_id <= $5 ` +
		` ORDER BY task_id LIMIT $6`

	// getTaskMinQry reads up to $5 tasks with task_id > $4, ordered by task_id.
	getTaskMinQry = `SELECT task_id, data, data_encoding FROM tasks ` +
		`WHERE domain_id = $1 AND task_list_name = $2 AND task_type = $3 AND task_id > $4 ` +
		`ORDER BY task_id LIMIT $5`

	// getTasksCountQry counts the tasks of a task list with task_id > $4.
	getTasksCountQry = `SELECT count(1) as count FROM tasks ` +
		`WHERE domain_id = $1 AND task_list_name = $2 AND task_type = $3 AND task_id > $4`

	// createTaskQry inserts tasks rows using named parameters.
	createTaskQry = `INSERT INTO tasks(domain_id, task_list_name, task_type, task_id, data, data_encoding) ` +
		`VALUES(:domain_id, :task_list_name, :task_type, :task_id, :data, :data_encoding)`

	// deleteTaskQry removes the single task identified by
	// (domain_id, task_list_name, task_type, task_id).
	deleteTaskQry = `DELETE FROM tasks WHERE domain_id = $1 AND task_list_name = $2 ` +
		`AND task_type = $3 AND task_id = $4`

	// rangeDeleteTaskQry removes at most $5 tasks of a task list whose task_id
	// is <= $4; the row limit is applied through the ordered sub-select.
	rangeDeleteTaskQry = `DELETE FROM tasks WHERE domain_id = $1 AND task_list_name = $2 AND task_type = $3 ` +
		`AND task_id IN (SELECT task_id FROM
                 tasks WHERE domain_id = $1 AND task_list_name = $2 AND task_type = $3 AND task_id <= $4 ` +
		`ORDER BY domain_id,task_list_name,task_type,task_id LIMIT $5 )`

	// getOrphanTaskQry returns up to $1 task keys for which no task_lists row
	// with the same (domain_id, name, task_type) exists.
	getOrphanTaskQry = `SELECT task_id, domain_id, task_list_name, task_type FROM tasks AS t ` +
		`WHERE NOT EXISTS ( ` +
		`        SELECT domain_id, name, task_type FROM task_lists AS tl ` +
		`        WHERE t.domain_id=tl.domain_id and t.task_list_name=tl.name and t.task_type=tl.task_type ` +
		`) LIMIT $1;`
)
107

108
// InsertIntoTasks inserts one or more rows into tasks table
109
func (pdb *db) InsertIntoTasks(ctx context.Context, rows []sqlplugin.TasksRow) (sql.Result, error) {
289✔
110
        if len(rows) == 0 {
289✔
111
                return nil, nil
×
112
        }
×
113
        return pdb.driver.NamedExecContext(ctx, rows[0].ShardID, createTaskQry, rows)
289✔
114
}
115

116
// SelectFromTasks reads one or more rows from tasks table
117
func (pdb *db) SelectFromTasks(ctx context.Context, filter *sqlplugin.TasksFilter) ([]sqlplugin.TasksRow, error) {
283✔
118
        var err error
283✔
119
        var rows []sqlplugin.TasksRow
283✔
120
        switch {
283✔
121
        case filter.MaxTaskID != nil:
280✔
122
                err = pdb.driver.SelectContext(ctx, filter.ShardID, &rows, getTaskMinMaxQry, filter.DomainID,
280✔
123
                        filter.TaskListName, filter.TaskType, *filter.MinTaskID, *filter.MaxTaskID, *filter.PageSize)
280✔
124
        default:
3✔
125
                err = pdb.driver.SelectContext(ctx, filter.ShardID, &rows, getTaskMinQry, filter.DomainID,
3✔
126
                        filter.TaskListName, filter.TaskType, *filter.MinTaskID, *filter.PageSize)
3✔
127
        }
128
        if err != nil {
283✔
129
                return nil, err
×
130
        }
×
131
        return rows, err
283✔
132
}
133

134
// DeleteFromTasks deletes one or more rows from tasks table
135
func (pdb *db) DeleteFromTasks(ctx context.Context, filter *sqlplugin.TasksFilter) (sql.Result, error) {
143✔
136
        if filter.TaskIDLessThanEquals != nil {
281✔
137
                if filter.Limit == nil || *filter.Limit == 0 {
138✔
138
                        return nil, fmt.Errorf("missing limit parameter")
×
139
                }
×
140
                return pdb.driver.ExecContext(ctx, filter.ShardID, rangeDeleteTaskQry,
138✔
141
                        filter.DomainID, filter.TaskListName, filter.TaskType, *filter.TaskIDLessThanEquals, *filter.Limit)
138✔
142
        }
143
        return pdb.driver.ExecContext(ctx, filter.ShardID, deleteTaskQry, filter.DomainID, filter.TaskListName, filter.TaskType, *filter.TaskID)
5✔
144
}
145

146
func (pdb *db) GetTasksCount(ctx context.Context, filter *sqlplugin.TasksFilter) (int64, error) {
1,284✔
147
        var size []int64
1,284✔
148
        if err := pdb.driver.SelectContext(ctx, filter.ShardID, &size, getTasksCountQry, filter.DomainID, filter.TaskListName, filter.TaskType, *filter.MinTaskID); err != nil {
1,284✔
UNCOV
149
                return 0, err
×
UNCOV
150
        }
×
151
        return size[0], nil
1,284✔
152
}
153

154
func (pdb *db) GetOrphanTasks(ctx context.Context, filter *sqlplugin.OrphanTasksFilter) ([]sqlplugin.TaskKeyRow, error) {
3✔
155
        if filter.Limit == nil || *filter.Limit == 0 {
3✔
156
                return nil, fmt.Errorf("missing limit parameter")
×
157
        }
×
158
        var rows []sqlplugin.TaskKeyRow
3✔
159
        err := pdb.driver.SelectContext(ctx, sqlplugin.DbAllShards, &rows, getOrphanTaskQry, *filter.Limit)
3✔
160
        if err != nil {
3✔
161
                return nil, err
×
162
        }
×
163
        return rows, nil
3✔
164
}
165

166
// InsertIntoTaskLists inserts one or more rows into task_lists table
167
func (pdb *db) InsertIntoTaskLists(ctx context.Context, row *sqlplugin.TaskListsRow) (sql.Result, error) {
464✔
168
        return pdb.driver.NamedExecContext(ctx, row.ShardID, createTaskListQry, row)
464✔
169
}
464✔
170

171
// UpdateTaskLists updates a row in task_lists table
172
func (pdb *db) UpdateTaskLists(ctx context.Context, row *sqlplugin.TaskListsRow) (sql.Result, error) {
2,216✔
173
        return pdb.driver.NamedExecContext(ctx, row.ShardID, updateTaskListQry, row)
2,216✔
174
}
2,216✔
175

176
// SelectFromTaskLists reads one or more rows from task_lists table
177
func (pdb *db) SelectFromTaskLists(ctx context.Context, filter *sqlplugin.TaskListsFilter) ([]sqlplugin.TaskListsRow, error) {
632✔
178
        switch {
632✔
179
        case filter.DomainID != nil && filter.Name != nil && filter.TaskType != nil:
481✔
180
                return pdb.selectFromTaskLists(ctx, filter)
481✔
181
        case filter.DomainIDGreaterThan != nil && filter.NameGreaterThan != nil && filter.TaskTypeGreaterThan != nil && filter.PageSize != nil:
151✔
182
                return pdb.rangeSelectFromTaskLists(ctx, filter)
151✔
183
        default:
×
184
                return nil, fmt.Errorf("invalid set of query filter params")
×
185
        }
186
}
187

188
func (pdb *db) selectFromTaskLists(ctx context.Context, filter *sqlplugin.TaskListsFilter) ([]sqlplugin.TaskListsRow, error) {
481✔
189
        var err error
481✔
190
        var row sqlplugin.TaskListsRow
481✔
191
        err = pdb.driver.GetContext(ctx, filter.ShardID, &row, getTaskListQry, filter.ShardID, *filter.DomainID, *filter.Name, *filter.TaskType)
481✔
192
        if err != nil {
945✔
193
                return nil, err
464✔
194
        }
464✔
195
        return []sqlplugin.TaskListsRow{row}, err
17✔
196
}
197

198
func (pdb *db) rangeSelectFromTaskLists(ctx context.Context, filter *sqlplugin.TaskListsFilter) ([]sqlplugin.TaskListsRow, error) {
151✔
199
        var err error
151✔
200
        var rows []sqlplugin.TaskListsRow
151✔
201
        err = pdb.driver.SelectContext(ctx, filter.ShardID, &rows, listTaskListQry,
151✔
202
                filter.ShardID, *filter.DomainIDGreaterThan, *filter.NameGreaterThan, *filter.TaskTypeGreaterThan, *filter.PageSize)
151✔
203
        if err != nil {
151✔
204
                return nil, err
×
205
        }
×
206
        for i := range rows {
262✔
207
                rows[i].ShardID = filter.ShardID
111✔
208
        }
111✔
209
        return rows, nil
151✔
210
}
211

212
// DeleteFromTaskLists deletes a row from task_lists table
213
func (pdb *db) DeleteFromTaskLists(ctx context.Context, filter *sqlplugin.TaskListsFilter) (sql.Result, error) {
26✔
214
        return pdb.driver.ExecContext(ctx, filter.ShardID, deleteTaskListQry, filter.ShardID, *filter.DomainID, *filter.Name, *filter.TaskType, *filter.RangeID)
26✔
215
}
26✔
216

217
// LockTaskLists locks a row in task_lists table
218
func (pdb *db) LockTaskLists(ctx context.Context, filter *sqlplugin.TaskListsFilter) (int64, error) {
2,506✔
219
        var rangeID int64
2,506✔
220
        err := pdb.driver.GetContext(ctx, filter.ShardID, &rangeID, lockTaskListQry, filter.ShardID, *filter.DomainID, *filter.Name, *filter.TaskType)
2,506✔
221
        return rangeID, err
2,506✔
222
}
2,506✔
223

224
// InsertIntoTasksWithTTL is not supported in Postgres
225
func (pdb *db) InsertIntoTasksWithTTL(_ context.Context, _ []sqlplugin.TasksRowWithTTL) (sql.Result, error) {
×
226
        return nil, sqlplugin.ErrTTLNotSupported
×
227
}
×
228

229
// InsertIntoTaskListsWithTTL is not supported in Postgres
230
func (pdb *db) InsertIntoTaskListsWithTTL(_ context.Context, _ *sqlplugin.TaskListsRowWithTTL) (sql.Result, error) {
×
231
        return nil, sqlplugin.ErrTTLNotSupported
×
232
}
×
233

234
// UpdateTaskListsWithTTL is not supported in Postgres
235
func (pdb *db) UpdateTaskListsWithTTL(_ context.Context, _ *sqlplugin.TaskListsRowWithTTL) (sql.Result, error) {
×
236
        return nil, sqlplugin.ErrTTLNotSupported
×
237
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc