• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

uber / cadence / 01875e2f-959c-4c4d-87af-1d7805759bcc

08 Apr 2023 12:26AM UTC coverage: 57.178% (+0.1%) from 57.072%
01875e2f-959c-4c4d-87af-1d7805759bcc

Pull #5197

buildkite

Steven L
bad cleanup -> good cleanup
Pull Request #5197: Demonstrate a way to get rid of the cadence-idl repo

85396 of 149351 relevant lines covered (57.18%)

2283.28 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

60.0
/common/persistence/nosql/nosqlplugin/cassandra/workflow.go
1
// Copyright (c) 2021 Uber Technologies, Inc.
2
// Portions of the Software are attributed to Copyright (c) 2020 Temporal Technologies Inc.
3
//
4
// Permission is hereby granted, free of charge, to any person obtaining a copy
5
// of this software and associated documentation files (the "Software"), to deal
6
// in the Software without restriction, including without limitation the rights
7
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
8
// copies of the Software, and to permit persons to whom the Software is
9
// furnished to do so, subject to the following conditions:
10
//
11
// The above copyright notice and this permission notice shall be included in
12
// all copies or substantial portions of the Software.
13
//
14
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
15
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
16
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
17
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
18
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
19
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
20
// THE SOFTWARE.
21

22
package cassandra
23

24
import (
25
        "context"
26
        "fmt"
27
        "strings"
28
        "time"
29

30
        "github.com/uber/cadence/common"
31
        p "github.com/uber/cadence/common/persistence"
32
        "github.com/uber/cadence/common/persistence/nosql/nosqlplugin"
33
        "github.com/uber/cadence/common/persistence/nosql/nosqlplugin/cassandra/gocql"
34
        "github.com/uber/cadence/common/types"
35
)
36

37
// Compile-time check that *cdb implements nosqlplugin.WorkflowCRUD.
var _ nosqlplugin.WorkflowCRUD = (*cdb)(nil)
38

39
func (db *cdb) InsertWorkflowExecutionWithTasks(
40
        ctx context.Context,
41
        currentWorkflowRequest *nosqlplugin.CurrentWorkflowWriteRequest,
42
        execution *nosqlplugin.WorkflowExecutionRequest,
43
        transferTasks []*nosqlplugin.TransferTask,
44
        crossClusterTasks []*nosqlplugin.CrossClusterTask,
45
        replicationTasks []*nosqlplugin.ReplicationTask,
46
        timerTasks []*nosqlplugin.TimerTask,
47
        shardCondition *nosqlplugin.ShardCondition,
48
) error {
168✔
49
        shardID := shardCondition.ShardID
168✔
50
        domainID := execution.DomainID
168✔
51
        workflowID := execution.WorkflowID
168✔
52

168✔
53
        batch := db.session.NewBatch(gocql.LoggedBatch).WithContext(ctx)
168✔
54

168✔
55
        err := db.createOrUpdateCurrentWorkflow(batch, shardID, domainID, workflowID, currentWorkflowRequest)
168✔
56
        if err != nil {
168✔
57
                return err
×
58
        }
×
59

60
        err = db.createWorkflowExecutionWithMergeMaps(batch, shardID, domainID, workflowID, execution)
168✔
61
        if err != nil {
168✔
62
                return err
×
63
        }
×
64

65
        err = db.createTransferTasks(batch, shardID, domainID, workflowID, transferTasks)
168✔
66
        if err != nil {
168✔
67
                return err
×
68
        }
×
69
        err = db.createReplicationTasks(batch, shardID, domainID, workflowID, replicationTasks)
168✔
70
        if err != nil {
168✔
71
                return err
×
72
        }
×
73
        err = db.createCrossClusterTasks(batch, shardID, domainID, workflowID, crossClusterTasks)
168✔
74
        if err != nil {
168✔
75
                return err
×
76
        }
×
77
        err = db.createTimerTasks(batch, shardID, domainID, workflowID, timerTasks)
168✔
78
        if err != nil {
168✔
79
                return err
×
80
        }
×
81

82
        err = db.assertShardRangeID(batch, shardID, shardCondition.RangeID)
168✔
83
        if err != nil {
168✔
84
                return err
×
85
        }
×
86

87
        return db.executeCreateWorkflowBatchTransaction(batch, currentWorkflowRequest, execution, shardCondition)
168✔
88
}
89

90
func (db *cdb) SelectCurrentWorkflow(
91
        ctx context.Context,
92
        shardID int, domainID, workflowID string,
93
) (*nosqlplugin.CurrentWorkflowRow, error) {
61✔
94
        query := db.session.Query(templateGetCurrentExecutionQuery,
61✔
95
                shardID,
61✔
96
                rowTypeExecution,
61✔
97
                domainID,
61✔
98
                workflowID,
61✔
99
                permanentRunID,
61✔
100
                defaultVisibilityTimestamp,
61✔
101
                rowTypeExecutionTaskID,
61✔
102
        ).WithContext(ctx)
61✔
103

61✔
104
        result := make(map[string]interface{})
61✔
105
        if err := query.MapScan(result); err != nil {
65✔
106
                return nil, err
4✔
107
        }
4✔
108

109
        currentRunID := result["current_run_id"].(gocql.UUID).String()
58✔
110
        executionInfo := parseWorkflowExecutionInfo(result["execution"].(map[string]interface{}))
58✔
111
        lastWriteVersion := common.EmptyVersion
58✔
112
        if result["workflow_last_write_version"] != nil {
116✔
113
                lastWriteVersion = result["workflow_last_write_version"].(int64)
58✔
114
        }
58✔
115
        return &nosqlplugin.CurrentWorkflowRow{
58✔
116
                ShardID:          shardID,
58✔
117
                DomainID:         domainID,
58✔
118
                WorkflowID:       workflowID,
58✔
119
                RunID:            currentRunID,
58✔
120
                CreateRequestID:  executionInfo.CreateRequestID,
58✔
121
                State:            executionInfo.State,
58✔
122
                CloseStatus:      executionInfo.CloseStatus,
58✔
123
                LastWriteVersion: lastWriteVersion,
58✔
124
        }, nil
58✔
125
}
126

127
func (db *cdb) UpdateWorkflowExecutionWithTasks(
128
        ctx context.Context,
129
        currentWorkflowRequest *nosqlplugin.CurrentWorkflowWriteRequest,
130
        mutatedExecution *nosqlplugin.WorkflowExecutionRequest,
131
        insertedExecution *nosqlplugin.WorkflowExecutionRequest,
132
        resetExecution *nosqlplugin.WorkflowExecutionRequest,
133
        transferTasks []*nosqlplugin.TransferTask,
134
        crossClusterTasks []*nosqlplugin.CrossClusterTask,
135
        replicationTasks []*nosqlplugin.ReplicationTask,
136
        timerTasks []*nosqlplugin.TimerTask,
137
        shardCondition *nosqlplugin.ShardCondition,
138
) error {
1,468✔
139
        shardID := shardCondition.ShardID
1,468✔
140
        var domainID, workflowID string
1,468✔
141
        var previousNextEventIDCondition int64
1,468✔
142
        if mutatedExecution != nil {
2,936✔
143
                domainID = mutatedExecution.DomainID
1,468✔
144
                workflowID = mutatedExecution.WorkflowID
1,468✔
145
                previousNextEventIDCondition = *mutatedExecution.PreviousNextEventIDCondition
1,468✔
146
        } else if resetExecution != nil {
1,470✔
147
                domainID = resetExecution.DomainID
1✔
148
                workflowID = resetExecution.WorkflowID
1✔
149
                previousNextEventIDCondition = *resetExecution.PreviousNextEventIDCondition
1✔
150
        } else {
1✔
151
                return fmt.Errorf("at least one of mutatedExecution and resetExecution should be provided")
×
152
        }
×
153

154
        batch := db.session.NewBatch(gocql.LoggedBatch).WithContext(ctx)
1,468✔
155

1,468✔
156
        err := db.createOrUpdateCurrentWorkflow(batch, shardID, domainID, workflowID, currentWorkflowRequest)
1,468✔
157
        if err != nil {
1,468✔
158
                return err
×
159
        }
×
160

161
        if mutatedExecution != nil {
2,936✔
162
                err = db.updateWorkflowExecutionAndEventBufferWithMergeAndDeleteMaps(batch, shardID, domainID, workflowID, mutatedExecution)
1,468✔
163
                if err != nil {
1,468✔
164
                        return err
×
165
                }
×
166
        }
167

168
        if insertedExecution != nil {
1,527✔
169
                err = db.createWorkflowExecutionWithMergeMaps(batch, shardID, domainID, workflowID, insertedExecution)
59✔
170
                if err != nil {
59✔
171
                        return err
×
172
                }
×
173
        }
174

175
        if resetExecution != nil {
1,469✔
176
                err = db.resetWorkflowExecutionAndMapsAndEventBuffer(batch, shardID, domainID, workflowID, resetExecution)
1✔
177
                if err != nil {
1✔
178
                        return err
×
179
                }
×
180
        }
181

182
        err = db.createTransferTasks(batch, shardID, domainID, workflowID, transferTasks)
1,468✔
183
        if err != nil {
1,468✔
184
                return err
×
185
        }
×
186
        err = db.createReplicationTasks(batch, shardID, domainID, workflowID, replicationTasks)
1,468✔
187
        if err != nil {
1,468✔
188
                return err
×
189
        }
×
190
        err = db.createCrossClusterTasks(batch, shardID, domainID, workflowID, crossClusterTasks)
1,468✔
191
        if err != nil {
1,468✔
192
                return err
×
193
        }
×
194
        err = db.createTimerTasks(batch, shardID, domainID, workflowID, timerTasks)
1,468✔
195
        if err != nil {
1,468✔
196
                return err
×
197
        }
×
198

199
        err = db.assertShardRangeID(batch, shardID, shardCondition.RangeID)
1,468✔
200
        if err != nil {
1,468✔
201
                return err
×
202
        }
×
203

204
        return db.executeUpdateWorkflowBatchTransaction(batch, currentWorkflowRequest, previousNextEventIDCondition, shardCondition)
1,468✔
205
}
206

207
func (db *cdb) SelectWorkflowExecution(ctx context.Context, shardID int, domainID, workflowID, runID string) (*nosqlplugin.WorkflowExecution, error) {
382✔
208
        query := db.session.Query(templateGetWorkflowExecutionQuery,
382✔
209
                shardID,
382✔
210
                rowTypeExecution,
382✔
211
                domainID,
382✔
212
                workflowID,
382✔
213
                runID,
382✔
214
                defaultVisibilityTimestamp,
382✔
215
                rowTypeExecutionTaskID,
382✔
216
        ).WithContext(ctx)
382✔
217

382✔
218
        result := make(map[string]interface{})
382✔
219
        if err := query.MapScan(result); err != nil {
518✔
220
                return nil, err
136✔
221
        }
136✔
222

223
        state := &nosqlplugin.WorkflowExecution{}
247✔
224
        info := parseWorkflowExecutionInfo(result["execution"].(map[string]interface{}))
247✔
225
        state.ExecutionInfo = info
247✔
226
        state.VersionHistories = p.NewDataBlob(result["version_histories"].([]byte), common.EncodingType(result["version_histories_encoding"].(string)))
247✔
227
        // TODO: remove this after all 2DC workflows complete
247✔
228
        replicationState := parseReplicationState(result["replication_state"].(map[string]interface{}))
247✔
229
        state.ReplicationState = replicationState
247✔
230

247✔
231
        activityInfos := make(map[int64]*p.InternalActivityInfo)
247✔
232
        aMap := result["activity_map"].(map[int64]map[string]interface{})
247✔
233
        for key, value := range aMap {
273✔
234
                info := parseActivityInfo(domainID, value)
26✔
235
                activityInfos[key] = info
26✔
236
        }
26✔
237
        state.ActivityInfos = activityInfos
247✔
238

247✔
239
        timerInfos := make(map[string]*p.TimerInfo)
247✔
240
        tMap := result["timer_map"].(map[string]map[string]interface{})
247✔
241
        for key, value := range tMap {
248✔
242
                info := parseTimerInfo(value)
1✔
243
                timerInfos[key] = info
1✔
244
        }
1✔
245
        state.TimerInfos = timerInfos
247✔
246

247✔
247
        childExecutionInfos := make(map[int64]*p.InternalChildExecutionInfo)
247✔
248
        cMap := result["child_executions_map"].(map[int64]map[string]interface{})
247✔
249
        for key, value := range cMap {
249✔
250
                info := parseChildExecutionInfo(value)
2✔
251
                childExecutionInfos[key] = info
2✔
252
        }
2✔
253
        state.ChildExecutionInfos = childExecutionInfos
247✔
254

247✔
255
        requestCancelInfos := make(map[int64]*p.RequestCancelInfo)
247✔
256
        rMap := result["request_cancel_map"].(map[int64]map[string]interface{})
247✔
257
        for key, value := range rMap {
247✔
258
                info := parseRequestCancelInfo(value)
×
259
                requestCancelInfos[key] = info
×
260
        }
×
261
        state.RequestCancelInfos = requestCancelInfos
247✔
262

247✔
263
        signalInfos := make(map[int64]*p.SignalInfo)
247✔
264
        sMap := result["signal_map"].(map[int64]map[string]interface{})
247✔
265
        for key, value := range sMap {
248✔
266
                info := parseSignalInfo(value)
1✔
267
                signalInfos[key] = info
1✔
268
        }
1✔
269
        state.SignalInfos = signalInfos
247✔
270

247✔
271
        signalRequestedIDs := make(map[string]struct{})
247✔
272
        sList := mustConvertToSlice(result["signal_requested"])
247✔
273
        for _, v := range sList {
257✔
274
                signalRequestedIDs[v.(gocql.UUID).String()] = struct{}{}
10✔
275
        }
10✔
276
        state.SignalRequestedIDs = signalRequestedIDs
247✔
277

247✔
278
        eList := result["buffered_events_list"].([]map[string]interface{})
247✔
279
        bufferedEventsBlobs := make([]*p.DataBlob, 0, len(eList))
247✔
280
        for _, v := range eList {
248✔
281
                blob := parseHistoryEventBatchBlob(v)
1✔
282
                bufferedEventsBlobs = append(bufferedEventsBlobs, blob)
1✔
283
        }
1✔
284
        state.BufferedEvents = bufferedEventsBlobs
247✔
285

247✔
286
        state.Checksum = parseChecksum(result["checksum"].(map[string]interface{}))
247✔
287
        return state, nil
247✔
288
}
289

290
func (db *cdb) DeleteCurrentWorkflow(ctx context.Context, shardID int, domainID, workflowID, currentRunIDCondition string) error {
18✔
291
        query := db.session.Query(templateDeleteWorkflowExecutionCurrentRowQuery,
18✔
292
                shardID,
18✔
293
                rowTypeExecution,
18✔
294
                domainID,
18✔
295
                workflowID,
18✔
296
                permanentRunID,
18✔
297
                defaultVisibilityTimestamp,
18✔
298
                rowTypeExecutionTaskID,
18✔
299
                currentRunIDCondition,
18✔
300
        ).WithContext(ctx)
18✔
301

18✔
302
        return db.executeWithConsistencyAll(query)
18✔
303
}
18✔
304

305
func (db *cdb) DeleteWorkflowExecution(ctx context.Context, shardID int, domainID, workflowID, runID string) error {
18✔
306
        query := db.session.Query(templateDeleteWorkflowExecutionMutableStateQuery,
18✔
307
                shardID,
18✔
308
                rowTypeExecution,
18✔
309
                domainID,
18✔
310
                workflowID,
18✔
311
                runID,
18✔
312
                defaultVisibilityTimestamp,
18✔
313
                rowTypeExecutionTaskID,
18✔
314
        ).WithContext(ctx)
18✔
315

18✔
316
        return db.executeWithConsistencyAll(query)
18✔
317
}
18✔
318

319
func (db *cdb) SelectAllCurrentWorkflows(ctx context.Context, shardID int, pageToken []byte, pageSize int) ([]*p.CurrentWorkflowExecution, []byte, error) {
×
320
        query := db.session.Query(
×
321
                templateListCurrentExecutionsQuery,
×
322
                shardID,
×
323
                rowTypeExecution,
×
324
        ).PageSize(pageSize).PageState(pageToken).WithContext(ctx)
×
325

×
326
        iter := query.Iter()
×
327
        if iter == nil {
×
328
                return nil, nil, &types.InternalServiceError{
×
329
                        Message: "SelectAllCurrentWorkflows operation failed. Not able to create query iterator.",
×
330
                }
×
331
        }
×
332
        result := make(map[string]interface{})
×
333
        var executions []*p.CurrentWorkflowExecution
×
334
        for iter.MapScan(result) {
×
335
                runID := result["run_id"].(gocql.UUID).String()
×
336
                if runID != permanentRunID {
×
337
                        result = make(map[string]interface{})
×
338
                        continue
×
339
                }
340
                executions = append(executions, &p.CurrentWorkflowExecution{
×
341
                        DomainID:     result["domain_id"].(gocql.UUID).String(),
×
342
                        WorkflowID:   result["workflow_id"].(string),
×
343
                        RunID:        permanentRunID,
×
344
                        State:        result["workflow_state"].(int),
×
345
                        CurrentRunID: result["current_run_id"].(gocql.UUID).String(),
×
346
                })
×
347
                result = make(map[string]interface{})
×
348
        }
349
        nextPageToken := getNextPageToken(iter)
×
350

×
351
        err := iter.Close()
×
352
        return executions, nextPageToken, err
×
353
}
354

355
func (db *cdb) SelectAllWorkflowExecutions(ctx context.Context, shardID int, pageToken []byte, pageSize int) ([]*p.InternalListConcreteExecutionsEntity, []byte, error) {
×
356
        query := db.session.Query(
×
357
                templateListWorkflowExecutionQuery,
×
358
                shardID,
×
359
                rowTypeExecution,
×
360
        ).PageSize(pageSize).PageState(pageToken).WithContext(ctx)
×
361

×
362
        iter := query.Iter()
×
363
        if iter == nil {
×
364
                return nil, nil, &types.InternalServiceError{
×
365
                        Message: "SelectAllWorkflowExecutions operation failed.  Not able to create query iterator.",
×
366
                }
×
367
        }
×
368

369
        result := make(map[string]interface{})
×
370
        var executions []*p.InternalListConcreteExecutionsEntity
×
371
        for iter.MapScan(result) {
×
372
                runID := result["run_id"].(gocql.UUID).String()
×
373
                if runID == permanentRunID {
×
374
                        result = make(map[string]interface{})
×
375
                        continue
×
376
                }
377
                executions = append(executions, &p.InternalListConcreteExecutionsEntity{
×
378
                        ExecutionInfo:    parseWorkflowExecutionInfo(result["execution"].(map[string]interface{})),
×
379
                        VersionHistories: p.NewDataBlob(result["version_histories"].([]byte), common.EncodingType(result["version_histories_encoding"].(string))),
×
380
                })
×
381
                result = make(map[string]interface{})
×
382
        }
383
        nextPageToken := getNextPageToken(iter)
×
384

×
385
        if err := iter.Close(); err != nil {
×
386
                return nil, nil, err
×
387
        }
×
388
        return executions, nextPageToken, nil
×
389
}
390

391
func (db *cdb) IsWorkflowExecutionExists(ctx context.Context, shardID int, domainID, workflowID, runID string) (bool, error) {
×
392
        query := db.session.Query(templateIsWorkflowExecutionExistsQuery,
×
393
                shardID,
×
394
                rowTypeExecution,
×
395
                domainID,
×
396
                workflowID,
×
397
                runID,
×
398
                defaultVisibilityTimestamp,
×
399
                rowTypeExecutionTaskID,
×
400
        ).WithContext(ctx)
×
401

×
402
        result := make(map[string]interface{})
×
403
        if err := query.MapScan(result); err != nil {
×
404
                if db.client.IsNotFoundError(err) {
×
405
                        return false, nil
×
406
                }
×
407

408
                return false, err
×
409
        }
410
        return true, nil
×
411
}
412

413
func (db *cdb) SelectTransferTasksOrderByTaskID(ctx context.Context, shardID, pageSize int, pageToken []byte, exclusiveMinTaskID, inclusiveMaxTaskID int64) ([]*nosqlplugin.TransferTask, []byte, error) {
769✔
414
        // Reading transfer tasks need to be quorum level consistent, otherwise we could loose task
769✔
415
        query := db.session.Query(templateGetTransferTasksQuery,
769✔
416
                shardID,
769✔
417
                rowTypeTransferTask,
769✔
418
                rowTypeTransferDomainID,
769✔
419
                rowTypeTransferWorkflowID,
769✔
420
                rowTypeTransferRunID,
769✔
421
                defaultVisibilityTimestamp,
769✔
422
                exclusiveMinTaskID,
769✔
423
                inclusiveMaxTaskID,
769✔
424
        ).PageSize(pageSize).PageState(pageToken).WithContext(ctx)
769✔
425

769✔
426
        iter := query.Iter()
769✔
427
        if iter == nil {
769✔
428
                return nil, nil, &types.InternalServiceError{
×
429
                        Message: "SelectTransferTasksOrderByTaskID operation failed.  Not able to create query iterator.",
×
430
                }
×
431
        }
×
432

433
        var tasks []*nosqlplugin.TransferTask
769✔
434
        task := make(map[string]interface{})
769✔
435
        for iter.MapScan(task) {
2,608✔
436
                t := parseTransferTaskInfo(task["transfer"].(map[string]interface{}))
1,839✔
437
                // Reset task map to get it ready for next scan
1,839✔
438
                task = make(map[string]interface{})
1,839✔
439

1,839✔
440
                tasks = append(tasks, t)
1,839✔
441
        }
1,839✔
442
        nextPageToken := getNextPageToken(iter)
769✔
443

769✔
444
        err := iter.Close()
769✔
445
        return tasks, nextPageToken, err
769✔
446
}
447

448
func (db *cdb) DeleteTransferTask(ctx context.Context, shardID int, taskID int64) error {
×
449
        query := db.session.Query(templateCompleteTransferTaskQuery,
×
450
                shardID,
×
451
                rowTypeTransferTask,
×
452
                rowTypeTransferDomainID,
×
453
                rowTypeTransferWorkflowID,
×
454
                rowTypeTransferRunID,
×
455
                defaultVisibilityTimestamp,
×
456
                taskID,
×
457
        ).WithContext(ctx)
×
458

×
459
        return db.executeWithConsistencyAll(query)
×
460
}
×
461

462
func (db *cdb) RangeDeleteTransferTasks(ctx context.Context, shardID int, exclusiveBeginTaskID, inclusiveEndTaskID int64) error {
37✔
463
        query := db.session.Query(templateRangeCompleteTransferTaskQuery,
37✔
464
                shardID,
37✔
465
                rowTypeTransferTask,
37✔
466
                rowTypeTransferDomainID,
37✔
467
                rowTypeTransferWorkflowID,
37✔
468
                rowTypeTransferRunID,
37✔
469
                defaultVisibilityTimestamp,
37✔
470
                exclusiveBeginTaskID,
37✔
471
                inclusiveEndTaskID,
37✔
472
        ).WithContext(ctx)
37✔
473

37✔
474
        return db.executeWithConsistencyAll(query)
37✔
475
}
37✔
476

477
func (db *cdb) SelectTimerTasksOrderByVisibilityTime(ctx context.Context, shardID, pageSize int, pageToken []byte, inclusiveMinTime, exclusiveMaxTime time.Time) ([]*nosqlplugin.TimerTask, []byte, error) {
1,081✔
478
        // Reading timer tasks need to be quorum level consistent, otherwise we could loose task
1,081✔
479
        minTimestamp := p.UnixNanoToDBTimestamp(inclusiveMinTime.UnixNano())
1,081✔
480
        maxTimestamp := p.UnixNanoToDBTimestamp(exclusiveMaxTime.UnixNano())
1,081✔
481
        query := db.session.Query(templateGetTimerTasksQuery,
1,081✔
482
                shardID,
1,081✔
483
                rowTypeTimerTask,
1,081✔
484
                rowTypeTimerDomainID,
1,081✔
485
                rowTypeTimerWorkflowID,
1,081✔
486
                rowTypeTimerRunID,
1,081✔
487
                minTimestamp,
1,081✔
488
                maxTimestamp,
1,081✔
489
        ).PageSize(pageSize).PageState(pageToken).WithContext(ctx)
1,081✔
490

1,081✔
491
        iter := query.Iter()
1,081✔
492
        if iter == nil {
1,081✔
493
                return nil, nil, &types.InternalServiceError{
×
494
                        Message: "SelectTimerTasksOrderByVisibilityTime operation failed.  Not able to create query iterator.",
×
495
                }
×
496
        }
×
497

498
        var timers []*nosqlplugin.TimerTask
1,081✔
499
        task := make(map[string]interface{})
1,081✔
500
        for iter.MapScan(task) {
3,760✔
501
                t := parseTimerTaskInfo(task["timer"].(map[string]interface{}))
2,679✔
502
                // Reset task map to get it ready for next scan
2,679✔
503
                task = make(map[string]interface{})
2,679✔
504

2,679✔
505
                timers = append(timers, t)
2,679✔
506
        }
2,679✔
507
        nextPageToken := getNextPageToken(iter)
1,081✔
508

1,081✔
509
        err := iter.Close()
1,081✔
510
        return timers, nextPageToken, err
1,081✔
511
}
512

513
func (db *cdb) DeleteTimerTask(ctx context.Context, shardID int, taskID int64, visibilityTimestamp time.Time) error {
×
514
        ts := p.UnixNanoToDBTimestamp(visibilityTimestamp.UnixNano())
×
515
        query := db.session.Query(templateCompleteTimerTaskQuery,
×
516
                shardID,
×
517
                rowTypeTimerTask,
×
518
                rowTypeTimerDomainID,
×
519
                rowTypeTimerWorkflowID,
×
520
                rowTypeTimerRunID,
×
521
                ts,
×
522
                taskID,
×
523
        ).WithContext(ctx)
×
524

×
525
        return db.executeWithConsistencyAll(query)
×
526
}
×
527

528
func (db *cdb) RangeDeleteTimerTasks(ctx context.Context, shardID int, inclusiveMinTime, exclusiveMaxTime time.Time) error {
12✔
529
        start := p.UnixNanoToDBTimestamp(inclusiveMinTime.UnixNano())
12✔
530
        end := p.UnixNanoToDBTimestamp(exclusiveMaxTime.UnixNano())
12✔
531
        query := db.session.Query(templateRangeCompleteTimerTaskQuery,
12✔
532
                shardID,
12✔
533
                rowTypeTimerTask,
12✔
534
                rowTypeTimerDomainID,
12✔
535
                rowTypeTimerWorkflowID,
12✔
536
                rowTypeTimerRunID,
12✔
537
                start,
12✔
538
                end,
12✔
539
        ).WithContext(ctx)
12✔
540

12✔
541
        return db.executeWithConsistencyAll(query)
12✔
542
}
12✔
543

544
func (db *cdb) SelectReplicationTasksOrderByTaskID(ctx context.Context, shardID, pageSize int, pageToken []byte, exclusiveMinTaskID, inclusiveMaxTaskID int64) ([]*nosqlplugin.ReplicationTask, []byte, error) {
40✔
545
        // Reading replication tasks need to be quorum level consistent, otherwise we could loose task
40✔
546
        query := db.session.Query(templateGetReplicationTasksQuery,
40✔
547
                shardID,
40✔
548
                rowTypeReplicationTask,
40✔
549
                rowTypeReplicationDomainID,
40✔
550
                rowTypeReplicationWorkflowID,
40✔
551
                rowTypeReplicationRunID,
40✔
552
                defaultVisibilityTimestamp,
40✔
553
                exclusiveMinTaskID,
40✔
554
                inclusiveMaxTaskID,
40✔
555
        ).PageSize(pageSize).PageState(pageToken).WithContext(ctx)
40✔
556
        return populateGetReplicationTasks(query)
40✔
557
}
40✔
558

559
func (db *cdb) DeleteReplicationTask(ctx context.Context, shardID int, taskID int64) error {
×
560
        query := db.session.Query(templateCompleteReplicationTaskQuery,
×
561
                shardID,
×
562
                rowTypeReplicationTask,
×
563
                rowTypeReplicationDomainID,
×
564
                rowTypeReplicationWorkflowID,
×
565
                rowTypeReplicationRunID,
×
566
                defaultVisibilityTimestamp,
×
567
                taskID,
×
568
        ).WithContext(ctx)
×
569

×
570
        return db.executeWithConsistencyAll(query)
×
571
}
×
572

573
func (db *cdb) RangeDeleteReplicationTasks(ctx context.Context, shardID int, inclusiveEndTaskID int64) error {
39✔
574
        query := db.session.Query(templateCompleteReplicationTaskBeforeQuery,
39✔
575
                shardID,
39✔
576
                rowTypeReplicationTask,
39✔
577
                rowTypeReplicationDomainID,
39✔
578
                rowTypeReplicationWorkflowID,
39✔
579
                rowTypeReplicationRunID,
39✔
580
                defaultVisibilityTimestamp,
39✔
581
                inclusiveEndTaskID,
39✔
582
        ).WithContext(ctx)
39✔
583

39✔
584
        return db.executeWithConsistencyAll(query)
39✔
585
}
39✔
586

587
func (db *cdb) SelectCrossClusterTasksOrderByTaskID(ctx context.Context, shardID, pageSize int, pageToken []byte, targetCluster string, exclusiveMinTaskID, inclusiveMaxTaskID int64) ([]*nosqlplugin.CrossClusterTask, []byte, error) {
58✔
588
        // Reading cross-cluster tasks need to be quorum level consistent, otherwise we could loose task
58✔
589
        query := db.session.Query(templateGetCrossClusterTasksQuery,
58✔
590
                shardID,
58✔
591
                rowTypeCrossClusterTask,
58✔
592
                rowTypeCrossClusterDomainID,
58✔
593
                targetCluster, // workflowID field is used to store target cluster
58✔
594
                rowTypeCrossClusterRunID,
58✔
595
                defaultVisibilityTimestamp,
58✔
596
                exclusiveMinTaskID,
58✔
597
                inclusiveMaxTaskID,
58✔
598
        ).PageSize(pageSize).PageState(pageToken).WithContext(ctx)
58✔
599

58✔
600
        iter := query.Iter()
58✔
601
        if iter == nil {
58✔
602
                return nil, nil, &types.InternalServiceError{
×
603
                        Message: "SelectCrossClusterTasksOrderByTaskID operation failed.  Not able to create query iterator.",
×
604
                }
×
605
        }
×
606

607
        var tasks []*nosqlplugin.CrossClusterTask
58✔
608
        task := make(map[string]interface{})
58✔
609
        for iter.MapScan(task) {
58✔
610
                t := parseCrossClusterTaskInfo(task["cross_cluster"].(map[string]interface{}))
×
611
                // Reset task map to get it ready for next scan
×
612
                task = make(map[string]interface{})
×
613

×
614
                tasks = append(tasks, &nosqlplugin.CrossClusterTask{
×
615
                        TransferTask:  *t,
×
616
                        TargetCluster: targetCluster,
×
617
                })
×
618
        }
×
619
        nextPageToken := getNextPageToken(iter)
58✔
620
        err := iter.Close()
58✔
621
        return tasks, nextPageToken, err
58✔
622
}
623

624
func (db *cdb) DeleteCrossClusterTask(ctx context.Context, shardID int, targetCluster string, taskID int64) error {
×
625
        query := db.session.Query(templateCompleteCrossClusterTaskQuery,
×
626
                shardID,
×
627
                rowTypeCrossClusterTask,
×
628
                rowTypeCrossClusterDomainID,
×
629
                targetCluster,
×
630
                rowTypeCrossClusterRunID,
×
631
                defaultVisibilityTimestamp,
×
632
                taskID,
×
633
        ).WithContext(ctx)
×
634

×
635
        return db.executeWithConsistencyAll(query)
×
636
}
×
637

638
func (db *cdb) RangeDeleteCrossClusterTasks(ctx context.Context, shardID int, targetCluster string, exclusiveBeginTaskID, inclusiveEndTaskID int64) error {
47✔
639
        query := db.session.Query(templateRangeCompleteCrossClusterTaskQuery,
47✔
640
                shardID,
47✔
641
                rowTypeCrossClusterTask,
47✔
642
                rowTypeCrossClusterDomainID,
47✔
643
                targetCluster,
47✔
644
                rowTypeCrossClusterRunID,
47✔
645
                defaultVisibilityTimestamp,
47✔
646
                exclusiveBeginTaskID,
47✔
647
                inclusiveEndTaskID,
47✔
648
        ).WithContext(ctx)
47✔
649

47✔
650
        return query.Exec()
47✔
651
}
47✔
652

653
func (db *cdb) InsertReplicationDLQTask(ctx context.Context, shardID int, sourceCluster string, task nosqlplugin.ReplicationTask) error {
1✔
654
        // Use source cluster name as the workflow id for replication dlq
1✔
655
        query := db.session.Query(templateCreateReplicationTaskQuery,
1✔
656
                shardID,
1✔
657
                rowTypeDLQ,
1✔
658
                rowTypeDLQDomainID,
1✔
659
                sourceCluster,
1✔
660
                rowTypeDLQRunID,
1✔
661
                task.DomainID,
1✔
662
                task.WorkflowID,
1✔
663
                task.RunID,
1✔
664
                task.TaskID,
1✔
665
                task.TaskType,
1✔
666
                task.FirstEventID,
1✔
667
                task.NextEventID,
1✔
668
                task.Version,
1✔
669
                task.ScheduledID,
1✔
670
                p.EventStoreVersion,
1✔
671
                task.BranchToken,
1✔
672
                p.EventStoreVersion,
1✔
673
                task.NewRunBranchToken,
1✔
674
                defaultVisibilityTimestamp,
1✔
675
                defaultVisibilityTimestamp,
1✔
676
                task.TaskID,
1✔
677
        ).WithContext(ctx)
1✔
678

1✔
679
        return query.Exec()
1✔
680
}
1✔
681

682
func (db *cdb) SelectReplicationDLQTasksOrderByTaskID(ctx context.Context, shardID int, sourceCluster string, pageSize int, pageToken []byte, exclusiveMinTaskID, inclusiveMaxTaskID int64) ([]*nosqlplugin.ReplicationTask, []byte, error) {
1✔
683
        // Reading replication tasks need to be quorum level consistent, otherwise we could loose task
1✔
684
        query := db.session.Query(templateGetReplicationTasksQuery,
1✔
685
                shardID,
1✔
686
                rowTypeDLQ,
1✔
687
                rowTypeDLQDomainID,
1✔
688
                sourceCluster,
1✔
689
                rowTypeDLQRunID,
1✔
690
                defaultVisibilityTimestamp,
1✔
691
                exclusiveMinTaskID,
1✔
692
                inclusiveMaxTaskID,
1✔
693
        ).PageSize(pageSize).PageState(pageToken).WithContext(ctx)
1✔
694

1✔
695
        return populateGetReplicationTasks(query)
1✔
696
}
1✔
697

698
func (db *cdb) SelectReplicationDLQTasksCount(ctx context.Context, shardID int, sourceCluster string) (int64, error) {
4✔
699
        // Reading replication tasks need to be quorum level consistent, otherwise we could loose task
4✔
700
        query := db.session.Query(templateGetDLQSizeQuery,
4✔
701
                shardID,
4✔
702
                rowTypeDLQ,
4✔
703
                rowTypeDLQDomainID,
4✔
704
                sourceCluster,
4✔
705
                rowTypeDLQRunID,
4✔
706
        ).WithContext(ctx)
4✔
707

4✔
708
        result := make(map[string]interface{})
4✔
709
        if err := query.MapScan(result); err != nil {
4✔
710
                return -1, err
×
711
        }
×
712

713
        queueSize := result["count"].(int64)
4✔
714
        return queueSize, nil
4✔
715
}
716

717
func (db *cdb) DeleteReplicationDLQTask(ctx context.Context, shardID int, sourceCluster string, taskID int64) error {
×
718
        query := db.session.Query(templateCompleteReplicationTaskQuery,
×
719
                shardID,
×
720
                rowTypeDLQ,
×
721
                rowTypeDLQDomainID,
×
722
                sourceCluster,
×
723
                rowTypeDLQRunID,
×
724
                defaultVisibilityTimestamp,
×
725
                taskID,
×
726
        ).WithContext(ctx)
×
727

×
728
        return db.executeWithConsistencyAll(query)
×
729
}
×
730

731
func (db *cdb) RangeDeleteReplicationDLQTasks(ctx context.Context, shardID int, sourceCluster string, exclusiveBeginTaskID, inclusiveEndTaskID int64) error {
×
732
        query := db.session.Query(templateRangeCompleteReplicationTaskQuery,
×
733
                shardID,
×
734
                rowTypeDLQ,
×
735
                rowTypeDLQDomainID,
×
736
                sourceCluster,
×
737
                rowTypeDLQRunID,
×
738
                defaultVisibilityTimestamp,
×
739
                exclusiveBeginTaskID,
×
740
                inclusiveEndTaskID,
×
741
        ).WithContext(ctx)
×
742

×
743
        return db.executeWithConsistencyAll(query)
×
744
}
×
745

746
func (db *cdb) InsertReplicationTask(ctx context.Context, tasks []*nosqlplugin.ReplicationTask, shardCondition nosqlplugin.ShardCondition) error {
×
747
        if len(tasks) == 0 {
×
748
                return nil
×
749
        }
×
750

751
        shardID := shardCondition.ShardID
×
752
        batch := db.session.NewBatch(gocql.LoggedBatch).WithContext(ctx)
×
753
        for _, task := range tasks {
×
754
                err := db.createReplicationTasks(batch, shardID, task.DomainID, task.WorkflowID, []*nosqlplugin.ReplicationTask{task})
×
755
                if err != nil {
×
756
                        return err
×
757
                }
×
758
        }
759

760
        err := db.assertShardRangeID(batch, shardID, shardCondition.RangeID)
×
761
        if err != nil {
×
762
                return err
×
763
        }
×
764

765
        previous := make(map[string]interface{})
×
766
        applied, iter, err := db.session.MapExecuteBatchCAS(batch, previous)
×
767
        defer func() {
×
768
                if iter != nil {
×
769
                        _ = iter.Close()
×
770
                }
×
771
        }()
772
        if err != nil {
×
773
                return err
×
774
        }
×
775

776
        if !applied {
×
777
                rowType, ok := previous["type"].(int)
×
778
                if !ok {
×
779
                        // This should never happen, as all our rows have the type field.
×
780
                        panic("Encounter row type not found")
×
781
                }
782
                if rowType == rowTypeShard {
×
783
                        if actualRangeID, ok := previous["range_id"].(int64); ok && actualRangeID != shardCondition.RangeID {
×
784
                                // CreateWorkflowExecution failed because rangeID was modified
×
785
                                return &nosqlplugin.ShardOperationConditionFailure{
×
786
                                        RangeID: actualRangeID,
×
787
                                }
×
788
                        }
×
789
                }
790

791
                // At this point we only know that the write was not applied.
792
                // It's much safer to return ShardOperationConditionFailure(which will become ShardOwnershipLostError later) as the default to force the application to reload
793
                // shard to recover from such errors
794
                var columns []string
×
795
                for k, v := range previous {
×
796
                        columns = append(columns, fmt.Sprintf("%s=%v", k, v))
×
797
                }
×
798
                return &nosqlplugin.ShardOperationConditionFailure{
×
799
                        RangeID: -1,
×
800
                        Details: strings.Join(columns, ","),
×
801
                }
×
802
        }
803
        return nil
×
804
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc