• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

uber / cadence / 0187fdd2-f4a4-4c9a-97b4-6604937bf7be

09 May 2023 12:23AM UTC coverage: 57.253% (-0.002%) from 57.255%
0187fdd2-f4a4-4c9a-97b4-6604937bf7be

Pull #5252

buildkite

David Porter
Merge branch 'master' into feature/zonal-partitioning
Pull Request #5252: Feature/zonal partitioning

1460 of 1460 new or added lines in 51 files covered. (100.0%)

86909 of 151799 relevant lines covered (57.25%)

2482.17 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

61.07
/common/persistence/nosql/nosqlTaskStore.go
1
// Copyright (c) 2020 Uber Technologies, Inc.
2
// Portions of the Software are attributed to Copyright (c) 2020 Temporal Technologies Inc.
3
//
4
// Permission is hereby granted, free of charge, to any person obtaining a copy
5
// of this software and associated documentation files (the "Software"), to deal
6
// in the Software without restriction, including without limitation the rights
7
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
8
// copies of the Software, and to permit persons to whom the Software is
9
// furnished to do so, subject to the following conditions:
10
//
11
// The above copyright notice and this permission notice shall be included in
12
// all copies or substantial portions of the Software.
13
//
14
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
15
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
16
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
17
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
18
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
19
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
20
// THE SOFTWARE.
21

22
package nosql
23

24
import (
25
        "context"
26
        "fmt"
27
        "math"
28
        "time"
29

30
        "github.com/uber/cadence/common"
31
        "github.com/uber/cadence/common/config"
32
        "github.com/uber/cadence/common/log"
33
        p "github.com/uber/cadence/common/persistence"
34
        "github.com/uber/cadence/common/persistence/nosql/nosqlplugin"
35
        "github.com/uber/cadence/common/types"
36
)
37

38
type (
39
        nosqlTaskStore struct {
40
                shardedNosqlStore
41
        }
42
)
43

44
const (
45
        initialRangeID    = 1 // Id of the first range of a new task list
46
        initialAckLevel   = 0
47
        stickyTaskListTTL = int64(24 * time.Hour / time.Second) // if sticky task_list stopped being updated, remove it in one day
48
)
49

50
var _ p.TaskStore = (*nosqlTaskStore)(nil)
51

52
// newNoSQLTaskStore is used to create an instance of TaskStore implementation
53
func newNoSQLTaskStore(
54
        cfg config.ShardedNoSQL,
55
        logger log.Logger,
56
        dc *p.DynamicConfiguration,
57
) (p.TaskStore, error) {
17✔
58
        s, err := newShardedNosqlStore(cfg, logger, dc)
17✔
59
        if err != nil {
17✔
60
                return nil, err
×
61
        }
×
62
        return &nosqlTaskStore{
17✔
63
                shardedNosqlStore: *s,
17✔
64
        }, nil
17✔
65
}
66

67
func (t *nosqlTaskStore) GetOrphanTasks(ctx context.Context, request *p.GetOrphanTasksRequest) (*p.GetOrphanTasksResponse, error) {
×
68
        // TODO: It's unclear if this's necessary or possible for NoSQL
×
69
        return nil, &types.InternalServiceError{
×
70
                Message: "Unimplemented call to GetOrphanTasks for NoSQL",
×
71
        }
×
72
}
×
73

74
func (t *nosqlTaskStore) LeaseTaskList(
75
        ctx context.Context,
76
        request *p.LeaseTaskListRequest,
77
) (*p.LeaseTaskListResponse, error) {
440✔
78
        if len(request.TaskList) == 0 {
440✔
79
                return nil, &types.InternalServiceError{
×
80
                        Message: "LeaseTaskList requires non empty task list",
×
81
                }
×
82
        }
×
83
        now := time.Now()
440✔
84
        var err, selectErr error
440✔
85
        var currTL *nosqlplugin.TaskListRow
440✔
86
        storeShard, err := t.GetStoreShardByTaskList(request.DomainID, request.TaskList, request.TaskType)
440✔
87
        if err != nil {
440✔
88
                return nil, err
×
89
        }
×
90

91
        currTL, selectErr = storeShard.db.SelectTaskList(ctx, &nosqlplugin.TaskListFilter{
440✔
92
                DomainID:     request.DomainID,
440✔
93
                TaskListName: request.TaskList,
440✔
94
                TaskListType: request.TaskType,
440✔
95
        })
440✔
96

440✔
97
        if selectErr != nil {
869✔
98
                if storeShard.db.IsNotFoundError(selectErr) { // First time task list is used
858✔
99
                        currTL = &nosqlplugin.TaskListRow{
429✔
100
                                DomainID:        request.DomainID,
429✔
101
                                TaskListName:    request.TaskList,
429✔
102
                                TaskListType:    request.TaskType,
429✔
103
                                RangeID:         initialRangeID,
429✔
104
                                TaskListKind:    request.TaskListKind,
429✔
105
                                AckLevel:        initialAckLevel,
429✔
106
                                LastUpdatedTime: now,
429✔
107
                        }
429✔
108
                        err = storeShard.db.InsertTaskList(ctx, currTL)
429✔
109
                } else {
429✔
110
                        return nil, convertCommonErrors(storeShard.db, "LeaseTaskList", err)
×
111
                }
×
112
        } else {
11✔
113
                // if request.RangeID is > 0, we are trying to renew an already existing
11✔
114
                // lease on the task list. If request.RangeID=0, we are trying to steal
11✔
115
                // the tasklist from its current owner
11✔
116
                if request.RangeID > 0 && request.RangeID != currTL.RangeID {
11✔
117
                        return nil, &p.ConditionFailedError{
×
118
                                Msg: fmt.Sprintf("leaseTaskList:renew failed: taskList:%v, taskListType:%v, haveRangeID:%v, gotRangeID:%v",
×
119
                                        request.TaskList, request.TaskType, request.RangeID, currTL.RangeID),
×
120
                        }
×
121
                }
×
122

123
                // Update the rangeID as this is an ownership change
124
                currTL.RangeID++
11✔
125

11✔
126
                err = storeShard.db.UpdateTaskList(ctx, &nosqlplugin.TaskListRow{
11✔
127
                        DomainID:        request.DomainID,
11✔
128
                        TaskListName:    request.TaskList,
11✔
129
                        TaskListType:    request.TaskType,
11✔
130
                        RangeID:         currTL.RangeID,
11✔
131
                        TaskListKind:    currTL.TaskListKind,
11✔
132
                        AckLevel:        currTL.AckLevel,
11✔
133
                        LastUpdatedTime: now,
11✔
134
                }, currTL.RangeID-1)
11✔
135
        }
136
        if err != nil {
440✔
137
                conditionFailure, ok := err.(*nosqlplugin.TaskOperationConditionFailure)
×
138
                if ok {
×
139
                        return nil, &p.ConditionFailedError{
×
140
                                Msg: fmt.Sprintf("leaseTaskList: taskList:%v, taskListType:%v, haveRangeID:%v, gotRangeID:%v",
×
141
                                        request.TaskList, request.TaskType, currTL.RangeID, conditionFailure.RangeID),
×
142
                        }
×
143
                }
×
144
                return nil, convertCommonErrors(storeShard.db, "LeaseTaskList", err)
×
145
        }
146
        tli := &p.TaskListInfo{
440✔
147
                DomainID:    request.DomainID,
440✔
148
                Name:        request.TaskList,
440✔
149
                TaskType:    request.TaskType,
440✔
150
                RangeID:     currTL.RangeID,
440✔
151
                AckLevel:    currTL.AckLevel,
440✔
152
                Kind:        request.TaskListKind,
440✔
153
                LastUpdated: now,
440✔
154
        }
440✔
155
        return &p.LeaseTaskListResponse{TaskListInfo: tli}, nil
440✔
156
}
157

158
func (t *nosqlTaskStore) UpdateTaskList(
159
        ctx context.Context,
160
        request *p.UpdateTaskListRequest,
161
) (*p.UpdateTaskListResponse, error) {
1,928✔
162
        tli := request.TaskListInfo
1,928✔
163
        var err error
1,928✔
164
        taskListToUpdate := &nosqlplugin.TaskListRow{
1,928✔
165
                DomainID:        tli.DomainID,
1,928✔
166
                TaskListName:    tli.Name,
1,928✔
167
                TaskListType:    tli.TaskType,
1,928✔
168
                RangeID:         tli.RangeID,
1,928✔
169
                TaskListKind:    tli.Kind,
1,928✔
170
                AckLevel:        tli.AckLevel,
1,928✔
171
                LastUpdatedTime: time.Now(),
1,928✔
172
        }
1,928✔
173
        storeShard, err := t.GetStoreShardByTaskList(tli.DomainID, tli.Name, tli.TaskType)
1,928✔
174
        if err != nil {
1,928✔
175
                return nil, err
×
176
        }
×
177

178
        if tli.Kind == p.TaskListKindSticky { // if task_list is sticky, then update with TTL
1,969✔
179
                err = storeShard.db.UpdateTaskListWithTTL(ctx, stickyTaskListTTL, taskListToUpdate, tli.RangeID)
41✔
180
        } else {
1,928✔
181
                err = storeShard.db.UpdateTaskList(ctx, taskListToUpdate, tli.RangeID)
1,887✔
182
        }
1,887✔
183

184
        if err != nil {
1,928✔
185
                conditionFailure, ok := err.(*nosqlplugin.TaskOperationConditionFailure)
×
186
                if ok {
×
187
                        return nil, &p.ConditionFailedError{
×
188
                                Msg: fmt.Sprintf("Failed to update task list. name: %v, type: %v, rangeID: %v, columns: (%v)",
×
189
                                        tli.Name, tli.TaskType, tli.RangeID, conditionFailure.Details),
×
190
                        }
×
191
                }
×
192
                return nil, convertCommonErrors(storeShard.db, "UpdateTaskList", err)
×
193
        }
194

195
        return &p.UpdateTaskListResponse{}, nil
1,928✔
196
}
197

198
func (t *nosqlTaskStore) ListTaskList(
199
        _ context.Context,
200
        _ *p.ListTaskListRequest,
201
) (*p.ListTaskListResponse, error) {
×
202
        return nil, &types.InternalServiceError{
×
203
                Message: "unsupported operation",
×
204
        }
×
205
}
×
206

207
func (t *nosqlTaskStore) DeleteTaskList(
208
        ctx context.Context,
209
        request *p.DeleteTaskListRequest,
210
) error {
×
211
        storeShard, err := t.GetStoreShardByTaskList(request.DomainID, request.TaskListName, request.TaskListType)
×
212
        if err != nil {
×
213
                return err
×
214
        }
×
215

216
        err = storeShard.db.DeleteTaskList(ctx, &nosqlplugin.TaskListFilter{
×
217
                DomainID:     request.DomainID,
×
218
                TaskListName: request.TaskListName,
×
219
                TaskListType: request.TaskListType,
×
220
        }, request.RangeID)
×
221

×
222
        if err != nil {
×
223
                conditionFailure, ok := err.(*nosqlplugin.TaskOperationConditionFailure)
×
224
                if ok {
×
225
                        return &p.ConditionFailedError{
×
226
                                Msg: fmt.Sprintf("Failed to delete task list. name: %v, type: %v, rangeID: %v, columns: (%v)",
×
227
                                        request.TaskListName, request.TaskListType, request.RangeID, conditionFailure.Details),
×
228
                        }
×
229
                }
×
230
                return convertCommonErrors(storeShard.db, "DeleteTaskList", err)
×
231
        }
232

233
        return nil
×
234
}
235

236
func (t *nosqlTaskStore) CreateTasks(
237
        ctx context.Context,
238
        request *p.InternalCreateTasksRequest,
239
) (*p.CreateTasksResponse, error) {
297✔
240
        now := time.Now()
297✔
241
        var tasks []*nosqlplugin.TaskRowForInsert
297✔
242
        for _, t := range request.Tasks {
596✔
243
                task := &nosqlplugin.TaskRow{
299✔
244
                        DomainID:        request.TaskListInfo.DomainID,
299✔
245
                        TaskListName:    request.TaskListInfo.Name,
299✔
246
                        TaskListType:    request.TaskListInfo.TaskType,
299✔
247
                        TaskID:          t.TaskID,
299✔
248
                        WorkflowID:      t.Execution.GetWorkflowID(),
299✔
249
                        RunID:           t.Execution.GetRunID(),
299✔
250
                        ScheduledID:     t.Data.ScheduleID,
299✔
251
                        CreatedTime:     now,
299✔
252
                        PartitionConfig: t.Data.PartitionConfig,
299✔
253
                }
299✔
254
                ttl := int(t.Data.ScheduleToStartTimeout.Seconds())
299✔
255
                tasks = append(tasks, &nosqlplugin.TaskRowForInsert{
299✔
256
                        TaskRow:    *task,
299✔
257
                        TTLSeconds: ttl,
299✔
258
                })
299✔
259
        }
299✔
260

261
        tasklistCondition := &nosqlplugin.TaskListRow{
297✔
262
                DomainID:     request.TaskListInfo.DomainID,
297✔
263
                TaskListName: request.TaskListInfo.Name,
297✔
264
                TaskListType: request.TaskListInfo.TaskType,
297✔
265
                RangeID:      request.TaskListInfo.RangeID,
297✔
266
        }
297✔
267

297✔
268
        tli := request.TaskListInfo
297✔
269
        storeShard, err := t.GetStoreShardByTaskList(tli.DomainID, tli.Name, tli.TaskType)
297✔
270
        if err != nil {
297✔
271
                return nil, err
×
272
        }
×
273

274
        err = storeShard.db.InsertTasks(ctx, tasks, tasklistCondition)
297✔
275

297✔
276
        if err != nil {
297✔
277
                conditionFailure, ok := err.(*nosqlplugin.TaskOperationConditionFailure)
×
278
                if ok {
×
279
                        return nil, &p.ConditionFailedError{
×
280
                                Msg: fmt.Sprintf("Failed to insert tasks. name: %v, type: %v, rangeID: %v, columns: (%v)",
×
281
                                        request.TaskListInfo.Name, request.TaskListInfo.TaskType, request.TaskListInfo.RangeID, conditionFailure.Details),
×
282
                        }
×
283
                }
×
284
                return nil, convertCommonErrors(storeShard.db, "CreateTasks", err)
×
285
        }
286

287
        return &p.CreateTasksResponse{}, nil
297✔
288
}
289

290
func (t *nosqlTaskStore) GetTasks(
291
        ctx context.Context,
292
        request *p.GetTasksRequest,
293
) (*p.InternalGetTasksResponse, error) {
309✔
294
        if request.MaxReadLevel == nil {
309✔
295
                request.MaxReadLevel = common.Int64Ptr(math.MaxInt64)
×
296
        }
×
297

298
        if request.ReadLevel > *request.MaxReadLevel {
309✔
299
                return &p.InternalGetTasksResponse{}, nil
×
300
        }
×
301

302
        storeShard, err := t.GetStoreShardByTaskList(request.DomainID, request.TaskList, request.TaskType)
309✔
303
        if err != nil {
309✔
304
                return nil, err
×
305
        }
×
306

307
        resp, err := storeShard.db.SelectTasks(ctx, &nosqlplugin.TasksFilter{
309✔
308
                TaskListFilter: nosqlplugin.TaskListFilter{
309✔
309
                        DomainID:     request.DomainID,
309✔
310
                        TaskListName: request.TaskList,
309✔
311
                        TaskListType: request.TaskType,
309✔
312
                },
309✔
313
                BatchSize: request.BatchSize,
309✔
314

309✔
315
                MinTaskID: request.ReadLevel,
309✔
316
                MaxTaskID: *request.MaxReadLevel,
309✔
317
        })
309✔
318

309✔
319
        if err != nil {
309✔
320
                return nil, convertCommonErrors(storeShard.db, "GetTasks", err)
×
321
        }
×
322

323
        response := &p.InternalGetTasksResponse{}
309✔
324
        for _, t := range resp {
608✔
325
                response.Tasks = append(response.Tasks, toTaskInfo(t))
299✔
326
        }
299✔
327

328
        return response, nil
309✔
329
}
330

331
func toTaskInfo(t *nosqlplugin.TaskRow) *p.InternalTaskInfo {
299✔
332
        return &p.InternalTaskInfo{
299✔
333
                DomainID:        t.DomainID,
299✔
334
                WorkflowID:      t.WorkflowID,
299✔
335
                RunID:           t.RunID,
299✔
336
                TaskID:          t.TaskID,
299✔
337
                ScheduleID:      t.ScheduledID,
299✔
338
                CreatedTime:     t.CreatedTime,
299✔
339
                PartitionConfig: t.PartitionConfig,
299✔
340
        }
299✔
341
}
299✔
342

343
func (t *nosqlTaskStore) CompleteTask(
344
        ctx context.Context,
345
        request *p.CompleteTaskRequest,
346
) error {
×
347
        tli := request.TaskList
×
348
        storeShard, err := t.GetStoreShardByTaskList(tli.DomainID, tli.Name, tli.TaskType)
×
349
        if err != nil {
×
350
                return err
×
351
        }
×
352

353
        _, err = storeShard.db.RangeDeleteTasks(ctx, &nosqlplugin.TasksFilter{
×
354
                TaskListFilter: nosqlplugin.TaskListFilter{
×
355
                        DomainID:     tli.DomainID,
×
356
                        TaskListName: tli.Name,
×
357
                        TaskListType: request.TaskList.TaskType,
×
358
                },
×
359
                // exclusive
×
360
                MinTaskID: request.TaskID - 1,
×
361
                // inclusive
×
362
                MaxTaskID: request.TaskID,
×
363
                BatchSize: 1,
×
364
        })
×
365
        if err != nil {
×
366
                return convertCommonErrors(storeShard.db, "CompleteTask", err)
×
367
        }
×
368

369
        return nil
×
370
}
371

372
// CompleteTasksLessThan deletes all tasks less than or equal to the given task id. This API ignores the
373
// Limit request parameter i.e. either all tasks leq the task_id will be deleted or an error will
374
// be returned to the caller
375
func (t *nosqlTaskStore) CompleteTasksLessThan(
376
        ctx context.Context,
377
        request *p.CompleteTasksLessThanRequest,
378
) (*p.CompleteTasksLessThanResponse, error) {
171✔
379
        storeShard, err := t.GetStoreShardByTaskList(request.DomainID, request.TaskListName, request.TaskType)
171✔
380
        if err != nil {
171✔
381
                return nil, err
×
382
        }
×
383

384
        num, err := storeShard.db.RangeDeleteTasks(ctx, &nosqlplugin.TasksFilter{
171✔
385
                TaskListFilter: nosqlplugin.TaskListFilter{
171✔
386
                        DomainID:     request.DomainID,
171✔
387
                        TaskListName: request.TaskListName,
171✔
388
                        TaskListType: request.TaskType,
171✔
389
                },
171✔
390

171✔
391
                // NOTE: MinTaskID is supported in plugin interfaces but not exposed in dataInterfaces/persistenceInterfaces
171✔
392
                // We may want to add it so that we can test it.
171✔
393
                // https://github.com/uber/cadence/issues/4243
171✔
394
                MinTaskID: 0,
171✔
395

171✔
396
                // NOTE: request.TaskID is also inclusive, even though the name is CompleteTasksLessThan
171✔
397
                MaxTaskID: request.TaskID,
171✔
398

171✔
399
                BatchSize: request.Limit,
171✔
400
        })
171✔
401
        if err != nil {
171✔
402
                return nil, convertCommonErrors(storeShard.db, "CompleteTasksLessThan", err)
×
403
        }
×
404
        return &p.CompleteTasksLessThanResponse{TasksCompleted: num}, nil
171✔
405
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc