• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

uber / cadence / 01875e2f-959c-4c4d-87af-1d7805759bcc

08 Apr 2023 12:26AM UTC coverage: 57.178% (+0.1%) from 57.072%
01875e2f-959c-4c4d-87af-1d7805759bcc

Pull #5197

buildkite

Steven L
bad cleanup -> good cleanup
Pull Request #5197: Demonstrate a way to get rid of the cadence-idl repo

85396 of 149351 relevant lines covered (57.18%)

2283.28 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

78.16
/common/persistence/nosql/nosqlplugin/cassandra/workflowUtils.go
1
// Copyright (c) 2021 Uber Technologies, Inc.
2
// Portions of the Software are attributed to Copyright (c) 2020 Temporal Technologies Inc.
3
//
4
// Permission is hereby granted, free of charge, to any person obtaining a copy
5
// of this software and associated documentation files (the "Software"), to deal
6
// in the Software without restriction, including without limitation the rights
7
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
8
// copies of the Software, and to permit persons to whom the Software is
9
// furnished to do so, subject to the following conditions:
10
//
11
// The above copyright notice and this permission notice shall be included in
12
// all copies or substantial portions of the Software.
13
//
14
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
15
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
16
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
17
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
18
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
19
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
20
// THE SOFTWARE.
21

22
package cassandra
23

24
import (
25
        "fmt"
26
        "reflect"
27
        "strings"
28
        "time"
29

30
        "github.com/uber/cadence/common"
31
        "github.com/uber/cadence/common/persistence"
32
        "github.com/uber/cadence/common/persistence/nosql/nosqlplugin"
33
        "github.com/uber/cadence/common/persistence/nosql/nosqlplugin/cassandra/gocql"
34
        "github.com/uber/cadence/common/types"
35
)
36

37
func (db *cdb) executeCreateWorkflowBatchTransaction(
38
        batch gocql.Batch,
39
        currentWorkflowRequest *nosqlplugin.CurrentWorkflowWriteRequest,
40
        execution *nosqlplugin.WorkflowExecutionRequest,
41
        shardCondition *nosqlplugin.ShardCondition,
42
) error {
168✔
43
        previous := make(map[string]interface{})
168✔
44
        applied, iter, err := db.session.MapExecuteBatchCAS(batch, previous)
168✔
45
        defer func() {
336✔
46
                if iter != nil {
336✔
47
                        _ = iter.Close()
168✔
48
                }
168✔
49
        }()
50

51
        if err != nil {
168✔
52
                return err
×
53
        }
×
54

55
        if !applied {
183✔
56
                requestConditionalRunID := ""
15✔
57
                if currentWorkflowRequest.Condition != nil {
15✔
58
                        requestConditionalRunID = currentWorkflowRequest.Condition.GetCurrentRunID()
×
59
                }
×
60
                // There can be two reasons why the query does not get applied. Either the RangeID has changed, or
61
                // the workflow is already started. Check the row info returned by Cassandra to figure out which one it is.
62
        GetFailureReasonLoop:
15✔
63
                for {
44✔
64
                        rowType, ok := previous["type"].(int)
29✔
65
                        if !ok {
29✔
66
                                // This should never happen, as all our rows have the type field.
×
67
                                break GetFailureReasonLoop
×
68
                        }
69
                        runID := previous["run_id"].(gocql.UUID).String()
29✔
70

29✔
71
                        if rowType == rowTypeShard {
44✔
72
                                if rangeID, ok := previous["range_id"].(int64); ok && rangeID != shardCondition.RangeID {
15✔
73
                                        // CreateWorkflowExecution failed because rangeID was modified
×
74
                                        return &nosqlplugin.WorkflowOperationConditionFailure{
×
75
                                                ShardRangeIDNotMatch: common.Int64Ptr(rangeID),
×
76
                                        }
×
77
                                }
×
78

79
                        } else if rowType == rowTypeExecution && runID == permanentRunID {
29✔
80
                                var columns []string
14✔
81
                                for k, v := range previous {
448✔
82
                                        columns = append(columns, fmt.Sprintf("%s=%v", k, v))
434✔
83
                                }
434✔
84

85
                                if execution, ok := previous["execution"].(map[string]interface{}); ok {
28✔
86
                                        // CreateWorkflowExecution failed because it already exists
14✔
87
                                        executionInfo := parseWorkflowExecutionInfo(execution)
14✔
88
                                        lastWriteVersion := common.EmptyVersion
14✔
89
                                        if previous["workflow_last_write_version"] != nil {
28✔
90
                                                lastWriteVersion = previous["workflow_last_write_version"].(int64)
14✔
91
                                        }
14✔
92

93
                                        msg := fmt.Sprintf("Workflow execution already running. WorkflowId: %v, RunId: %v, rangeID: %v, columns: (%v)",
14✔
94
                                                executionInfo.WorkflowID, executionInfo.RunID, shardCondition.RangeID, strings.Join(columns, ","))
14✔
95

14✔
96
                                        if currentWorkflowRequest.WriteMode == nosqlplugin.CurrentWorkflowWriteModeInsert {
28✔
97
                                                return &nosqlplugin.WorkflowOperationConditionFailure{
14✔
98
                                                        WorkflowExecutionAlreadyExists: &nosqlplugin.WorkflowExecutionAlreadyExists{
14✔
99
                                                                OtherInfo:        msg,
14✔
100
                                                                CreateRequestID:  executionInfo.CreateRequestID,
14✔
101
                                                                RunID:            executionInfo.RunID,
14✔
102
                                                                State:            executionInfo.State,
14✔
103
                                                                CloseStatus:      executionInfo.CloseStatus,
14✔
104
                                                                LastWriteVersion: lastWriteVersion,
14✔
105
                                                        },
14✔
106
                                                }
14✔
107
                                        }
14✔
108
                                        return &nosqlplugin.WorkflowOperationConditionFailure{
×
109
                                                CurrentWorkflowConditionFailInfo: &msg,
×
110
                                        }
×
111

112
                                }
113

114
                                if prevRunID := previous["current_run_id"].(gocql.UUID).String(); requestConditionalRunID != "" && prevRunID != requestConditionalRunID {
×
115
                                        // currentRunID on previous run has been changed, return to caller to handle
×
116
                                        msg := fmt.Sprintf("Workflow execution creation condition failed by mismatch runID. WorkflowId: %v, Expected Current RunID: %v, Actual Current RunID: %v",
×
117
                                                execution.WorkflowID, currentWorkflowRequest.Condition.GetCurrentRunID(), prevRunID)
×
118
                                        return &nosqlplugin.WorkflowOperationConditionFailure{
×
119
                                                CurrentWorkflowConditionFailInfo: &msg,
×
120
                                        }
×
121
                                }
×
122

123
                                msg := fmt.Sprintf("Workflow execution creation condition failed. WorkflowId: %v, CurrentRunID: %v, columns: (%v)",
×
124
                                        execution.WorkflowID, execution.RunID, strings.Join(columns, ","))
×
125
                                return &nosqlplugin.WorkflowOperationConditionFailure{
×
126
                                        CurrentWorkflowConditionFailInfo: &msg,
×
127
                                }
×
128
                        } else if rowType == rowTypeExecution && execution.RunID == runID {
2✔
129
                                msg := fmt.Sprintf("Workflow execution already running. WorkflowId: %v, RunId: %v, rangeID: %v",
1✔
130
                                        execution.WorkflowID, execution.RunID, shardCondition.RangeID)
1✔
131
                                lastWriteVersion := common.EmptyVersion
1✔
132
                                if previous["workflow_last_write_version"] != nil {
2✔
133
                                        lastWriteVersion = previous["workflow_last_write_version"].(int64)
1✔
134
                                }
1✔
135
                                return &nosqlplugin.WorkflowOperationConditionFailure{
1✔
136
                                        WorkflowExecutionAlreadyExists: &nosqlplugin.WorkflowExecutionAlreadyExists{
1✔
137
                                                OtherInfo:        msg,
1✔
138
                                                CreateRequestID:  execution.CreateRequestID,
1✔
139
                                                RunID:            execution.RunID,
1✔
140
                                                State:            execution.State,
1✔
141
                                                CloseStatus:      execution.CloseStatus,
1✔
142
                                                LastWriteVersion: lastWriteVersion,
1✔
143
                                        },
1✔
144
                                }
1✔
145
                        }
146

147
                        previous = make(map[string]interface{})
15✔
148
                        if !iter.MapScan(previous) {
15✔
149
                                // Cassandra returns the actual row that caused a condition failure, so we should always return
×
150
                                // from the checks above, but just in case.
×
151
                                break GetFailureReasonLoop
×
152
                        }
153
                }
154

155
                return newUnknownConditionFailureReason(shardCondition.RangeID, previous)
×
156
        }
157

158
        return nil
154✔
159
}
160

161
func (db *cdb) executeUpdateWorkflowBatchTransaction(
162
        batch gocql.Batch,
163
        currentWorkflowRequest *nosqlplugin.CurrentWorkflowWriteRequest,
164
        PreviousNextEventIDCondition int64,
165
        shardCondition *nosqlplugin.ShardCondition,
166
) error {
1,468✔
167
        previous := make(map[string]interface{})
1,468✔
168
        applied, iter, err := db.session.MapExecuteBatchCAS(batch, previous)
1,468✔
169
        defer func() {
2,936✔
170
                if iter != nil {
2,936✔
171
                        _ = iter.Close()
1,468✔
172
                }
1,468✔
173
        }()
174

175
        if err != nil {
1,468✔
176
                return err
×
177
        }
×
178

179
        if !applied {
1,468✔
180
                requestRunID := currentWorkflowRequest.Row.RunID
×
181
                requestCondition := PreviousNextEventIDCondition
×
182
                requestRangeID := shardCondition.RangeID
×
183
                requestConditionalRunID := ""
×
184
                if currentWorkflowRequest.Condition != nil {
×
185
                        requestConditionalRunID = currentWorkflowRequest.Condition.GetCurrentRunID()
×
186
                }
×
187

188
                // There can be three reasons why the query does not get applied: the RangeID has changed, or the next_event_id or current_run_id check failed.
189
                // Check the row info returned by Cassandra to figure out which one it is.
190
                rangeIDUnmatch := false
×
191
                actualRangeID := int64(0)
×
192
                nextEventIDUnmatch := false
×
193
                actualNextEventID := int64(0)
×
194
                runIDUnmatch := false
×
195
                actualCurrRunID := ""
×
196
                var allPrevious []map[string]interface{}
×
197

×
198
        GetFailureReasonLoop:
×
199
                for {
×
200
                        rowType, ok := previous["type"].(int)
×
201
                        if !ok {
×
202
                                // This should never happen, as all our rows have the type field.
×
203
                                break GetFailureReasonLoop
×
204
                        }
205

206
                        runID := previous["run_id"].(gocql.UUID).String()
×
207

×
208
                        if rowType == rowTypeShard {
×
209
                                if actualRangeID, ok = previous["range_id"].(int64); ok && actualRangeID != requestRangeID {
×
210
                                        // UpdateWorkflowExecution failed because rangeID was modified
×
211
                                        rangeIDUnmatch = true
×
212
                                }
×
213
                        } else if rowType == rowTypeExecution && runID == requestRunID {
×
214
                                if actualNextEventID, ok = previous["next_event_id"].(int64); ok && actualNextEventID != requestCondition {
×
215
                                        // UpdateWorkflowExecution failed because next event ID is unexpected
×
216
                                        nextEventIDUnmatch = true
×
217
                                }
×
218
                        } else if rowType == rowTypeExecution && runID == permanentRunID {
×
219
                                // UpdateWorkflowExecution failed because current_run_id is unexpected
×
220
                                if actualCurrRunID = previous["current_run_id"].(gocql.UUID).String(); requestConditionalRunID != "" && actualCurrRunID != requestConditionalRunID {
×
221
                                        // UpdateWorkflowExecution failed because next event ID is unexpected
×
222
                                        runIDUnmatch = true
×
223
                                }
×
224
                        }
225

226
                        allPrevious = append(allPrevious, previous)
×
227
                        previous = make(map[string]interface{})
×
228
                        if !iter.MapScan(previous) {
×
229
                                // Cassandra returns the actual row that caused a condition failure, so we should always return
×
230
                                // from the checks above, but just in case.
×
231
                                break GetFailureReasonLoop
×
232
                        }
233
                }
234

235
                if rangeIDUnmatch {
×
236
                        return &nosqlplugin.WorkflowOperationConditionFailure{
×
237
                                ShardRangeIDNotMatch: common.Int64Ptr(actualRangeID),
×
238
                        }
×
239
                }
×
240

241
                if runIDUnmatch {
×
242
                        msg := fmt.Sprintf("Failed to update mutable state.  Request Condition: %v, Actual Value: %v, Request Current RunID: %v, Actual Value: %v",
×
243
                                requestCondition, actualNextEventID, requestConditionalRunID, actualCurrRunID)
×
244
                        return &nosqlplugin.WorkflowOperationConditionFailure{
×
245
                                CurrentWorkflowConditionFailInfo: &msg,
×
246
                        }
×
247
                }
×
248

249
                if nextEventIDUnmatch {
×
250
                        msg := fmt.Sprintf("Failed to update mutable state.  Request Condition: %v, Actual Value: %v, Request Current RunID: %v, Actual Value: %v",
×
251
                                requestCondition, actualNextEventID, requestConditionalRunID, actualCurrRunID)
×
252
                        return &nosqlplugin.WorkflowOperationConditionFailure{
×
253
                                UnknownConditionFailureDetails: &msg,
×
254
                        }
×
255
                }
×
256

257
                // At this point we only know that the write was not applied.
258
                var columns []string
×
259
                columnID := 0
×
260
                for _, previous := range allPrevious {
×
261
                        for k, v := range previous {
×
262
                                columns = append(columns, fmt.Sprintf("%v: %s=%v", columnID, k, v))
×
263
                        }
×
264
                        columnID++
×
265
                }
266
                msg := fmt.Sprintf("Failed to update mutable state. ShardID: %v, RangeID: %v, Condition: %v, Request Current RunID: %v, columns: (%v)",
×
267
                        shardCondition.ShardID, requestRangeID, requestCondition, requestConditionalRunID, strings.Join(columns, ","))
×
268
                return &nosqlplugin.WorkflowOperationConditionFailure{
×
269
                        UnknownConditionFailureDetails: &msg,
×
270
                }
×
271
        }
272

273
        return nil
1,468✔
274
}
275

276
func newUnknownConditionFailureReason(
277
        rangeID int64,
278
        row map[string]interface{},
279
) *nosqlplugin.WorkflowOperationConditionFailure {
×
280
        // At this point we only know that the write was not applied.
×
281
        // It's much safer to return ShardOwnershipLostError as the default to force the application to reload
×
282
        // shard to recover from such errors
×
283
        var columns []string
×
284
        for k, v := range row {
×
285
                columns = append(columns, fmt.Sprintf("%s=%v", k, v))
×
286
        }
×
287

288
        msg := fmt.Sprintf("Failed to operate on workflow execution.  Request RangeID: %v, columns: (%v)",
×
289
                rangeID, strings.Join(columns, ","))
×
290

×
291
        return &nosqlplugin.WorkflowOperationConditionFailure{
×
292
                UnknownConditionFailureDetails: &msg,
×
293
        }
×
294
}
295

296
func (db *cdb) assertShardRangeID(batch gocql.Batch, shardID int, rangeID int64) error {
1,635✔
297
        batch.Query(templateUpdateLeaseQuery,
1,635✔
298
                rangeID,
1,635✔
299
                shardID,
1,635✔
300
                rowTypeShard,
1,635✔
301
                rowTypeShardDomainID,
1,635✔
302
                rowTypeShardWorkflowID,
1,635✔
303
                rowTypeShardRunID,
1,635✔
304
                defaultVisibilityTimestamp,
1,635✔
305
                rowTypeShardTaskID,
1,635✔
306
                rangeID,
1,635✔
307
        )
1,635✔
308
        return nil
1,635✔
309
}
1,635✔
310

311
func (db *cdb) createTimerTasks(
312
        batch gocql.Batch,
313
        shardID int,
314
        domainID string,
315
        workflowID string,
316
        timerTasks []*nosqlplugin.TimerTask,
317
) error {
1,635✔
318
        for _, task := range timerTasks {
2,710✔
319
                // Ignoring possible type cast errors.
1,075✔
320
                ts := persistence.UnixNanoToDBTimestamp(task.VisibilityTimestamp.UnixNano())
1,075✔
321

1,075✔
322
                batch.Query(templateCreateTimerTaskQuery,
1,075✔
323
                        shardID,
1,075✔
324
                        rowTypeTimerTask,
1,075✔
325
                        rowTypeTimerDomainID,
1,075✔
326
                        rowTypeTimerWorkflowID,
1,075✔
327
                        rowTypeTimerRunID,
1,075✔
328
                        domainID,
1,075✔
329
                        workflowID,
1,075✔
330
                        task.RunID,
1,075✔
331
                        ts,
1,075✔
332
                        task.TaskID,
1,075✔
333
                        task.TaskType,
1,075✔
334
                        task.TimeoutType,
1,075✔
335
                        task.EventID,
1,075✔
336
                        task.ScheduleAttempt,
1,075✔
337
                        task.Version,
1,075✔
338
                        ts,
1,075✔
339
                        task.TaskID)
1,075✔
340
        }
1,075✔
341
        return nil
1,635✔
342
}
343

344
func (db *cdb) createReplicationTasks(
345
        batch gocql.Batch,
346
        shardID int,
347
        domainID string,
348
        workflowID string,
349
        transferTasks []*nosqlplugin.ReplicationTask,
350
) error {
1,635✔
351
        for _, task := range transferTasks {
1,635✔
352
                batch.Query(templateCreateReplicationTaskQuery,
×
353
                        shardID,
×
354
                        rowTypeReplicationTask,
×
355
                        rowTypeReplicationDomainID,
×
356
                        rowTypeReplicationWorkflowID,
×
357
                        rowTypeReplicationRunID,
×
358
                        domainID,
×
359
                        workflowID,
×
360
                        task.RunID,
×
361
                        task.TaskID,
×
362
                        task.TaskType,
×
363
                        task.FirstEventID,
×
364
                        task.NextEventID,
×
365
                        task.Version,
×
366
                        task.ScheduledID,
×
367
                        persistence.EventStoreVersion,
×
368
                        task.BranchToken,
×
369
                        persistence.EventStoreVersion,
×
370
                        task.NewRunBranchToken,
×
371
                        task.CreationTime.UnixNano(),
×
372
                        // NOTE: use a constant here instead of task.VisibilityTimestamp so that we can query tasks with the same visibilityTimestamp
×
373
                        defaultVisibilityTimestamp,
×
374
                        task.TaskID)
×
375
        }
×
376
        return nil
1,635✔
377
}
378

379
func (db *cdb) createTransferTasks(
380
        batch gocql.Batch,
381
        shardID int,
382
        domainID string,
383
        workflowID string,
384
        transferTasks []*nosqlplugin.TransferTask,
385
) error {
1,635✔
386
        for _, task := range transferTasks {
2,629✔
387
                batch.Query(templateCreateTransferTaskQuery,
994✔
388
                        shardID,
994✔
389
                        rowTypeTransferTask,
994✔
390
                        rowTypeTransferDomainID,
994✔
391
                        rowTypeTransferWorkflowID,
994✔
392
                        rowTypeTransferRunID,
994✔
393
                        domainID,
994✔
394
                        workflowID,
994✔
395
                        task.RunID,
994✔
396
                        task.VisibilityTimestamp,
994✔
397
                        task.TaskID,
994✔
398
                        task.TargetDomainID,
994✔
399
                        task.TargetDomainIDs,
994✔
400
                        task.TargetWorkflowID,
994✔
401
                        task.TargetRunID,
994✔
402
                        task.TargetChildWorkflowOnly,
994✔
403
                        task.TaskList,
994✔
404
                        task.TaskType,
994✔
405
                        task.ScheduleID,
994✔
406
                        task.RecordVisibility,
994✔
407
                        task.Version,
994✔
408
                        // NOTE: use a constant here instead of task.VisibilityTimestamp so that we can query tasks with the same visibilityTimestamp
994✔
409
                        defaultVisibilityTimestamp,
994✔
410
                        task.TaskID)
994✔
411
        }
994✔
412
        return nil
1,635✔
413
}
414

415
func (db *cdb) createCrossClusterTasks(
416
        batch gocql.Batch,
417
        shardID int,
418
        domainID string,
419
        workflowID string,
420
        transferTasks []*nosqlplugin.CrossClusterTask,
421
) error {
1,635✔
422
        for _, task := range transferTasks {
1,635✔
423
                batch.Query(templateCreateCrossClusterTaskQuery,
×
424
                        shardID,
×
425
                        rowTypeCrossClusterTask,
×
426
                        rowTypeCrossClusterDomainID,
×
427
                        task.TargetCluster,
×
428
                        rowTypeCrossClusterRunID,
×
429
                        domainID,
×
430
                        workflowID,
×
431
                        task.RunID,
×
432
                        task.VisibilityTimestamp,
×
433
                        task.TaskID,
×
434
                        task.TargetDomainID,
×
435
                        task.TargetDomainIDs,
×
436
                        task.TargetWorkflowID,
×
437
                        task.TargetRunID,
×
438
                        task.TargetChildWorkflowOnly,
×
439
                        task.TaskList,
×
440
                        task.TaskType,
×
441
                        task.ScheduleID,
×
442
                        task.RecordVisibility,
×
443
                        task.Version,
×
444
                        // NOTE: use a constant here instead of task.VisibilityTimestamp so that we can query tasks with the same visibilityTimestamp
×
445
                        defaultVisibilityTimestamp,
×
446
                        task.TaskID,
×
447
                )
×
448
        }
×
449
        return nil
1,635✔
450
}
451

452
func (db *cdb) resetSignalsRequested(
453
        batch gocql.Batch,
454
        shardID int,
455
        domainID string,
456
        workflowID string,
457
        runID string,
458
        signalReqIDs []string,
459
) error {
1✔
460
        batch.Query(templateResetSignalRequestedQuery,
1✔
461
                signalReqIDs,
1✔
462
                shardID,
1✔
463
                rowTypeExecution,
1✔
464
                domainID,
1✔
465
                workflowID,
1✔
466
                runID,
1✔
467
                defaultVisibilityTimestamp,
1✔
468
                rowTypeExecutionTaskID)
1✔
469
        return nil
1✔
470
}
1✔
471

472
func (db *cdb) updateSignalsRequested(
473
        batch gocql.Batch,
474
        shardID int,
475
        domainID string,
476
        workflowID string,
477
        runID string,
478
        signalReqIDs []string,
479
        deleteSignalReqIDs []string,
480
) error {
1,693✔
481

1,693✔
482
        if len(signalReqIDs) > 0 {
1,714✔
483
                batch.Query(templateUpdateSignalRequestedQuery,
21✔
484
                        signalReqIDs,
21✔
485
                        shardID,
21✔
486
                        rowTypeExecution,
21✔
487
                        domainID,
21✔
488
                        workflowID,
21✔
489
                        runID,
21✔
490
                        defaultVisibilityTimestamp,
21✔
491
                        rowTypeExecutionTaskID)
21✔
492
        }
21✔
493

494
        if len(deleteSignalReqIDs) > 0 {
1,695✔
495
                batch.Query(templateDeleteWorkflowExecutionSignalRequestedQuery,
2✔
496
                        deleteSignalReqIDs,
2✔
497
                        shardID,
2✔
498
                        rowTypeExecution,
2✔
499
                        domainID,
2✔
500
                        workflowID,
2✔
501
                        runID,
2✔
502
                        defaultVisibilityTimestamp,
2✔
503
                        rowTypeExecutionTaskID)
2✔
504
        }
2✔
505
        return nil
1,693✔
506
}
507

508
func (db *cdb) resetSignalInfos(
509
        batch gocql.Batch,
510
        shardID int,
511
        domainID string,
512
        workflowID string,
513
        runID string,
514
        signalInfos map[int64]*persistence.SignalInfo,
515
) error {
1✔
516
        batch.Query(templateResetSignalInfoQuery,
1✔
517
                resetSignalInfoMap(signalInfos),
1✔
518
                shardID,
1✔
519
                rowTypeExecution,
1✔
520
                domainID,
1✔
521
                workflowID,
1✔
522
                runID,
1✔
523
                defaultVisibilityTimestamp,
1✔
524
                rowTypeExecutionTaskID)
1✔
525
        return nil
1✔
526
}
1✔
527

528
func resetSignalInfoMap(
529
        signalInfos map[int64]*persistence.SignalInfo,
530
) map[int64]map[string]interface{} {
1✔
531

1✔
532
        sMap := make(map[int64]map[string]interface{})
1✔
533
        for _, s := range signalInfos {
2✔
534
                sInfo := make(map[string]interface{})
1✔
535
                sInfo["version"] = s.Version
1✔
536
                sInfo["initiated_id"] = s.InitiatedID
1✔
537
                sInfo["initiated_event_batch_id"] = s.InitiatedEventBatchID
1✔
538
                sInfo["signal_request_id"] = s.SignalRequestID
1✔
539
                sInfo["signal_name"] = s.SignalName
1✔
540
                sInfo["input"] = s.Input
1✔
541
                sInfo["control"] = s.Control
1✔
542

1✔
543
                sMap[s.InitiatedID] = sInfo
1✔
544
        }
1✔
545

546
        return sMap
1✔
547
}
548

549
func (db *cdb) updateSignalInfos(
550
        batch gocql.Batch,
551
        shardID int,
552
        domainID string,
553
        workflowID string,
554
        runID string,
555
        signalInfos map[int64]*persistence.SignalInfo,
556
        deleteInfos []int64,
557
) error {
1,693✔
558
        for _, c := range signalInfos {
1,698✔
559
                batch.Query(templateUpdateSignalInfoQuery,
5✔
560
                        c.InitiatedID,
5✔
561
                        c.Version,
5✔
562
                        c.InitiatedID,
5✔
563
                        c.InitiatedEventBatchID,
5✔
564
                        c.SignalRequestID,
5✔
565
                        c.SignalName,
5✔
566
                        c.Input,
5✔
567
                        c.Control,
5✔
568
                        shardID,
5✔
569
                        rowTypeExecution,
5✔
570
                        domainID,
5✔
571
                        workflowID,
5✔
572
                        runID,
5✔
573
                        defaultVisibilityTimestamp,
5✔
574
                        rowTypeExecutionTaskID)
5✔
575
        }
5✔
576

577
        // deleteInfos are the initiatedIDs for SignalInfo being deleted
578
        for _, deleteInfo := range deleteInfos {
1,698✔
579
                batch.Query(templateDeleteSignalInfoQuery,
5✔
580
                        deleteInfo,
5✔
581
                        shardID,
5✔
582
                        rowTypeExecution,
5✔
583
                        domainID,
5✔
584
                        workflowID,
5✔
585
                        runID,
5✔
586
                        defaultVisibilityTimestamp,
5✔
587
                        rowTypeExecutionTaskID)
5✔
588
        }
5✔
589
        return nil
1,693✔
590
}
591

592
func (db *cdb) resetRequestCancelInfos(
593
        batch gocql.Batch,
594
        shardID int,
595
        domainID string,
596
        workflowID string,
597
        runID string,
598
        requestCancelInfos map[int64]*persistence.RequestCancelInfo,
599
) error {
1✔
600
        batch.Query(templateResetRequestCancelInfoQuery,
1✔
601
                resetRequestCancelInfoMap(requestCancelInfos),
1✔
602
                shardID,
1✔
603
                rowTypeExecution,
1✔
604
                domainID,
1✔
605
                workflowID,
1✔
606
                runID,
1✔
607
                defaultVisibilityTimestamp,
1✔
608
                rowTypeExecutionTaskID)
1✔
609
        return nil
1✔
610
}
1✔
611

612
func resetRequestCancelInfoMap(
613
        requestCancelInfos map[int64]*persistence.RequestCancelInfo,
614
) map[int64]map[string]interface{} {
1✔
615

1✔
616
        rcMap := make(map[int64]map[string]interface{})
1✔
617
        for _, rc := range requestCancelInfos {
1✔
618
                rcInfo := make(map[string]interface{})
×
619
                rcInfo["version"] = rc.Version
×
620
                rcInfo["initiated_id"] = rc.InitiatedID
×
621
                rcInfo["initiated_event_batch_id"] = rc.InitiatedEventBatchID
×
622
                rcInfo["cancel_request_id"] = rc.CancelRequestID
×
623

×
624
                rcMap[rc.InitiatedID] = rcInfo
×
625
        }
×
626

627
        return rcMap
1✔
628
}
629

630
func (db *cdb) updateRequestCancelInfos(
631
        batch gocql.Batch,
632
        shardID int,
633
        domainID string,
634
        workflowID string,
635
        runID string,
636
        requestCancelInfos map[int64]*persistence.RequestCancelInfo,
637
        deleteInfos []int64,
638
) error {
1,693✔
639

1,693✔
640
        for _, c := range requestCancelInfos {
1,696✔
641
                batch.Query(templateUpdateRequestCancelInfoQuery,
3✔
642
                        c.InitiatedID,
3✔
643
                        c.Version,
3✔
644
                        c.InitiatedID,
3✔
645
                        c.InitiatedEventBatchID,
3✔
646
                        c.CancelRequestID,
3✔
647
                        shardID,
3✔
648
                        rowTypeExecution,
3✔
649
                        domainID,
3✔
650
                        workflowID,
3✔
651
                        runID,
3✔
652
                        defaultVisibilityTimestamp,
3✔
653
                        rowTypeExecutionTaskID)
3✔
654
        }
3✔
655

656
        // deleteInfos are the initiatedIDs for RequestCancelInfo being deleted
657
        for _, deleteInfo := range deleteInfos {
1,696✔
658
                batch.Query(templateDeleteRequestCancelInfoQuery,
3✔
659
                        deleteInfo,
3✔
660
                        shardID,
3✔
661
                        rowTypeExecution,
3✔
662
                        domainID,
3✔
663
                        workflowID,
3✔
664
                        runID,
3✔
665
                        defaultVisibilityTimestamp,
3✔
666
                        rowTypeExecutionTaskID)
3✔
667
        }
3✔
668
        return nil
1,693✔
669
}
670

671
func (db *cdb) resetChildExecutionInfos(
672
        batch gocql.Batch,
673
        shardID int,
674
        domainID string,
675
        workflowID string,
676
        runID string,
677
        childExecutionInfos map[int64]*persistence.InternalChildExecutionInfo,
678
) error {
1✔
679
        infoMap, err := resetChildExecutionInfoMap(childExecutionInfos)
1✔
680
        if err != nil {
1✔
681
                return err
×
682
        }
×
683
        batch.Query(templateResetChildExecutionInfoQuery,
1✔
684
                infoMap,
1✔
685
                shardID,
1✔
686
                rowTypeExecution,
1✔
687
                domainID,
1✔
688
                workflowID,
1✔
689
                runID,
1✔
690
                defaultVisibilityTimestamp,
1✔
691
                rowTypeExecutionTaskID)
1✔
692
        return nil
1✔
693
}
694

695
func resetChildExecutionInfoMap(
696
        childExecutionInfos map[int64]*persistence.InternalChildExecutionInfo,
697
) (map[int64]map[string]interface{}, error) {
1✔
698

1✔
699
        cMap := make(map[int64]map[string]interface{})
1✔
700
        for _, c := range childExecutionInfos {
2✔
701
                cInfo := make(map[string]interface{})
1✔
702
                cInfo["version"] = c.Version
1✔
703
                cInfo["event_data_encoding"] = c.InitiatedEvent.GetEncodingString()
1✔
704
                cInfo["initiated_id"] = c.InitiatedID
1✔
705
                cInfo["initiated_event_batch_id"] = c.InitiatedEventBatchID
1✔
706
                cInfo["initiated_event"] = c.InitiatedEvent.Data
1✔
707
                cInfo["started_id"] = c.StartedID
1✔
708
                cInfo["started_event"] = c.StartedEvent.Data
1✔
709
                cInfo["create_request_id"] = c.CreateRequestID
1✔
710
                cInfo["started_workflow_id"] = c.StartedWorkflowID
1✔
711
                startedRunID := emptyRunID
1✔
712
                if c.StartedRunID != "" {
2✔
713
                        startedRunID = c.StartedRunID
1✔
714
                }
1✔
715
                cInfo["started_run_id"] = startedRunID
1✔
716
                cInfo["domain_id"] = c.DomainID
1✔
717
                cInfo["domain_name"] = c.DomainNameDEPRECATED
1✔
718
                cInfo["workflow_type_name"] = c.WorkflowTypeName
1✔
719
                cInfo["parent_close_policy"] = int32(c.ParentClosePolicy)
1✔
720

1✔
721
                cMap[c.InitiatedID] = cInfo
1✔
722
        }
723

724
        return cMap, nil
1✔
725
}
726

727
func (db *cdb) updateChildExecutionInfos(
728
        batch gocql.Batch,
729
        shardID int,
730
        domainID string,
731
        workflowID string,
732
        runID string,
733
        childExecutionInfos map[int64]*persistence.InternalChildExecutionInfo,
734
        deleteInfos []int64,
735
) error {
1,693✔
736

1,693✔
737
        for _, c := range childExecutionInfos {
1,706✔
738
                batch.Query(templateUpdateChildExecutionInfoQuery,
13✔
739
                        c.InitiatedID,
13✔
740
                        c.Version,
13✔
741
                        c.InitiatedID,
13✔
742
                        c.InitiatedEventBatchID,
13✔
743
                        c.InitiatedEvent.Data,
13✔
744
                        c.StartedID,
13✔
745
                        c.StartedWorkflowID,
13✔
746
                        c.StartedRunID,
13✔
747
                        c.StartedEvent.Data,
13✔
748
                        c.CreateRequestID,
13✔
749
                        c.InitiatedEvent.GetEncodingString(),
13✔
750
                        c.DomainID,
13✔
751
                        c.DomainNameDEPRECATED,
13✔
752
                        c.WorkflowTypeName,
13✔
753
                        int32(c.ParentClosePolicy),
13✔
754
                        shardID,
13✔
755
                        rowTypeExecution,
13✔
756
                        domainID,
13✔
757
                        workflowID,
13✔
758
                        runID,
13✔
759
                        defaultVisibilityTimestamp,
13✔
760
                        rowTypeExecutionTaskID)
13✔
761
        }
13✔
762

763
        // deleteInfos are the initiatedIDs for ChildInfo being deleted
764
        for _, deleteInfo := range deleteInfos {
1,699✔
765
                batch.Query(templateDeleteChildExecutionInfoQuery,
6✔
766
                        deleteInfo,
6✔
767
                        shardID,
6✔
768
                        rowTypeExecution,
6✔
769
                        domainID,
6✔
770
                        workflowID,
6✔
771
                        runID,
6✔
772
                        defaultVisibilityTimestamp,
6✔
773
                        rowTypeExecutionTaskID)
6✔
774
        }
6✔
775
        return nil
1,693✔
776
}
777

778
func (db *cdb) resetTimerInfos(
779
        batch gocql.Batch,
780
        shardID int,
781
        domainID string,
782
        workflowID string,
783
        runID string,
784
        timerInfos map[string]*persistence.TimerInfo,
785
) error {
1✔
786
        batch.Query(templateResetTimerInfoQuery,
1✔
787
                resetTimerInfoMap(timerInfos),
1✔
788
                shardID,
1✔
789
                rowTypeExecution,
1✔
790
                domainID,
1✔
791
                workflowID,
1✔
792
                runID,
1✔
793
                defaultVisibilityTimestamp,
1✔
794
                rowTypeExecutionTaskID)
1✔
795
        return nil
1✔
796
}
1✔
797

798
func resetTimerInfoMap(
799
        timerInfos map[string]*persistence.TimerInfo,
800
) map[string]map[string]interface{} {
1✔
801

1✔
802
        tMap := make(map[string]map[string]interface{})
1✔
803
        for _, t := range timerInfos {
2✔
804
                tInfo := make(map[string]interface{})
1✔
805
                tInfo["version"] = t.Version
1✔
806
                tInfo["timer_id"] = t.TimerID
1✔
807
                tInfo["started_id"] = t.StartedID
1✔
808
                tInfo["expiry_time"] = t.ExpiryTime
1✔
809
                // task_id is a misleading variable, it actually serves
1✔
810
                // the purpose of indicating whether a timer task is
1✔
811
                // generated for this timer info
1✔
812
                tInfo["task_id"] = t.TaskStatus
1✔
813

1✔
814
                tMap[t.TimerID] = tInfo
1✔
815
        }
1✔
816

817
        return tMap
1✔
818
}
819

820
func (db *cdb) updateTimerInfos(
821
        batch gocql.Batch,
822
        shardID int,
823
        domainID string,
824
        workflowID string,
825
        runID string,
826
        timerInfos map[string]*persistence.TimerInfo,
827
        deleteInfos []string,
828
) error {
1,693✔
829
        for _, timerInfo := range timerInfos {
1,704✔
830
                batch.Query(templateUpdateTimerInfoQuery,
11✔
831
                        timerInfo.TimerID,
11✔
832
                        timerInfo.Version,
11✔
833
                        timerInfo.TimerID,
11✔
834
                        timerInfo.StartedID,
11✔
835
                        timerInfo.ExpiryTime,
11✔
836
                        timerInfo.TaskStatus,
11✔
837
                        shardID,
11✔
838
                        rowTypeExecution,
11✔
839
                        domainID,
11✔
840
                        workflowID,
11✔
841
                        runID,
11✔
842
                        defaultVisibilityTimestamp,
11✔
843
                        rowTypeExecutionTaskID)
11✔
844
        }
11✔
845

846
        for _, deleteInfo := range deleteInfos {
1,701✔
847
                batch.Query(templateDeleteTimerInfoQuery,
8✔
848
                        deleteInfo,
8✔
849
                        shardID,
8✔
850
                        rowTypeExecution,
8✔
851
                        domainID,
8✔
852
                        workflowID,
8✔
853
                        runID,
8✔
854
                        defaultVisibilityTimestamp,
8✔
855
                        rowTypeExecutionTaskID)
8✔
856
        }
8✔
857
        return nil
1,693✔
858
}
859

860
func (db *cdb) resetActivityInfos(
861
        batch gocql.Batch,
862
        shardID int,
863
        domainID string,
864
        workflowID string,
865
        runID string,
866
        activityInfos map[int64]*persistence.InternalActivityInfo,
867
) error {
1✔
868
        infoMap, err := resetActivityInfoMap(activityInfos)
1✔
869
        if err != nil {
1✔
870
                return err
×
871
        }
×
872

873
        batch.Query(templateResetActivityInfoQuery,
1✔
874
                infoMap,
1✔
875
                shardID,
1✔
876
                rowTypeExecution,
1✔
877
                domainID,
1✔
878
                workflowID,
1✔
879
                runID,
1✔
880
                defaultVisibilityTimestamp,
1✔
881
                rowTypeExecutionTaskID)
1✔
882
        return nil
1✔
883
}
884

885
func resetActivityInfoMap(
886
        activityInfos map[int64]*persistence.InternalActivityInfo,
887
) (map[int64]map[string]interface{}, error) {
1✔
888

1✔
889
        aMap := make(map[int64]map[string]interface{})
1✔
890
        for _, a := range activityInfos {
2✔
891
                aInfo := make(map[string]interface{})
1✔
892
                aInfo["version"] = a.Version
1✔
893
                aInfo["event_data_encoding"] = a.ScheduledEvent.GetEncodingString()
1✔
894
                aInfo["schedule_id"] = a.ScheduleID
1✔
895
                aInfo["scheduled_event_batch_id"] = a.ScheduledEventBatchID
1✔
896
                aInfo["scheduled_event"] = a.ScheduledEvent.Data
1✔
897
                aInfo["scheduled_time"] = a.ScheduledTime
1✔
898
                aInfo["started_id"] = a.StartedID
1✔
899
                aInfo["started_event"] = a.StartedEvent.Data
1✔
900
                aInfo["started_time"] = a.StartedTime
1✔
901
                aInfo["activity_id"] = a.ActivityID
1✔
902
                aInfo["request_id"] = a.RequestID
1✔
903
                aInfo["details"] = a.Details
1✔
904
                aInfo["schedule_to_start_timeout"] = int32(a.ScheduleToStartTimeout.Seconds())
1✔
905
                aInfo["schedule_to_close_timeout"] = int32(a.ScheduleToCloseTimeout.Seconds())
1✔
906
                aInfo["start_to_close_timeout"] = int32(a.StartToCloseTimeout.Seconds())
1✔
907
                aInfo["heart_beat_timeout"] = int32(a.HeartbeatTimeout.Seconds())
1✔
908
                aInfo["cancel_requested"] = a.CancelRequested
1✔
909
                aInfo["cancel_request_id"] = a.CancelRequestID
1✔
910
                aInfo["last_hb_updated_time"] = a.LastHeartBeatUpdatedTime
1✔
911
                aInfo["timer_task_status"] = a.TimerTaskStatus
1✔
912
                aInfo["attempt"] = a.Attempt
1✔
913
                aInfo["task_list"] = a.TaskList
1✔
914
                aInfo["started_identity"] = a.StartedIdentity
1✔
915
                aInfo["has_retry_policy"] = a.HasRetryPolicy
1✔
916
                aInfo["init_interval"] = int32(a.InitialInterval.Seconds())
1✔
917
                aInfo["backoff_coefficient"] = a.BackoffCoefficient
1✔
918
                aInfo["max_interval"] = int32(a.MaximumInterval.Seconds())
1✔
919
                aInfo["expiration_time"] = a.ExpirationTime
1✔
920
                aInfo["max_attempts"] = a.MaximumAttempts
1✔
921
                aInfo["non_retriable_errors"] = a.NonRetriableErrors
1✔
922
                aInfo["last_failure_reason"] = a.LastFailureReason
1✔
923
                aInfo["last_worker_identity"] = a.LastWorkerIdentity
1✔
924
                aInfo["last_failure_details"] = a.LastFailureDetails
1✔
925

1✔
926
                aMap[a.ScheduleID] = aInfo
1✔
927
        }
1✔
928

929
        return aMap, nil
1✔
930
}
931

932
func (db *cdb) updateActivityInfos(
933
        batch gocql.Batch,
934
        shardID int,
935
        domainID string,
936
        workflowID string,
937
        runID string,
938
        activityInfos map[int64]*persistence.InternalActivityInfo,
939
        deleteInfos []int64,
940
) error {
1,693✔
941
        for _, a := range activityInfos {
2,126✔
942
                batch.Query(templateUpdateActivityInfoQuery,
433✔
943
                        a.ScheduleID,
433✔
944
                        a.Version,
433✔
945
                        a.ScheduleID,
433✔
946
                        a.ScheduledEventBatchID,
433✔
947
                        a.ScheduledEvent.Data,
433✔
948
                        a.ScheduledTime,
433✔
949
                        a.StartedID,
433✔
950
                        a.StartedEvent.Data,
433✔
951
                        a.StartedTime,
433✔
952
                        a.ActivityID,
433✔
953
                        a.RequestID,
433✔
954
                        a.Details,
433✔
955
                        int32(a.ScheduleToStartTimeout.Seconds()),
433✔
956
                        int32(a.ScheduleToCloseTimeout.Seconds()),
433✔
957
                        int32(a.StartToCloseTimeout.Seconds()),
433✔
958
                        int32(a.HeartbeatTimeout.Seconds()),
433✔
959
                        a.CancelRequested,
433✔
960
                        a.CancelRequestID,
433✔
961
                        a.LastHeartBeatUpdatedTime,
433✔
962
                        a.TimerTaskStatus,
433✔
963
                        a.Attempt,
433✔
964
                        a.TaskList,
433✔
965
                        a.StartedIdentity,
433✔
966
                        a.HasRetryPolicy,
433✔
967
                        int32(a.InitialInterval.Seconds()),
433✔
968
                        a.BackoffCoefficient,
433✔
969
                        int32(a.MaximumInterval.Seconds()),
433✔
970
                        a.ExpirationTime,
433✔
971
                        a.MaximumAttempts,
433✔
972
                        a.NonRetriableErrors,
433✔
973
                        a.LastFailureReason,
433✔
974
                        a.LastWorkerIdentity,
433✔
975
                        a.LastFailureDetails,
433✔
976
                        a.ScheduledEvent.GetEncodingString(),
433✔
977
                        shardID,
433✔
978
                        rowTypeExecution,
433✔
979
                        domainID,
433✔
980
                        workflowID,
433✔
981
                        runID,
433✔
982
                        defaultVisibilityTimestamp,
433✔
983
                        rowTypeExecutionTaskID)
433✔
984
        }
433✔
985

986
        for _, deleteInfo := range deleteInfos {
1,822✔
987
                batch.Query(templateDeleteActivityInfoQuery,
129✔
988
                        deleteInfo,
129✔
989
                        shardID,
129✔
990
                        rowTypeExecution,
129✔
991
                        domainID,
129✔
992
                        workflowID,
129✔
993
                        runID,
129✔
994
                        defaultVisibilityTimestamp,
129✔
995
                        rowTypeExecutionTaskID)
129✔
996
        }
129✔
997
        return nil
1,693✔
998
}
999

1000
// NOTE: not sure we still need it. We keep the behavior for safe during refactoring
1001
// In theory we can just return the input as output
1002
// TODO: if possible, remove it in the future or add more comment of why we need this conversion
1003
func (db *cdb) convertToCassandraTimestamp(in time.Time) time.Time {
3,385✔
1004
        return time.Unix(0, persistence.DBTimestampToUnixNano(persistence.UnixNanoToDBTimestamp(in.UnixNano())))
3,385✔
1005
}
3,385✔
1006

1007
// TODO: if possible, remove the copy in the future, or add comment of why we need it
1008
func getNextPageToken(iter gocql.Iter) []byte {
1,946✔
1009
        nextPageToken := iter.PageState()
1,946✔
1010
        newPageToken := make([]byte, len(nextPageToken))
1,946✔
1011
        copy(newPageToken, nextPageToken)
1,946✔
1012
        return newPageToken
1,946✔
1013
}
1,946✔
1014

1015
func (db *cdb) createWorkflowExecutionWithMergeMaps(
1016
        batch gocql.Batch,
1017
        shardID int,
1018
        domainID string,
1019
        workflowID string,
1020
        execution *nosqlplugin.WorkflowExecutionRequest,
1021
) error {
226✔
1022
        err := db.createWorkflowExecution(batch, shardID, domainID, workflowID, execution)
226✔
1023
        if err != nil {
226✔
1024
                return err
×
1025
        }
×
1026

1027
        if execution.EventBufferWriteMode != nosqlplugin.EventBufferWriteModeNone {
226✔
1028
                return fmt.Errorf("should only support EventBufferWriteModeNone")
×
1029
        }
×
1030

1031
        if execution.MapsWriteMode != nosqlplugin.WorkflowExecutionMapsWriteModeCreate {
226✔
1032
                return fmt.Errorf("should only support WorkflowExecutionMapsWriteModeCreate")
×
1033
        }
×
1034

1035
        err = db.updateActivityInfos(batch, shardID, domainID, workflowID, execution.RunID, execution.ActivityInfos, nil)
226✔
1036
        if err != nil {
226✔
1037
                return err
×
1038
        }
×
1039
        err = db.updateTimerInfos(batch, shardID, domainID, workflowID, execution.RunID, execution.TimerInfos, nil)
226✔
1040
        if err != nil {
226✔
1041
                return err
×
1042
        }
×
1043
        err = db.updateChildExecutionInfos(batch, shardID, domainID, workflowID, execution.RunID, execution.ChildWorkflowInfos, nil)
226✔
1044
        if err != nil {
226✔
1045
                return err
×
1046
        }
×
1047
        err = db.updateRequestCancelInfos(batch, shardID, domainID, workflowID, execution.RunID, execution.RequestCancelInfos, nil)
226✔
1048
        if err != nil {
226✔
1049
                return err
×
1050
        }
×
1051
        err = db.updateSignalInfos(batch, shardID, domainID, workflowID, execution.RunID, execution.SignalInfos, nil)
226✔
1052
        if err != nil {
226✔
1053
                return err
×
1054
        }
×
1055
        return db.updateSignalsRequested(batch, shardID, domainID, workflowID, execution.RunID, execution.SignalRequestedIDs, nil)
226✔
1056
}
1057

1058
func (db *cdb) resetWorkflowExecutionAndMapsAndEventBuffer(
1059
        batch gocql.Batch,
1060
        shardID int,
1061
        domainID string,
1062
        workflowID string,
1063
        execution *nosqlplugin.WorkflowExecutionRequest,
1064
) error {
1✔
1065
        err := db.updateWorkflowExecution(batch, shardID, domainID, workflowID, execution)
1✔
1066
        if err != nil {
1✔
1067
                return err
×
1068
        }
×
1069

1070
        if execution.EventBufferWriteMode != nosqlplugin.EventBufferWriteModeClear {
1✔
1071
                return fmt.Errorf("should only support EventBufferWriteModeClear")
×
1072
        }
×
1073
        err = deleteBufferedEvents(batch, shardID, domainID, workflowID, execution.RunID)
1✔
1074
        if err != nil {
1✔
1075
                return err
×
1076
        }
×
1077

1078
        if execution.MapsWriteMode != nosqlplugin.WorkflowExecutionMapsWriteModeReset {
1✔
1079
                return fmt.Errorf("should only support WorkflowExecutionMapsWriteModeReset")
×
1080
        }
×
1081

1082
        err = db.resetActivityInfos(batch, shardID, domainID, workflowID, execution.RunID, execution.ActivityInfos)
1✔
1083
        if err != nil {
1✔
1084
                return err
×
1085
        }
×
1086
        err = db.resetTimerInfos(batch, shardID, domainID, workflowID, execution.RunID, execution.TimerInfos)
1✔
1087
        if err != nil {
1✔
1088
                return err
×
1089
        }
×
1090
        err = db.resetChildExecutionInfos(batch, shardID, domainID, workflowID, execution.RunID, execution.ChildWorkflowInfos)
1✔
1091
        if err != nil {
1✔
1092
                return err
×
1093
        }
×
1094
        err = db.resetRequestCancelInfos(batch, shardID, domainID, workflowID, execution.RunID, execution.RequestCancelInfos)
1✔
1095
        if err != nil {
1✔
1096
                return err
×
1097
        }
×
1098
        err = db.resetSignalInfos(batch, shardID, domainID, workflowID, execution.RunID, execution.SignalInfos)
1✔
1099
        if err != nil {
1✔
1100
                return err
×
1101
        }
×
1102
        return db.resetSignalsRequested(batch, shardID, domainID, workflowID, execution.RunID, execution.SignalRequestedIDs)
1✔
1103
}
1104

1105
func appendBufferedEvents(
1106
        batch gocql.Batch,
1107
        newBufferedEvents *persistence.DataBlob,
1108
        shardID int,
1109
        domainID string,
1110
        workflowID string,
1111
        runID string,
1112
) error {
214✔
1113
        values := make(map[string]interface{})
214✔
1114
        values["encoding_type"] = newBufferedEvents.Encoding
214✔
1115
        values["version"] = int64(0)
214✔
1116
        values["data"] = newBufferedEvents.Data
214✔
1117
        newEventValues := []map[string]interface{}{values}
214✔
1118
        batch.Query(templateAppendBufferedEventsQuery,
214✔
1119
                newEventValues,
214✔
1120
                shardID,
214✔
1121
                rowTypeExecution,
214✔
1122
                domainID,
214✔
1123
                workflowID,
214✔
1124
                runID,
214✔
1125
                defaultVisibilityTimestamp,
214✔
1126
                rowTypeExecutionTaskID)
214✔
1127
        return nil
214✔
1128
}
214✔
1129

1130
func deleteBufferedEvents(
1131
        batch gocql.Batch,
1132
        shardID int,
1133
        domainID string,
1134
        workflowID string,
1135
        runID string,
1136
) error {
14✔
1137
        batch.Query(templateDeleteBufferedEventsQuery,
14✔
1138
                shardID,
14✔
1139
                rowTypeExecution,
14✔
1140
                domainID,
14✔
1141
                workflowID,
14✔
1142
                runID,
14✔
1143
                defaultVisibilityTimestamp,
14✔
1144
                rowTypeExecutionTaskID)
14✔
1145
        return nil
14✔
1146
}
14✔
1147

1148
func (db *cdb) updateWorkflowExecutionAndEventBufferWithMergeAndDeleteMaps(
1149
        batch gocql.Batch,
1150
        shardID int,
1151
        domainID string,
1152
        workflowID string,
1153
        execution *nosqlplugin.WorkflowExecutionRequest,
1154
) error {
1,468✔
1155
        err := db.updateWorkflowExecution(batch, shardID, domainID, workflowID, execution)
1,468✔
1156
        if err != nil {
1,468✔
1157
                return err
×
1158
        }
×
1159

1160
        if execution.EventBufferWriteMode == nosqlplugin.EventBufferWriteModeClear {
1,481✔
1161
                err = deleteBufferedEvents(batch, shardID, domainID, workflowID, execution.RunID)
13✔
1162
                if err != nil {
13✔
1163
                        return err
×
1164
                }
×
1165
        } else if execution.EventBufferWriteMode == nosqlplugin.EventBufferWriteModeAppend {
1,669✔
1166
                err = appendBufferedEvents(batch, execution.NewBufferedEventBatch, shardID, domainID, workflowID, execution.RunID)
214✔
1167
                if err != nil {
214✔
1168
                        return err
×
1169
                }
×
1170
        }
1171

1172
        if execution.MapsWriteMode != nosqlplugin.WorkflowExecutionMapsWriteModeUpdate {
1,468✔
1173
                return fmt.Errorf("should only support WorkflowExecutionMapsWriteModeUpdate")
×
1174
        }
×
1175

1176
        err = db.updateActivityInfos(batch, shardID, domainID, workflowID, execution.RunID, execution.ActivityInfos, execution.ActivityInfoKeysToDelete)
1,468✔
1177
        if err != nil {
1,468✔
1178
                return err
×
1179
        }
×
1180
        err = db.updateTimerInfos(batch, shardID, domainID, workflowID, execution.RunID, execution.TimerInfos, execution.TimerInfoKeysToDelete)
1,468✔
1181
        if err != nil {
1,468✔
1182
                return err
×
1183
        }
×
1184
        err = db.updateChildExecutionInfos(batch, shardID, domainID, workflowID, execution.RunID, execution.ChildWorkflowInfos, execution.ChildWorkflowInfoKeysToDelete)
1,468✔
1185
        if err != nil {
1,468✔
1186
                return err
×
1187
        }
×
1188
        err = db.updateRequestCancelInfos(batch, shardID, domainID, workflowID, execution.RunID, execution.RequestCancelInfos, execution.RequestCancelInfoKeysToDelete)
1,468✔
1189
        if err != nil {
1,468✔
1190
                return err
×
1191
        }
×
1192
        err = db.updateSignalInfos(batch, shardID, domainID, workflowID, execution.RunID, execution.SignalInfos, execution.SignalInfoKeysToDelete)
1,468✔
1193
        if err != nil {
1,468✔
1194
                return err
×
1195
        }
×
1196
        return db.updateSignalsRequested(batch, shardID, domainID, workflowID, execution.RunID, execution.SignalRequestedIDs, execution.SignalRequestedIDsKeysToDelete)
1,468✔
1197
}
1198

1199
func (db *cdb) updateWorkflowExecution(
1200
        batch gocql.Batch,
1201
        shardID int,
1202
        domainID string,
1203
        workflowID string,
1204
        execution *nosqlplugin.WorkflowExecutionRequest,
1205
) error {
1,468✔
1206
        execution.StartTimestamp = db.convertToCassandraTimestamp(execution.StartTimestamp)
1,468✔
1207
        execution.LastUpdatedTimestamp = db.convertToCassandraTimestamp(execution.LastUpdatedTimestamp)
1,468✔
1208

1,468✔
1209
        batch.Query(templateUpdateWorkflowExecutionWithVersionHistoriesQuery,
1,468✔
1210
                domainID,
1,468✔
1211
                workflowID,
1,468✔
1212
                execution.RunID,
1,468✔
1213
                execution.FirstExecutionRunID,
1,468✔
1214
                execution.ParentDomainID,
1,468✔
1215
                execution.ParentWorkflowID,
1,468✔
1216
                execution.ParentRunID,
1,468✔
1217
                execution.InitiatedID,
1,468✔
1218
                execution.CompletionEventBatchID,
1,468✔
1219
                execution.CompletionEvent.Data,
1,468✔
1220
                execution.CompletionEvent.GetEncodingString(),
1,468✔
1221
                execution.TaskList,
1,468✔
1222
                execution.WorkflowTypeName,
1,468✔
1223
                int32(execution.WorkflowTimeout.Seconds()),
1,468✔
1224
                int32(execution.DecisionStartToCloseTimeout.Seconds()),
1,468✔
1225
                execution.ExecutionContext,
1,468✔
1226
                execution.State,
1,468✔
1227
                execution.CloseStatus,
1,468✔
1228
                execution.LastFirstEventID,
1,468✔
1229
                execution.LastEventTaskID,
1,468✔
1230
                execution.NextEventID,
1,468✔
1231
                execution.LastProcessedEvent,
1,468✔
1232
                execution.StartTimestamp,
1,468✔
1233
                execution.LastUpdatedTimestamp,
1,468✔
1234
                execution.CreateRequestID,
1,468✔
1235
                execution.SignalCount,
1,468✔
1236
                execution.HistorySize,
1,468✔
1237
                execution.DecisionVersion,
1,468✔
1238
                execution.DecisionScheduleID,
1,468✔
1239
                execution.DecisionStartedID,
1,468✔
1240
                execution.DecisionRequestID,
1,468✔
1241
                int32(execution.DecisionTimeout.Seconds()),
1,468✔
1242
                execution.DecisionAttempt,
1,468✔
1243
                execution.DecisionStartedTimestamp.UnixNano(),
1,468✔
1244
                execution.DecisionScheduledTimestamp.UnixNano(),
1,468✔
1245
                execution.DecisionOriginalScheduledTimestamp.UnixNano(),
1,468✔
1246
                execution.CancelRequested,
1,468✔
1247
                execution.CancelRequestID,
1,468✔
1248
                execution.StickyTaskList,
1,468✔
1249
                int32(execution.StickyScheduleToStartTimeout.Seconds()),
1,468✔
1250
                execution.ClientLibraryVersion,
1,468✔
1251
                execution.ClientFeatureVersion,
1,468✔
1252
                execution.ClientImpl,
1,468✔
1253
                execution.AutoResetPoints.Data,
1,468✔
1254
                execution.AutoResetPoints.GetEncoding(),
1,468✔
1255
                execution.Attempt,
1,468✔
1256
                execution.HasRetryPolicy,
1,468✔
1257
                int32(execution.InitialInterval.Seconds()),
1,468✔
1258
                execution.BackoffCoefficient,
1,468✔
1259
                int32(execution.MaximumInterval.Seconds()),
1,468✔
1260
                execution.ExpirationTime,
1,468✔
1261
                execution.MaximumAttempts,
1,468✔
1262
                execution.NonRetriableErrors,
1,468✔
1263
                persistence.EventStoreVersion,
1,468✔
1264
                execution.BranchToken,
1,468✔
1265
                execution.CronSchedule,
1,468✔
1266
                int32(execution.ExpirationInterval.Seconds()),
1,468✔
1267
                execution.SearchAttributes,
1,468✔
1268
                execution.Memo,
1,468✔
1269
                execution.NextEventID,
1,468✔
1270
                execution.VersionHistories.Data,
1,468✔
1271
                execution.VersionHistories.GetEncodingString(),
1,468✔
1272
                execution.Checksums.Version,
1,468✔
1273
                execution.Checksums.Flavor,
1,468✔
1274
                execution.Checksums.Value,
1,468✔
1275
                execution.LastWriteVersion,
1,468✔
1276
                execution.State,
1,468✔
1277
                shardID,
1,468✔
1278
                rowTypeExecution,
1,468✔
1279
                domainID,
1,468✔
1280
                workflowID,
1,468✔
1281
                execution.RunID,
1,468✔
1282
                defaultVisibilityTimestamp,
1,468✔
1283
                rowTypeExecutionTaskID,
1,468✔
1284
                execution.PreviousNextEventIDCondition)
1,468✔
1285

1,468✔
1286
        return nil
1,468✔
1287
}
1,468✔
1288

1289
func (db *cdb) createWorkflowExecution(
1290
        batch gocql.Batch,
1291
        shardID int,
1292
        domainID string,
1293
        workflowID string,
1294
        execution *nosqlplugin.WorkflowExecutionRequest,
1295
) error {
226✔
1296
        execution.StartTimestamp = db.convertToCassandraTimestamp(execution.StartTimestamp)
226✔
1297
        execution.LastUpdatedTimestamp = db.convertToCassandraTimestamp(execution.LastUpdatedTimestamp)
226✔
1298

226✔
1299
        batch.Query(templateCreateWorkflowExecutionWithVersionHistoriesQuery,
226✔
1300
                shardID,
226✔
1301
                domainID,
226✔
1302
                workflowID,
226✔
1303
                execution.RunID,
226✔
1304
                rowTypeExecution,
226✔
1305
                domainID,
226✔
1306
                workflowID,
226✔
1307
                execution.RunID,
226✔
1308
                execution.FirstExecutionRunID,
226✔
1309
                execution.ParentDomainID,
226✔
1310
                execution.ParentWorkflowID,
226✔
1311
                execution.ParentRunID,
226✔
1312
                execution.InitiatedID,
226✔
1313
                execution.CompletionEventBatchID,
226✔
1314
                execution.CompletionEvent.Data,
226✔
1315
                execution.CompletionEvent.GetEncodingString(),
226✔
1316
                execution.TaskList,
226✔
1317
                execution.WorkflowTypeName,
226✔
1318
                int32(execution.WorkflowTimeout.Seconds()),
226✔
1319
                int32(execution.DecisionStartToCloseTimeout.Seconds()),
226✔
1320
                execution.ExecutionContext,
226✔
1321
                execution.State,
226✔
1322
                execution.CloseStatus,
226✔
1323
                execution.LastFirstEventID,
226✔
1324
                execution.LastEventTaskID,
226✔
1325
                execution.NextEventID,
226✔
1326
                execution.LastProcessedEvent,
226✔
1327
                execution.StartTimestamp,
226✔
1328
                execution.LastUpdatedTimestamp,
226✔
1329
                execution.CreateRequestID,
226✔
1330
                execution.SignalCount,
226✔
1331
                execution.HistorySize,
226✔
1332
                execution.DecisionVersion,
226✔
1333
                execution.DecisionScheduleID,
226✔
1334
                execution.DecisionStartedID,
226✔
1335
                execution.DecisionRequestID,
226✔
1336
                int32(execution.DecisionTimeout.Seconds()),
226✔
1337
                execution.DecisionAttempt,
226✔
1338
                execution.DecisionStartedTimestamp.UnixNano(),
226✔
1339
                execution.DecisionScheduledTimestamp.UnixNano(),
226✔
1340
                execution.DecisionOriginalScheduledTimestamp.UnixNano(),
226✔
1341
                execution.CancelRequested,
226✔
1342
                execution.CancelRequestID,
226✔
1343
                execution.StickyTaskList,
226✔
1344
                int32(execution.StickyScheduleToStartTimeout.Seconds()),
226✔
1345
                execution.ClientLibraryVersion,
226✔
1346
                execution.ClientFeatureVersion,
226✔
1347
                execution.ClientImpl,
226✔
1348
                execution.AutoResetPoints.Data,
226✔
1349
                execution.AutoResetPoints.GetEncodingString(),
226✔
1350
                execution.Attempt,
226✔
1351
                execution.HasRetryPolicy,
226✔
1352
                int32(execution.InitialInterval.Seconds()),
226✔
1353
                execution.BackoffCoefficient,
226✔
1354
                int32(execution.MaximumInterval.Seconds()),
226✔
1355
                execution.ExpirationTime,
226✔
1356
                execution.MaximumAttempts,
226✔
1357
                execution.NonRetriableErrors,
226✔
1358
                persistence.EventStoreVersion,
226✔
1359
                execution.BranchToken,
226✔
1360
                execution.CronSchedule,
226✔
1361
                int32(execution.ExpirationInterval.Seconds()),
226✔
1362
                execution.SearchAttributes,
226✔
1363
                execution.Memo,
226✔
1364
                execution.NextEventID,
226✔
1365
                defaultVisibilityTimestamp,
226✔
1366
                rowTypeExecutionTaskID,
226✔
1367
                execution.VersionHistories.Data,
226✔
1368
                execution.VersionHistories.GetEncodingString(),
226✔
1369
                execution.Checksums.Version,
226✔
1370
                execution.Checksums.Flavor,
226✔
1371
                execution.Checksums.Value,
226✔
1372
                execution.LastWriteVersion,
226✔
1373
                execution.State,
226✔
1374
        )
226✔
1375
        return nil
226✔
1376
}
226✔
1377

1378
func (db *cdb) createOrUpdateCurrentWorkflow(
1379
        batch gocql.Batch,
1380
        shardID int,
1381
        domainID string,
1382
        workflowID string,
1383
        request *nosqlplugin.CurrentWorkflowWriteRequest,
1384
) error {
1,635✔
1385
        switch request.WriteMode {
1,635✔
1386
        case nosqlplugin.CurrentWorkflowWriteModeNoop:
1✔
1387
                return nil
1✔
1388
        case nosqlplugin.CurrentWorkflowWriteModeInsert:
158✔
1389
                batch.Query(templateCreateCurrentWorkflowExecutionQuery,
158✔
1390
                        shardID,
158✔
1391
                        rowTypeExecution,
158✔
1392
                        domainID,
158✔
1393
                        workflowID,
158✔
1394
                        permanentRunID,
158✔
1395
                        defaultVisibilityTimestamp,
158✔
1396
                        rowTypeExecutionTaskID,
158✔
1397
                        request.Row.RunID,
158✔
1398
                        request.Row.RunID,
158✔
1399
                        request.Row.CreateRequestID,
158✔
1400
                        request.Row.State,
158✔
1401
                        request.Row.CloseStatus,
158✔
1402
                        request.Row.LastWriteVersion,
158✔
1403
                        request.Row.State,
158✔
1404
                )
158✔
1405
        case nosqlplugin.CurrentWorkflowWriteModeUpdate:
1,478✔
1406
                if request.Condition == nil || request.Condition.GetCurrentRunID() == "" {
1,478✔
1407
                        return fmt.Errorf("CurrentWorkflowWriteModeUpdate require Condition.CurrentRunID")
×
1408
                }
×
1409
                if request.Condition.LastWriteVersion != nil && request.Condition.State != nil {
1,488✔
1410
                        batch.Query(templateUpdateCurrentWorkflowExecutionForNewQuery,
10✔
1411
                                request.Row.RunID,
10✔
1412
                                request.Row.RunID,
10✔
1413
                                request.Row.CreateRequestID,
10✔
1414
                                request.Row.State,
10✔
1415
                                request.Row.CloseStatus,
10✔
1416
                                request.Row.LastWriteVersion,
10✔
1417
                                request.Row.State,
10✔
1418
                                shardID,
10✔
1419
                                rowTypeExecution,
10✔
1420
                                domainID,
10✔
1421
                                workflowID,
10✔
1422
                                permanentRunID,
10✔
1423
                                defaultVisibilityTimestamp,
10✔
1424
                                rowTypeExecutionTaskID,
10✔
1425
                                *request.Condition.CurrentRunID,
10✔
1426
                                *request.Condition.LastWriteVersion,
10✔
1427
                                *request.Condition.State,
10✔
1428
                        )
10✔
1429
                } else {
1,479✔
1430
                        batch.Query(templateUpdateCurrentWorkflowExecutionQuery,
1,469✔
1431
                                request.Row.RunID,
1,469✔
1432
                                request.Row.RunID,
1,469✔
1433
                                request.Row.CreateRequestID,
1,469✔
1434
                                request.Row.State,
1,469✔
1435
                                request.Row.CloseStatus,
1,469✔
1436
                                request.Row.LastWriteVersion,
1,469✔
1437
                                request.Row.State,
1,469✔
1438
                                shardID,
1,469✔
1439
                                rowTypeExecution,
1,469✔
1440
                                domainID,
1,469✔
1441
                                workflowID,
1,469✔
1442
                                permanentRunID,
1,469✔
1443
                                defaultVisibilityTimestamp,
1,469✔
1444
                                rowTypeExecutionTaskID,
1,469✔
1445
                                *request.Condition.CurrentRunID,
1,469✔
1446
                        )
1,469✔
1447
                }
1,469✔
1448
        default:
×
1449
                return fmt.Errorf("unknown mode %v", request.WriteMode)
×
1450
        }
1451
        return nil
1,635✔
1452
}
1453

1454
func mustConvertToSlice(value interface{}) []interface{} {
2,085✔
1455
        v := reflect.ValueOf(value)
2,085✔
1456
        switch v.Kind() {
2,085✔
1457
        case reflect.Slice, reflect.Array:
2,085✔
1458
                result := make([]interface{}, v.Len())
2,085✔
1459
                for i := 0; i < v.Len(); i++ {
2,095✔
1460
                        result[i] = v.Index(i).Interface()
10✔
1461
                }
10✔
1462
                return result
2,085✔
1463
        default:
×
1464
                panic(fmt.Sprintf("Unable to convert %v to slice", value))
×
1465
        }
1466
}
1467

1468
func populateGetReplicationTasks(
1469
        query gocql.Query,
1470
) ([]*nosqlplugin.ReplicationTask, []byte, error) {
41✔
1471
        iter := query.Iter()
41✔
1472
        if iter == nil {
41✔
1473
                return nil, nil, &types.InternalServiceError{
×
1474
                        Message: "populateGetReplicationTasks operation failed.  Not able to create query iterator.",
×
1475
                }
×
1476
        }
×
1477

1478
        var tasks []*nosqlplugin.ReplicationTask
41✔
1479
        task := make(map[string]interface{})
41✔
1480
        for iter.MapScan(task) {
42✔
1481
                t := parseReplicationTaskInfo(task["replication"].(map[string]interface{}))
1✔
1482
                // Reset task map to get it ready for next scan
1✔
1483
                task = make(map[string]interface{})
1✔
1484

1✔
1485
                tasks = append(tasks, t)
1✔
1486
        }
1✔
1487
        nextPageToken := getNextPageToken(iter)
41✔
1488
        err := iter.Close()
41✔
1489

41✔
1490
        return tasks, nextPageToken, err
41✔
1491
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc