• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

uber / cadence / 018d9fa8-75f8-405b-9b1e-38f93e6b0a11

12 Feb 2024 11:30PM UTC coverage: 62.748% (+0.05%) from 62.701%
018d9fa8-75f8-405b-9b1e-38f93e6b0a11

Pull #5657

buildkite

Shaddoll
Implement SignalWithStartWorkflowExecutionAsync API
Pull Request #5657: Implement SignalWithStartWorkflowExecutionAsync API

96 of 142 new or added lines in 5 files covered. (67.61%)

60 existing lines in 8 files now uncovered.

92596 of 147569 relevant lines covered (62.75%)

2318.9 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

59.1
/common/persistence/nosql/nosqlplugin/cassandra/workflow.go
1
// Copyright (c) 2021 Uber Technologies, Inc.
2
// Portions of the Software are attributed to Copyright (c) 2020 Temporal Technologies Inc.
3
//
4
// Permission is hereby granted, free of charge, to any person obtaining a copy
5
// of this software and associated documentation files (the "Software"), to deal
6
// in the Software without restriction, including without limitation the rights
7
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
8
// copies of the Software, and to permit persons to whom the Software is
9
// furnished to do so, subject to the following conditions:
10
//
11
// The above copyright notice and this permission notice shall be included in
12
// all copies or substantial portions of the Software.
13
//
14
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
15
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
16
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
17
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
18
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
19
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
20
// THE SOFTWARE.
21

22
package cassandra
23

24
import (
25
        "context"
26
        "fmt"
27
        "strings"
28
        "time"
29

30
        "github.com/uber/cadence/common"
31
        p "github.com/uber/cadence/common/persistence"
32
        "github.com/uber/cadence/common/persistence/nosql/nosqlplugin"
33
        "github.com/uber/cadence/common/persistence/nosql/nosqlplugin/cassandra/gocql"
34
        "github.com/uber/cadence/common/types"
35
)
36

37
// Compile-time check that *cdb satisfies the nosqlplugin.WorkflowCRUD interface.
var _ nosqlplugin.WorkflowCRUD = (*cdb)(nil)
38

39
func (db *cdb) InsertWorkflowExecutionWithTasks(
40
        ctx context.Context,
41
        currentWorkflowRequest *nosqlplugin.CurrentWorkflowWriteRequest,
42
        execution *nosqlplugin.WorkflowExecutionRequest,
43
        transferTasks []*nosqlplugin.TransferTask,
44
        crossClusterTasks []*nosqlplugin.CrossClusterTask,
45
        replicationTasks []*nosqlplugin.ReplicationTask,
46
        timerTasks []*nosqlplugin.TimerTask,
47
        shardCondition *nosqlplugin.ShardCondition,
48
) error {
168✔
49
        shardID := shardCondition.ShardID
168✔
50
        domainID := execution.DomainID
168✔
51
        workflowID := execution.WorkflowID
168✔
52

168✔
53
        batch := db.session.NewBatch(gocql.LoggedBatch).WithContext(ctx)
168✔
54

168✔
55
        err := createOrUpdateCurrentWorkflow(batch, shardID, domainID, workflowID, currentWorkflowRequest)
168✔
56
        if err != nil {
168✔
57
                return err
×
58
        }
×
59

60
        err = createWorkflowExecutionWithMergeMaps(batch, shardID, domainID, workflowID, execution)
168✔
61
        if err != nil {
168✔
62
                return err
×
63
        }
×
64

65
        err = createTransferTasks(batch, shardID, domainID, workflowID, transferTasks)
168✔
66
        if err != nil {
168✔
67
                return err
×
68
        }
×
69
        err = createReplicationTasks(batch, shardID, domainID, workflowID, replicationTasks)
168✔
70
        if err != nil {
168✔
71
                return err
×
72
        }
×
73
        err = createCrossClusterTasks(batch, shardID, domainID, workflowID, crossClusterTasks)
168✔
74
        if err != nil {
168✔
75
                return err
×
76
        }
×
77
        err = createTimerTasks(batch, shardID, domainID, workflowID, timerTasks)
168✔
78
        if err != nil {
168✔
79
                return err
×
80
        }
×
81

82
        err = assertShardRangeID(batch, shardID, shardCondition.RangeID)
168✔
83
        if err != nil {
168✔
84
                return err
×
85
        }
×
86

87
        return executeCreateWorkflowBatchTransaction(db.session, batch, currentWorkflowRequest, execution, shardCondition)
168✔
88
}
89

90
func (db *cdb) SelectCurrentWorkflow(
91
        ctx context.Context,
92
        shardID int, domainID, workflowID string,
93
) (*nosqlplugin.CurrentWorkflowRow, error) {
61✔
94
        query := db.session.Query(templateGetCurrentExecutionQuery,
61✔
95
                shardID,
61✔
96
                rowTypeExecution,
61✔
97
                domainID,
61✔
98
                workflowID,
61✔
99
                permanentRunID,
61✔
100
                defaultVisibilityTimestamp,
61✔
101
                rowTypeExecutionTaskID,
61✔
102
        ).WithContext(ctx)
61✔
103

61✔
104
        result := make(map[string]interface{})
61✔
105
        if err := query.MapScan(result); err != nil {
65✔
106
                return nil, err
4✔
107
        }
4✔
108

109
        currentRunID := result["current_run_id"].(gocql.UUID).String()
58✔
110
        executionInfo := parseWorkflowExecutionInfo(result["execution"].(map[string]interface{}))
58✔
111
        lastWriteVersion := common.EmptyVersion
58✔
112
        if result["workflow_last_write_version"] != nil {
116✔
113
                lastWriteVersion = result["workflow_last_write_version"].(int64)
58✔
114
        }
58✔
115
        return &nosqlplugin.CurrentWorkflowRow{
58✔
116
                ShardID:          shardID,
58✔
117
                DomainID:         domainID,
58✔
118
                WorkflowID:       workflowID,
58✔
119
                RunID:            currentRunID,
58✔
120
                CreateRequestID:  executionInfo.CreateRequestID,
58✔
121
                State:            executionInfo.State,
58✔
122
                CloseStatus:      executionInfo.CloseStatus,
58✔
123
                LastWriteVersion: lastWriteVersion,
58✔
124
        }, nil
58✔
125
}
126

127
func (db *cdb) UpdateWorkflowExecutionWithTasks(
128
        ctx context.Context,
129
        currentWorkflowRequest *nosqlplugin.CurrentWorkflowWriteRequest,
130
        mutatedExecution *nosqlplugin.WorkflowExecutionRequest,
131
        insertedExecution *nosqlplugin.WorkflowExecutionRequest,
132
        resetExecution *nosqlplugin.WorkflowExecutionRequest,
133
        transferTasks []*nosqlplugin.TransferTask,
134
        crossClusterTasks []*nosqlplugin.CrossClusterTask,
135
        replicationTasks []*nosqlplugin.ReplicationTask,
136
        timerTasks []*nosqlplugin.TimerTask,
137
        shardCondition *nosqlplugin.ShardCondition,
138
) error {
1,445✔
139
        shardID := shardCondition.ShardID
1,445✔
140
        var domainID, workflowID string
1,445✔
141
        var previousNextEventIDCondition int64
1,445✔
142
        if mutatedExecution != nil {
2,890✔
143
                domainID = mutatedExecution.DomainID
1,445✔
144
                workflowID = mutatedExecution.WorkflowID
1,445✔
145
                previousNextEventIDCondition = *mutatedExecution.PreviousNextEventIDCondition
1,445✔
146
        } else if resetExecution != nil {
1,447✔
147
                domainID = resetExecution.DomainID
1✔
148
                workflowID = resetExecution.WorkflowID
1✔
149
                previousNextEventIDCondition = *resetExecution.PreviousNextEventIDCondition
1✔
150
        } else {
1✔
151
                return fmt.Errorf("at least one of mutatedExecution and resetExecution should be provided")
×
152
        }
×
153

154
        batch := db.session.NewBatch(gocql.LoggedBatch).WithContext(ctx)
1,445✔
155

1,445✔
156
        err := createOrUpdateCurrentWorkflow(batch, shardID, domainID, workflowID, currentWorkflowRequest)
1,445✔
157
        if err != nil {
1,445✔
158
                return err
×
159
        }
×
160

161
        if mutatedExecution != nil {
2,890✔
162
                err = updateWorkflowExecutionAndEventBufferWithMergeAndDeleteMaps(batch, shardID, domainID, workflowID, mutatedExecution)
1,445✔
163
                if err != nil {
1,445✔
164
                        return err
×
165
                }
×
166
        }
167

168
        if insertedExecution != nil {
1,503✔
169
                err = createWorkflowExecutionWithMergeMaps(batch, shardID, domainID, workflowID, insertedExecution)
58✔
170
                if err != nil {
58✔
171
                        return err
×
172
                }
×
173
        }
174

175
        if resetExecution != nil {
1,446✔
176
                err = resetWorkflowExecutionAndMapsAndEventBuffer(batch, shardID, domainID, workflowID, resetExecution)
1✔
177
                if err != nil {
1✔
178
                        return err
×
179
                }
×
180
        }
181

182
        err = createTransferTasks(batch, shardID, domainID, workflowID, transferTasks)
1,445✔
183
        if err != nil {
1,445✔
184
                return err
×
185
        }
×
186
        err = createReplicationTasks(batch, shardID, domainID, workflowID, replicationTasks)
1,445✔
187
        if err != nil {
1,445✔
188
                return err
×
189
        }
×
190
        err = createCrossClusterTasks(batch, shardID, domainID, workflowID, crossClusterTasks)
1,445✔
191
        if err != nil {
1,445✔
192
                return err
×
193
        }
×
194
        err = createTimerTasks(batch, shardID, domainID, workflowID, timerTasks)
1,445✔
195
        if err != nil {
1,445✔
196
                return err
×
197
        }
×
198

199
        err = assertShardRangeID(batch, shardID, shardCondition.RangeID)
1,445✔
200
        if err != nil {
1,445✔
201
                return err
×
202
        }
×
203

204
        return executeUpdateWorkflowBatchTransaction(db.session, batch, currentWorkflowRequest, previousNextEventIDCondition, shardCondition)
1,445✔
205
}
206

207
func (db *cdb) SelectWorkflowExecution(ctx context.Context, shardID int, domainID, workflowID, runID string) (*nosqlplugin.WorkflowExecution, error) {
399✔
208
        query := db.session.Query(templateGetWorkflowExecutionQuery,
399✔
209
                shardID,
399✔
210
                rowTypeExecution,
399✔
211
                domainID,
399✔
212
                workflowID,
399✔
213
                runID,
399✔
214
                defaultVisibilityTimestamp,
399✔
215
                rowTypeExecutionTaskID,
399✔
216
        ).WithContext(ctx)
399✔
217

399✔
218
        result := make(map[string]interface{})
399✔
219
        if err := query.MapScan(result); err != nil {
555✔
220
                return nil, err
156✔
221
        }
156✔
222

223
        state := &nosqlplugin.WorkflowExecution{}
244✔
224
        info := parseWorkflowExecutionInfo(result["execution"].(map[string]interface{}))
244✔
225
        state.ExecutionInfo = info
244✔
226
        state.VersionHistories = p.NewDataBlob(result["version_histories"].([]byte), common.EncodingType(result["version_histories_encoding"].(string)))
244✔
227
        // TODO: remove this after all 2DC workflows complete
244✔
228
        replicationState := parseReplicationState(result["replication_state"].(map[string]interface{}))
244✔
229
        state.ReplicationState = replicationState
244✔
230

244✔
231
        activityInfos := make(map[int64]*p.InternalActivityInfo)
244✔
232
        aMap := result["activity_map"].(map[int64]map[string]interface{})
244✔
233
        for key, value := range aMap {
260✔
234
                info := parseActivityInfo(domainID, value)
16✔
235
                activityInfos[key] = info
16✔
236
        }
16✔
237
        state.ActivityInfos = activityInfos
244✔
238

244✔
239
        timerInfos := make(map[string]*p.TimerInfo)
244✔
240
        tMap := result["timer_map"].(map[string]map[string]interface{})
244✔
241
        for key, value := range tMap {
244✔
242
                info := parseTimerInfo(value)
×
243
                timerInfos[key] = info
×
244
        }
×
245
        state.TimerInfos = timerInfos
244✔
246

244✔
247
        childExecutionInfos := make(map[int64]*p.InternalChildExecutionInfo)
244✔
248
        cMap := result["child_executions_map"].(map[int64]map[string]interface{})
244✔
249
        for key, value := range cMap {
245✔
250
                info := parseChildExecutionInfo(value)
1✔
251
                childExecutionInfos[key] = info
1✔
252
        }
1✔
253
        state.ChildExecutionInfos = childExecutionInfos
244✔
254

244✔
255
        requestCancelInfos := make(map[int64]*p.RequestCancelInfo)
244✔
256
        rMap := result["request_cancel_map"].(map[int64]map[string]interface{})
244✔
257
        for key, value := range rMap {
244✔
258
                info := parseRequestCancelInfo(value)
×
259
                requestCancelInfos[key] = info
×
260
        }
×
261
        state.RequestCancelInfos = requestCancelInfos
244✔
262

244✔
263
        signalInfos := make(map[int64]*p.SignalInfo)
244✔
264
        sMap := result["signal_map"].(map[int64]map[string]interface{})
244✔
265
        for key, value := range sMap {
244✔
UNCOV
266
                info := parseSignalInfo(value)
×
UNCOV
267
                signalInfos[key] = info
×
UNCOV
268
        }
×
269
        state.SignalInfos = signalInfos
244✔
270

244✔
271
        signalRequestedIDs := make(map[string]struct{})
244✔
272
        sList := mustConvertToSlice(result["signal_requested"])
244✔
273
        for _, v := range sList {
254✔
274
                signalRequestedIDs[v.(gocql.UUID).String()] = struct{}{}
10✔
275
        }
10✔
276
        state.SignalRequestedIDs = signalRequestedIDs
244✔
277

244✔
278
        eList := result["buffered_events_list"].([]map[string]interface{})
244✔
279
        bufferedEventsBlobs := make([]*p.DataBlob, 0, len(eList))
244✔
280
        for _, v := range eList {
245✔
281
                blob := parseHistoryEventBatchBlob(v)
1✔
282
                bufferedEventsBlobs = append(bufferedEventsBlobs, blob)
1✔
283
        }
1✔
284
        state.BufferedEvents = bufferedEventsBlobs
244✔
285

244✔
286
        state.Checksum = parseChecksum(result["checksum"].(map[string]interface{}))
244✔
287
        return state, nil
244✔
288
}
289

290
func (db *cdb) DeleteCurrentWorkflow(ctx context.Context, shardID int, domainID, workflowID, currentRunIDCondition string) error {
18✔
291
        query := db.session.Query(templateDeleteWorkflowExecutionCurrentRowQuery,
18✔
292
                shardID,
18✔
293
                rowTypeExecution,
18✔
294
                domainID,
18✔
295
                workflowID,
18✔
296
                permanentRunID,
18✔
297
                defaultVisibilityTimestamp,
18✔
298
                rowTypeExecutionTaskID,
18✔
299
                currentRunIDCondition,
18✔
300
        ).WithContext(ctx)
18✔
301

18✔
302
        return db.executeWithConsistencyAll(query)
18✔
303
}
18✔
304

305
func (db *cdb) DeleteWorkflowExecution(ctx context.Context, shardID int, domainID, workflowID, runID string) error {
18✔
306
        query := db.session.Query(templateDeleteWorkflowExecutionMutableStateQuery,
18✔
307
                shardID,
18✔
308
                rowTypeExecution,
18✔
309
                domainID,
18✔
310
                workflowID,
18✔
311
                runID,
18✔
312
                defaultVisibilityTimestamp,
18✔
313
                rowTypeExecutionTaskID,
18✔
314
        ).WithContext(ctx)
18✔
315

18✔
316
        return db.executeWithConsistencyAll(query)
18✔
317
}
18✔
318

319
func (db *cdb) SelectAllCurrentWorkflows(ctx context.Context, shardID int, pageToken []byte, pageSize int) ([]*p.CurrentWorkflowExecution, []byte, error) {
×
320
        query := db.session.Query(
×
321
                templateListCurrentExecutionsQuery,
×
322
                shardID,
×
323
                rowTypeExecution,
×
324
        ).PageSize(pageSize).PageState(pageToken).WithContext(ctx)
×
325

×
326
        iter := query.Iter()
×
327
        if iter == nil {
×
328
                return nil, nil, &types.InternalServiceError{
×
329
                        Message: "SelectAllCurrentWorkflows operation failed. Not able to create query iterator.",
×
330
                }
×
331
        }
×
332
        result := make(map[string]interface{})
×
333
        var executions []*p.CurrentWorkflowExecution
×
334
        for iter.MapScan(result) {
×
335
                runID := result["run_id"].(gocql.UUID).String()
×
336
                if runID != permanentRunID {
×
337
                        result = make(map[string]interface{})
×
338
                        continue
×
339
                }
340
                executions = append(executions, &p.CurrentWorkflowExecution{
×
341
                        DomainID:     result["domain_id"].(gocql.UUID).String(),
×
342
                        WorkflowID:   result["workflow_id"].(string),
×
343
                        RunID:        permanentRunID,
×
344
                        State:        result["workflow_state"].(int),
×
345
                        CurrentRunID: result["current_run_id"].(gocql.UUID).String(),
×
346
                })
×
347
                result = make(map[string]interface{})
×
348
        }
349
        nextPageToken := getNextPageToken(iter)
×
350

×
351
        err := iter.Close()
×
352
        return executions, nextPageToken, err
×
353
}
354

355
func (db *cdb) SelectAllWorkflowExecutions(ctx context.Context, shardID int, pageToken []byte, pageSize int) ([]*p.InternalListConcreteExecutionsEntity, []byte, error) {
×
356
        query := db.session.Query(
×
357
                templateListWorkflowExecutionQuery,
×
358
                shardID,
×
359
                rowTypeExecution,
×
360
        ).PageSize(pageSize).PageState(pageToken).WithContext(ctx)
×
361

×
362
        iter := query.Iter()
×
363
        if iter == nil {
×
364
                return nil, nil, &types.InternalServiceError{
×
365
                        Message: "SelectAllWorkflowExecutions operation failed.  Not able to create query iterator.",
×
366
                }
×
367
        }
×
368

369
        result := make(map[string]interface{})
×
370
        var executions []*p.InternalListConcreteExecutionsEntity
×
371
        for iter.MapScan(result) {
×
372
                runID := result["run_id"].(gocql.UUID).String()
×
373
                if runID == permanentRunID {
×
374
                        result = make(map[string]interface{})
×
375
                        continue
×
376
                }
377
                executions = append(executions, &p.InternalListConcreteExecutionsEntity{
×
378
                        ExecutionInfo:    parseWorkflowExecutionInfo(result["execution"].(map[string]interface{})),
×
379
                        VersionHistories: p.NewDataBlob(result["version_histories"].([]byte), common.EncodingType(result["version_histories_encoding"].(string))),
×
380
                })
×
381
                result = make(map[string]interface{})
×
382
        }
383
        nextPageToken := getNextPageToken(iter)
×
384

×
385
        if err := iter.Close(); err != nil {
×
386
                return nil, nil, err
×
387
        }
×
388
        return executions, nextPageToken, nil
×
389
}
390

391
func (db *cdb) IsWorkflowExecutionExists(ctx context.Context, shardID int, domainID, workflowID, runID string) (bool, error) {
×
392
        query := db.session.Query(templateIsWorkflowExecutionExistsQuery,
×
393
                shardID,
×
394
                rowTypeExecution,
×
395
                domainID,
×
396
                workflowID,
×
397
                runID,
×
398
                defaultVisibilityTimestamp,
×
399
                rowTypeExecutionTaskID,
×
400
        ).WithContext(ctx)
×
401

×
402
        result := make(map[string]interface{})
×
403
        if err := query.MapScan(result); err != nil {
×
404
                if db.client.IsNotFoundError(err) {
×
405
                        return false, nil
×
406
                }
×
407

408
                return false, err
×
409
        }
410
        return true, nil
×
411
}
412

413
func (db *cdb) SelectTransferTasksOrderByTaskID(ctx context.Context, shardID, pageSize int, pageToken []byte, exclusiveMinTaskID, inclusiveMaxTaskID int64) ([]*nosqlplugin.TransferTask, []byte, error) {
747✔
414
        // Reading transfer tasks need to be quorum level consistent, otherwise we could loose task
747✔
415
        query := db.session.Query(templateGetTransferTasksQuery,
747✔
416
                shardID,
747✔
417
                rowTypeTransferTask,
747✔
418
                rowTypeTransferDomainID,
747✔
419
                rowTypeTransferWorkflowID,
747✔
420
                rowTypeTransferRunID,
747✔
421
                defaultVisibilityTimestamp,
747✔
422
                exclusiveMinTaskID,
747✔
423
                inclusiveMaxTaskID,
747✔
424
        ).PageSize(pageSize).PageState(pageToken).WithContext(ctx)
747✔
425

747✔
426
        iter := query.Iter()
747✔
427
        if iter == nil {
747✔
428
                return nil, nil, &types.InternalServiceError{
×
429
                        Message: "SelectTransferTasksOrderByTaskID operation failed.  Not able to create query iterator.",
×
430
                }
×
431
        }
×
432

433
        var tasks []*nosqlplugin.TransferTask
747✔
434
        task := make(map[string]interface{})
747✔
435
        for iter.MapScan(task) {
2,338✔
436
                t := parseTransferTaskInfo(task["transfer"].(map[string]interface{}))
1,591✔
437
                // Reset task map to get it ready for next scan
1,591✔
438
                task = make(map[string]interface{})
1,591✔
439

1,591✔
440
                tasks = append(tasks, t)
1,591✔
441
        }
1,591✔
442
        nextPageToken := getNextPageToken(iter)
747✔
443

747✔
444
        err := iter.Close()
747✔
445
        return tasks, nextPageToken, err
747✔
446
}
447

448
func (db *cdb) DeleteTransferTask(ctx context.Context, shardID int, taskID int64) error {
×
449
        query := db.session.Query(templateCompleteTransferTaskQuery,
×
450
                shardID,
×
451
                rowTypeTransferTask,
×
452
                rowTypeTransferDomainID,
×
453
                rowTypeTransferWorkflowID,
×
454
                rowTypeTransferRunID,
×
455
                defaultVisibilityTimestamp,
×
456
                taskID,
×
457
        ).WithContext(ctx)
×
458

×
459
        return db.executeWithConsistencyAll(query)
×
460
}
×
461

462
func (db *cdb) RangeDeleteTransferTasks(ctx context.Context, shardID int, exclusiveBeginTaskID, inclusiveEndTaskID int64) error {
28✔
463
        query := db.session.Query(templateRangeCompleteTransferTaskQuery,
28✔
464
                shardID,
28✔
465
                rowTypeTransferTask,
28✔
466
                rowTypeTransferDomainID,
28✔
467
                rowTypeTransferWorkflowID,
28✔
468
                rowTypeTransferRunID,
28✔
469
                defaultVisibilityTimestamp,
28✔
470
                exclusiveBeginTaskID,
28✔
471
                inclusiveEndTaskID,
28✔
472
        ).WithContext(ctx)
28✔
473

28✔
474
        return db.executeWithConsistencyAll(query)
28✔
475
}
28✔
476

477
func (db *cdb) SelectTimerTasksOrderByVisibilityTime(ctx context.Context, shardID, pageSize int, pageToken []byte, inclusiveMinTime, exclusiveMaxTime time.Time) ([]*nosqlplugin.TimerTask, []byte, error) {
1,017✔
478
        // Reading timer tasks need to be quorum level consistent, otherwise we could loose task
1,017✔
479
        minTimestamp := p.UnixNanoToDBTimestamp(inclusiveMinTime.UnixNano())
1,017✔
480
        maxTimestamp := p.UnixNanoToDBTimestamp(exclusiveMaxTime.UnixNano())
1,017✔
481
        query := db.session.Query(templateGetTimerTasksQuery,
1,017✔
482
                shardID,
1,017✔
483
                rowTypeTimerTask,
1,017✔
484
                rowTypeTimerDomainID,
1,017✔
485
                rowTypeTimerWorkflowID,
1,017✔
486
                rowTypeTimerRunID,
1,017✔
487
                minTimestamp,
1,017✔
488
                maxTimestamp,
1,017✔
489
        ).PageSize(pageSize).PageState(pageToken).WithContext(ctx)
1,017✔
490

1,017✔
491
        iter := query.Iter()
1,017✔
492
        if iter == nil {
1,017✔
493
                return nil, nil, &types.InternalServiceError{
×
494
                        Message: "SelectTimerTasksOrderByVisibilityTime operation failed.  Not able to create query iterator.",
×
495
                }
×
496
        }
×
497

498
        var timers []*nosqlplugin.TimerTask
1,017✔
499
        task := make(map[string]interface{})
1,017✔
500
        for iter.MapScan(task) {
4,757✔
501
                t := parseTimerTaskInfo(task["timer"].(map[string]interface{}))
3,740✔
502
                // Reset task map to get it ready for next scan
3,740✔
503
                task = make(map[string]interface{})
3,740✔
504

3,740✔
505
                timers = append(timers, t)
3,740✔
506
        }
3,740✔
507
        nextPageToken := getNextPageToken(iter)
1,017✔
508

1,017✔
509
        err := iter.Close()
1,017✔
510
        return timers, nextPageToken, err
1,017✔
511
}
512

513
func (db *cdb) DeleteTimerTask(ctx context.Context, shardID int, taskID int64, visibilityTimestamp time.Time) error {
×
514
        ts := p.UnixNanoToDBTimestamp(visibilityTimestamp.UnixNano())
×
515
        query := db.session.Query(templateCompleteTimerTaskQuery,
×
516
                shardID,
×
517
                rowTypeTimerTask,
×
518
                rowTypeTimerDomainID,
×
519
                rowTypeTimerWorkflowID,
×
520
                rowTypeTimerRunID,
×
521
                ts,
×
522
                taskID,
×
523
        ).WithContext(ctx)
×
524

×
525
        return db.executeWithConsistencyAll(query)
×
526
}
×
527

528
func (db *cdb) RangeDeleteTimerTasks(ctx context.Context, shardID int, inclusiveMinTime, exclusiveMaxTime time.Time) error {
9✔
529
        start := p.UnixNanoToDBTimestamp(inclusiveMinTime.UnixNano())
9✔
530
        end := p.UnixNanoToDBTimestamp(exclusiveMaxTime.UnixNano())
9✔
531
        query := db.session.Query(templateRangeCompleteTimerTaskQuery,
9✔
532
                shardID,
9✔
533
                rowTypeTimerTask,
9✔
534
                rowTypeTimerDomainID,
9✔
535
                rowTypeTimerWorkflowID,
9✔
536
                rowTypeTimerRunID,
9✔
537
                start,
9✔
538
                end,
9✔
539
        ).WithContext(ctx)
9✔
540

9✔
541
        return db.executeWithConsistencyAll(query)
9✔
542
}
9✔
543

544
func (db *cdb) SelectReplicationTasksOrderByTaskID(ctx context.Context, shardID, pageSize int, pageToken []byte, exclusiveMinTaskID, inclusiveMaxTaskID int64) ([]*nosqlplugin.ReplicationTask, []byte, error) {
29✔
545
        // Reading replication tasks need to be quorum level consistent, otherwise we could loose task
29✔
546
        query := db.session.Query(templateGetReplicationTasksQuery,
29✔
547
                shardID,
29✔
548
                rowTypeReplicationTask,
29✔
549
                rowTypeReplicationDomainID,
29✔
550
                rowTypeReplicationWorkflowID,
29✔
551
                rowTypeReplicationRunID,
29✔
552
                defaultVisibilityTimestamp,
29✔
553
                exclusiveMinTaskID,
29✔
554
                inclusiveMaxTaskID,
29✔
555
        ).PageSize(pageSize).PageState(pageToken).WithContext(ctx)
29✔
556
        return populateGetReplicationTasks(query)
29✔
557
}
29✔
558

559
func (db *cdb) DeleteReplicationTask(ctx context.Context, shardID int, taskID int64) error {
×
560
        query := db.session.Query(templateCompleteReplicationTaskQuery,
×
561
                shardID,
×
562
                rowTypeReplicationTask,
×
563
                rowTypeReplicationDomainID,
×
564
                rowTypeReplicationWorkflowID,
×
565
                rowTypeReplicationRunID,
×
566
                defaultVisibilityTimestamp,
×
567
                taskID,
×
568
        ).WithContext(ctx)
×
569

×
570
        return db.executeWithConsistencyAll(query)
×
571
}
×
572

573
func (db *cdb) RangeDeleteReplicationTasks(ctx context.Context, shardID int, inclusiveEndTaskID int64) error {
31✔
574
        query := db.session.Query(templateCompleteReplicationTaskBeforeQuery,
31✔
575
                shardID,
31✔
576
                rowTypeReplicationTask,
31✔
577
                rowTypeReplicationDomainID,
31✔
578
                rowTypeReplicationWorkflowID,
31✔
579
                rowTypeReplicationRunID,
31✔
580
                defaultVisibilityTimestamp,
31✔
581
                inclusiveEndTaskID,
31✔
582
        ).WithContext(ctx)
31✔
583

31✔
584
        return db.executeWithConsistencyAll(query)
31✔
585
}
31✔
586

587
func (db *cdb) SelectCrossClusterTasksOrderByTaskID(ctx context.Context, shardID, pageSize int, pageToken []byte, targetCluster string, exclusiveMinTaskID, inclusiveMaxTaskID int64) ([]*nosqlplugin.CrossClusterTask, []byte, error) {
48✔
588
        // Reading cross-cluster tasks need to be quorum level consistent, otherwise we could loose task
48✔
589
        query := db.session.Query(templateGetCrossClusterTasksQuery,
48✔
590
                shardID,
48✔
591
                rowTypeCrossClusterTask,
48✔
592
                rowTypeCrossClusterDomainID,
48✔
593
                targetCluster, // workflowID field is used to store target cluster
48✔
594
                rowTypeCrossClusterRunID,
48✔
595
                defaultVisibilityTimestamp,
48✔
596
                exclusiveMinTaskID,
48✔
597
                inclusiveMaxTaskID,
48✔
598
        ).PageSize(pageSize).PageState(pageToken).WithContext(ctx)
48✔
599

48✔
600
        iter := query.Iter()
48✔
601
        if iter == nil {
48✔
602
                return nil, nil, &types.InternalServiceError{
×
603
                        Message: "SelectCrossClusterTasksOrderByTaskID operation failed.  Not able to create query iterator.",
×
604
                }
×
605
        }
×
606

607
        var tasks []*nosqlplugin.CrossClusterTask
48✔
608
        task := make(map[string]interface{})
48✔
609
        for iter.MapScan(task) {
48✔
610
                t := parseCrossClusterTaskInfo(task["cross_cluster"].(map[string]interface{}))
×
611
                // Reset task map to get it ready for next scan
×
612
                task = make(map[string]interface{})
×
613

×
614
                tasks = append(tasks, &nosqlplugin.CrossClusterTask{
×
615
                        TransferTask:  *t,
×
616
                        TargetCluster: targetCluster,
×
617
                })
×
618
        }
×
619
        nextPageToken := getNextPageToken(iter)
48✔
620
        err := iter.Close()
48✔
621
        return tasks, nextPageToken, err
48✔
622
}
623

624
func (db *cdb) DeleteCrossClusterTask(ctx context.Context, shardID int, targetCluster string, taskID int64) error {
×
625
        query := db.session.Query(templateCompleteCrossClusterTaskQuery,
×
626
                shardID,
×
627
                rowTypeCrossClusterTask,
×
628
                rowTypeCrossClusterDomainID,
×
629
                targetCluster,
×
630
                rowTypeCrossClusterRunID,
×
631
                defaultVisibilityTimestamp,
×
632
                taskID,
×
633
        ).WithContext(ctx)
×
634

×
635
        return db.executeWithConsistencyAll(query)
×
636
}
×
637

638
func (db *cdb) RangeDeleteCrossClusterTasks(ctx context.Context, shardID int, targetCluster string, exclusiveBeginTaskID, inclusiveEndTaskID int64) error {
40✔
639
        query := db.session.Query(templateRangeCompleteCrossClusterTaskQuery,
40✔
640
                shardID,
40✔
641
                rowTypeCrossClusterTask,
40✔
642
                rowTypeCrossClusterDomainID,
40✔
643
                targetCluster,
40✔
644
                rowTypeCrossClusterRunID,
40✔
645
                defaultVisibilityTimestamp,
40✔
646
                exclusiveBeginTaskID,
40✔
647
                inclusiveEndTaskID,
40✔
648
        ).WithContext(ctx)
40✔
649

40✔
650
        return query.Exec()
40✔
651
}
40✔
652

653
func (db *cdb) InsertReplicationDLQTask(ctx context.Context, shardID int, sourceCluster string, task nosqlplugin.ReplicationTask) error {
1✔
654
        // Use source cluster name as the workflow id for replication dlq
1✔
655
        query := db.session.Query(templateCreateReplicationTaskQuery,
1✔
656
                shardID,
1✔
657
                rowTypeDLQ,
1✔
658
                rowTypeDLQDomainID,
1✔
659
                sourceCluster,
1✔
660
                rowTypeDLQRunID,
1✔
661
                task.DomainID,
1✔
662
                task.WorkflowID,
1✔
663
                task.RunID,
1✔
664
                task.TaskID,
1✔
665
                task.TaskType,
1✔
666
                task.FirstEventID,
1✔
667
                task.NextEventID,
1✔
668
                task.Version,
1✔
669
                task.ScheduledID,
1✔
670
                p.EventStoreVersion,
1✔
671
                task.BranchToken,
1✔
672
                p.EventStoreVersion,
1✔
673
                task.NewRunBranchToken,
1✔
674
                defaultVisibilityTimestamp,
1✔
675
                defaultVisibilityTimestamp,
1✔
676
                task.TaskID,
1✔
677
        ).WithContext(ctx)
1✔
678

1✔
679
        return query.Exec()
1✔
680
}
1✔
681

682
func (db *cdb) SelectReplicationDLQTasksOrderByTaskID(ctx context.Context, shardID int, sourceCluster string, pageSize int, pageToken []byte, exclusiveMinTaskID, inclusiveMaxTaskID int64) ([]*nosqlplugin.ReplicationTask, []byte, error) {
1✔
683
        // Reading replication tasks need to be quorum level consistent, otherwise we could loose task
1✔
684
        query := db.session.Query(templateGetReplicationTasksQuery,
1✔
685
                shardID,
1✔
686
                rowTypeDLQ,
1✔
687
                rowTypeDLQDomainID,
1✔
688
                sourceCluster,
1✔
689
                rowTypeDLQRunID,
1✔
690
                defaultVisibilityTimestamp,
1✔
691
                exclusiveMinTaskID,
1✔
692
                inclusiveMaxTaskID,
1✔
693
        ).PageSize(pageSize).PageState(pageToken).WithContext(ctx)
1✔
694

1✔
695
        return populateGetReplicationTasks(query)
1✔
696
}
1✔
697

698
func (db *cdb) SelectReplicationDLQTasksCount(ctx context.Context, shardID int, sourceCluster string) (int64, error) {
4✔
699
        // Reading replication tasks need to be quorum level consistent, otherwise we could loose task
4✔
700
        query := db.session.Query(templateGetDLQSizeQuery,
4✔
701
                shardID,
4✔
702
                rowTypeDLQ,
4✔
703
                rowTypeDLQDomainID,
4✔
704
                sourceCluster,
4✔
705
                rowTypeDLQRunID,
4✔
706
        ).WithContext(ctx)
4✔
707

4✔
708
        result := make(map[string]interface{})
4✔
709
        if err := query.MapScan(result); err != nil {
4✔
710
                return -1, err
×
711
        }
×
712

713
        queueSize := result["count"].(int64)
4✔
714
        return queueSize, nil
4✔
715
}
716

717
func (db *cdb) DeleteReplicationDLQTask(ctx context.Context, shardID int, sourceCluster string, taskID int64) error {
×
718
        query := db.session.Query(templateCompleteReplicationTaskQuery,
×
719
                shardID,
×
720
                rowTypeDLQ,
×
721
                rowTypeDLQDomainID,
×
722
                sourceCluster,
×
723
                rowTypeDLQRunID,
×
724
                defaultVisibilityTimestamp,
×
725
                taskID,
×
726
        ).WithContext(ctx)
×
727

×
728
        return db.executeWithConsistencyAll(query)
×
729
}
×
730

731
func (db *cdb) RangeDeleteReplicationDLQTasks(ctx context.Context, shardID int, sourceCluster string, exclusiveBeginTaskID, inclusiveEndTaskID int64) error {
×
732
        query := db.session.Query(templateRangeCompleteReplicationTaskQuery,
×
733
                shardID,
×
734
                rowTypeDLQ,
×
735
                rowTypeDLQDomainID,
×
736
                sourceCluster,
×
737
                rowTypeDLQRunID,
×
738
                defaultVisibilityTimestamp,
×
739
                exclusiveBeginTaskID,
×
740
                inclusiveEndTaskID,
×
741
        ).WithContext(ctx)
×
742

×
743
        return db.executeWithConsistencyAll(query)
×
744
}
×
745

746
func (db *cdb) InsertReplicationTask(ctx context.Context, tasks []*nosqlplugin.ReplicationTask, shardCondition nosqlplugin.ShardCondition) error {
×
747
        if len(tasks) == 0 {
×
748
                return nil
×
749
        }
×
750

751
        shardID := shardCondition.ShardID
×
752
        batch := db.session.NewBatch(gocql.LoggedBatch).WithContext(ctx)
×
753
        for _, task := range tasks {
×
754
                err := createReplicationTasks(batch, shardID, task.DomainID, task.WorkflowID, []*nosqlplugin.ReplicationTask{task})
×
755
                if err != nil {
×
756
                        return err
×
757
                }
×
758
        }
759

760
        err := assertShardRangeID(batch, shardID, shardCondition.RangeID)
×
761
        if err != nil {
×
762
                return err
×
763
        }
×
764

765
        previous := make(map[string]interface{})
×
766
        applied, iter, err := db.session.MapExecuteBatchCAS(batch, previous)
×
767
        defer func() {
×
768
                if iter != nil {
×
769
                        _ = iter.Close()
×
770
                }
×
771
        }()
772
        if err != nil {
×
773
                return err
×
774
        }
×
775

776
        if !applied {
×
777
                rowType, ok := previous["type"].(int)
×
778
                if !ok {
×
779
                        // This should never happen, as all our rows have the type field.
×
780
                        panic("Encounter row type not found")
×
781
                }
782
                if rowType == rowTypeShard {
×
783
                        if actualRangeID, ok := previous["range_id"].(int64); ok && actualRangeID != shardCondition.RangeID {
×
784
                                // CreateWorkflowExecution failed because rangeID was modified
×
785
                                return &nosqlplugin.ShardOperationConditionFailure{
×
786
                                        RangeID: actualRangeID,
×
787
                                }
×
788
                        }
×
789
                }
790

791
                // At this point we only know that the write was not applied.
792
                // It's much safer to return ShardOperationConditionFailure(which will become ShardOwnershipLostError later) as the default to force the application to reload
793
                // shard to recover from such errors
794
                var columns []string
×
795
                for k, v := range previous {
×
796
                        columns = append(columns, fmt.Sprintf("%s=%v", k, v))
×
797
                }
×
798
                return &nosqlplugin.ShardOperationConditionFailure{
×
799
                        RangeID: -1,
×
800
                        Details: strings.Join(columns, ","),
×
801
                }
×
802
        }
803
        return nil
×
804
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc