• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

uber / cadence / 018e53aa-bc8b-40e2-9d7c-012f8da56b45

18 Mar 2024 10:24PM UTC coverage: 64.926% (-0.005%) from 64.931%
018e53aa-bc8b-40e2-9d7c-012f8da56b45

push

buildkite

web-flow
Fix checksum validation for SQL implementation (#5790)

What changed?
Add a check to the SQL implementation of the GetWorkflowExecution operation to exclude false-positive checksum validation failures.

Why?
For the checksum validation result to be trustworthy, the data we read in the GetWorkflowExecution operation must come from a consistent view. In the NoSQL implementation, the operation is a single read, so the data is from a consistent view. However, in the SQL implementation, the operation performs multiple reads from different tables. If there is a concurrent update, the data we read isn't from a consistent view and the checksum validation could fail. Normally, we don't have updates running concurrently with reads, but that might not hold when the shard ownership changes.

24 of 26 new or added lines in 5 files covered. (92.31%)

66 existing lines in 14 files now uncovered.

94774 of 145972 relevant lines covered (64.93%)

2395.07 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

84.82
/common/persistence/sql/sql_execution_store_util.go
1
// Copyright (c) 2017-2020 Uber Technologies, Inc.
2
// Portions of the Software are attributed to Copyright (c) 2020 Temporal Technologies Inc.
3
//
4
// Permission is hereby granted, free of charge, to any person obtaining a copy
5
// of this software and associated documentation files (the "Software"), to deal
6
// in the Software without restriction, including without limitation the rights
7
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
8
// copies of the Software, and to permit persons to whom the Software is
9
// furnished to do so, subject to the following conditions:
10
//
11
// The above copyright notice and this permission notice shall be included in
12
// all copies or substantial portions of the Software.
13
//
14
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
15
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
16
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
17
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
18
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
19
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
20
// THE SOFTWARE.
21

22
package sql
23

24
import (
25
        "bytes"
26
        "context"
27
        "database/sql"
28
        "fmt"
29
        "time"
30

31
        "github.com/uber/cadence/common"
32
        p "github.com/uber/cadence/common/persistence"
33
        "github.com/uber/cadence/common/persistence/serialization"
34
        "github.com/uber/cadence/common/persistence/sql/sqlplugin"
35
        "github.com/uber/cadence/common/types"
36
)
37

38
func applyWorkflowMutationTx(
39
        ctx context.Context,
40
        tx sqlplugin.Tx,
41
        shardID int,
42
        workflowMutation *p.InternalWorkflowMutation,
43
        parser serialization.Parser,
44
) error {
2,909✔
45

2,909✔
46
        executionInfo := workflowMutation.ExecutionInfo
2,909✔
47
        versionHistories := workflowMutation.VersionHistories
2,909✔
48
        workflowChecksum := workflowMutation.ChecksumData
2,909✔
49
        startVersion := workflowMutation.StartVersion
2,909✔
50
        lastWriteVersion := workflowMutation.LastWriteVersion
2,909✔
51
        domainID := serialization.MustParseUUID(executionInfo.DomainID)
2,909✔
52
        workflowID := executionInfo.WorkflowID
2,909✔
53
        runID := serialization.MustParseUUID(executionInfo.RunID)
2,909✔
54

2,909✔
55
        // TODO Remove me if UPDATE holds the lock to the end of a transaction
2,909✔
56
        if err := lockAndCheckNextEventID(
2,909✔
57
                ctx,
2,909✔
58
                tx,
2,909✔
59
                shardID,
2,909✔
60
                domainID,
2,909✔
61
                workflowID,
2,909✔
62
                runID,
2,909✔
63
                workflowMutation.Condition); err != nil {
2,909✔
64
                return err
×
65
        }
×
66

67
        if err := updateExecution(
2,909✔
68
                ctx,
2,909✔
69
                tx,
2,909✔
70
                executionInfo,
2,909✔
71
                versionHistories,
2,909✔
72
                workflowChecksum,
2,909✔
73
                startVersion,
2,909✔
74
                lastWriteVersion,
2,909✔
75
                shardID,
2,909✔
76
                parser); err != nil {
2,909✔
77
                return err
×
78
        }
×
79

80
        if err := applyTasks(
2,909✔
81
                ctx,
2,909✔
82
                tx,
2,909✔
83
                shardID,
2,909✔
84
                domainID,
2,909✔
85
                workflowID,
2,909✔
86
                runID,
2,909✔
87
                workflowMutation.TransferTasks,
2,909✔
88
                workflowMutation.CrossClusterTasks,
2,909✔
89
                workflowMutation.ReplicationTasks,
2,909✔
90
                workflowMutation.TimerTasks,
2,909✔
91
                parser,
2,909✔
92
        ); err != nil {
2,909✔
93
                return err
×
94
        }
×
95

96
        if err := updateActivityInfos(
2,909✔
97
                ctx,
2,909✔
98
                tx,
2,909✔
99
                workflowMutation.UpsertActivityInfos,
2,909✔
100
                workflowMutation.DeleteActivityInfos,
2,909✔
101
                shardID,
2,909✔
102
                domainID,
2,909✔
103
                workflowID,
2,909✔
104
                runID,
2,909✔
105
                parser,
2,909✔
106
        ); err != nil {
2,909✔
107
                return err
×
108
        }
×
109

110
        if err := updateTimerInfos(
2,909✔
111
                ctx,
2,909✔
112
                tx,
2,909✔
113
                workflowMutation.UpsertTimerInfos,
2,909✔
114
                workflowMutation.DeleteTimerInfos,
2,909✔
115
                shardID,
2,909✔
116
                domainID,
2,909✔
117
                workflowID,
2,909✔
118
                runID,
2,909✔
119
                parser,
2,909✔
120
        ); err != nil {
2,909✔
121
                return err
×
122
        }
×
123

124
        if err := updateChildExecutionInfos(
2,909✔
125
                ctx,
2,909✔
126
                tx,
2,909✔
127
                workflowMutation.UpsertChildExecutionInfos,
2,909✔
128
                workflowMutation.DeleteChildExecutionInfos,
2,909✔
129
                shardID,
2,909✔
130
                domainID,
2,909✔
131
                workflowID,
2,909✔
132
                runID,
2,909✔
133
                parser,
2,909✔
134
        ); err != nil {
2,909✔
135
                return err
×
136
        }
×
137

138
        if err := updateRequestCancelInfos(
2,909✔
139
                ctx,
2,909✔
140
                tx,
2,909✔
141
                workflowMutation.UpsertRequestCancelInfos,
2,909✔
142
                workflowMutation.DeleteRequestCancelInfos,
2,909✔
143
                shardID,
2,909✔
144
                domainID,
2,909✔
145
                workflowID,
2,909✔
146
                runID,
2,909✔
147
                parser,
2,909✔
148
        ); err != nil {
2,909✔
149
                return err
×
150
        }
×
151

152
        if err := updateSignalInfos(
2,909✔
153
                ctx,
2,909✔
154
                tx,
2,909✔
155
                workflowMutation.UpsertSignalInfos,
2,909✔
156
                workflowMutation.DeleteSignalInfos,
2,909✔
157
                shardID,
2,909✔
158
                domainID,
2,909✔
159
                workflowID,
2,909✔
160
                runID,
2,909✔
161
                parser,
2,909✔
162
        ); err != nil {
2,909✔
163
                return err
×
164
        }
×
165

166
        if err := updateSignalsRequested(
2,909✔
167
                ctx,
2,909✔
168
                tx,
2,909✔
169
                workflowMutation.UpsertSignalRequestedIDs,
2,909✔
170
                workflowMutation.DeleteSignalRequestedIDs,
2,909✔
171
                shardID,
2,909✔
172
                domainID,
2,909✔
173
                workflowID,
2,909✔
174
                runID,
2,909✔
175
        ); err != nil {
2,909✔
176
                return err
×
177
        }
×
178

179
        if workflowMutation.ClearBufferedEvents {
2,936✔
180
                if err := deleteBufferedEvents(
27✔
181
                        ctx,
27✔
182
                        tx,
27✔
183
                        shardID,
27✔
184
                        domainID,
27✔
185
                        workflowID,
27✔
186
                        runID,
27✔
187
                ); err != nil {
27✔
188
                        return err
×
189
                }
×
190
        }
191

192
        return updateBufferedEvents(
2,909✔
193
                ctx,
2,909✔
194
                tx,
2,909✔
195
                workflowMutation.NewBufferedEvents,
2,909✔
196
                shardID,
2,909✔
197
                domainID,
2,909✔
198
                workflowID,
2,909✔
199
                runID,
2,909✔
200
        )
2,909✔
201
}
202

203
func applyWorkflowSnapshotTxAsReset(
204
        ctx context.Context,
205
        tx sqlplugin.Tx,
206
        shardID int,
207
        workflowSnapshot *p.InternalWorkflowSnapshot,
208
        parser serialization.Parser,
209
) error {
3✔
210

3✔
211
        executionInfo := workflowSnapshot.ExecutionInfo
3✔
212
        versionHistories := workflowSnapshot.VersionHistories
3✔
213
        workflowChecksum := workflowSnapshot.ChecksumData
3✔
214
        startVersion := workflowSnapshot.StartVersion
3✔
215
        lastWriteVersion := workflowSnapshot.LastWriteVersion
3✔
216
        domainID := serialization.MustParseUUID(executionInfo.DomainID)
3✔
217
        workflowID := executionInfo.WorkflowID
3✔
218
        runID := serialization.MustParseUUID(executionInfo.RunID)
3✔
219

3✔
220
        // TODO Is there a way to modify the various map tables without fear of other people adding rows after we delete, without locking the executions row?
3✔
221
        if err := lockAndCheckNextEventID(
3✔
222
                ctx,
3✔
223
                tx,
3✔
224
                shardID,
3✔
225
                domainID,
3✔
226
                workflowID,
3✔
227
                runID,
3✔
228
                workflowSnapshot.Condition); err != nil {
3✔
229
                return err
×
230
        }
×
231

232
        if err := updateExecution(
3✔
233
                ctx,
3✔
234
                tx,
3✔
235
                executionInfo,
3✔
236
                versionHistories,
3✔
237
                workflowChecksum,
3✔
238
                startVersion,
3✔
239
                lastWriteVersion,
3✔
240
                shardID,
3✔
241
                parser); err != nil {
3✔
242
                return err
×
243
        }
×
244

245
        if err := applyTasks(
3✔
246
                ctx,
3✔
247
                tx,
3✔
248
                shardID,
3✔
249
                domainID,
3✔
250
                workflowID,
3✔
251
                runID,
3✔
252
                workflowSnapshot.TransferTasks,
3✔
253
                workflowSnapshot.CrossClusterTasks,
3✔
254
                workflowSnapshot.ReplicationTasks,
3✔
255
                workflowSnapshot.TimerTasks,
3✔
256
                parser,
3✔
257
        ); err != nil {
3✔
258
                return err
×
259
        }
×
260

261
        if err := deleteActivityInfoMap(
3✔
262
                ctx,
3✔
263
                tx,
3✔
264
                shardID,
3✔
265
                domainID,
3✔
266
                workflowID,
3✔
267
                runID); err != nil {
3✔
268
                return err
×
269
        }
×
270

271
        if err := updateActivityInfos(
3✔
272
                ctx,
3✔
273
                tx,
3✔
274
                workflowSnapshot.ActivityInfos,
3✔
275
                nil,
3✔
276
                shardID,
3✔
277
                domainID,
3✔
278
                workflowID,
3✔
279
                runID,
3✔
280
                parser); err != nil {
3✔
281
                return err
×
282
        }
×
283

284
        if err := deleteTimerInfoMap(
3✔
285
                ctx,
3✔
286
                tx,
3✔
287
                shardID,
3✔
288
                domainID,
3✔
289
                workflowID,
3✔
290
                runID); err != nil {
3✔
291
                return err
×
292
        }
×
293

294
        if err := updateTimerInfos(
3✔
295
                ctx,
3✔
296
                tx,
3✔
297
                workflowSnapshot.TimerInfos,
3✔
298
                nil,
3✔
299
                shardID,
3✔
300
                domainID,
3✔
301
                workflowID,
3✔
302
                runID,
3✔
303
                parser); err != nil {
3✔
304
                return err
×
305
        }
×
306

307
        if err := deleteChildExecutionInfoMap(
3✔
308
                ctx,
3✔
309
                tx,
3✔
310
                shardID,
3✔
311
                domainID,
3✔
312
                workflowID,
3✔
313
                runID); err != nil {
3✔
314
                return err
×
315
        }
×
316

317
        if err := updateChildExecutionInfos(
3✔
318
                ctx,
3✔
319
                tx,
3✔
320
                workflowSnapshot.ChildExecutionInfos,
3✔
321
                nil,
3✔
322
                shardID,
3✔
323
                domainID,
3✔
324
                workflowID,
3✔
325
                runID,
3✔
326
                parser); err != nil {
3✔
327
                return err
×
328
        }
×
329

330
        if err := deleteRequestCancelInfoMap(
3✔
331
                ctx,
3✔
332
                tx,
3✔
333
                shardID,
3✔
334
                domainID,
3✔
335
                workflowID,
3✔
336
                runID); err != nil {
3✔
337
                return err
×
338
        }
×
339

340
        if err := updateRequestCancelInfos(
3✔
341
                ctx,
3✔
342
                tx,
3✔
343
                workflowSnapshot.RequestCancelInfos,
3✔
344
                nil,
3✔
345
                shardID,
3✔
346
                domainID,
3✔
347
                workflowID,
3✔
348
                runID,
3✔
349
                parser); err != nil {
3✔
350
                return err
×
351
        }
×
352

353
        if err := deleteSignalInfoMap(
3✔
354
                ctx,
3✔
355
                tx,
3✔
356
                shardID,
3✔
357
                domainID,
3✔
358
                workflowID,
3✔
359
                runID); err != nil {
3✔
360
                return err
×
361
        }
×
362

363
        if err := updateSignalInfos(
3✔
364
                ctx,
3✔
365
                tx,
3✔
366
                workflowSnapshot.SignalInfos,
3✔
367
                nil,
3✔
368
                shardID,
3✔
369
                domainID,
3✔
370
                workflowID,
3✔
371
                runID,
3✔
372
                parser); err != nil {
3✔
373
                return err
×
374
        }
×
375

376
        if err := deleteSignalsRequestedSet(
3✔
377
                ctx,
3✔
378
                tx,
3✔
379
                shardID,
3✔
380
                domainID,
3✔
381
                workflowID,
3✔
382
                runID); err != nil {
3✔
383
                return err
×
384
        }
×
385

386
        if err := updateSignalsRequested(
3✔
387
                ctx,
3✔
388
                tx,
3✔
389
                workflowSnapshot.SignalRequestedIDs,
3✔
390
                nil,
3✔
391
                shardID,
3✔
392
                domainID,
3✔
393
                workflowID,
3✔
394
                runID); err != nil {
3✔
395
                return err
×
396
        }
×
397

398
        return deleteBufferedEvents(
3✔
399
                ctx,
3✔
400
                tx,
3✔
401
                shardID,
3✔
402
                domainID,
3✔
403
                workflowID,
3✔
404
                runID)
3✔
405
}
406

407
func applyWorkflowSnapshotTxAsNew(
408
        ctx context.Context,
409
        tx sqlplugin.Tx,
410
        shardID int,
411
        workflowSnapshot *p.InternalWorkflowSnapshot,
412
        parser serialization.Parser,
413
) error {
423✔
414

423✔
415
        executionInfo := workflowSnapshot.ExecutionInfo
423✔
416
        versionHistories := workflowSnapshot.VersionHistories
423✔
417
        workflowChecksum := workflowSnapshot.ChecksumData
423✔
418
        startVersion := workflowSnapshot.StartVersion
423✔
419
        lastWriteVersion := workflowSnapshot.LastWriteVersion
423✔
420
        domainID := serialization.MustParseUUID(executionInfo.DomainID)
423✔
421
        workflowID := executionInfo.WorkflowID
423✔
422
        runID := serialization.MustParseUUID(executionInfo.RunID)
423✔
423

423✔
424
        if err := createExecution(
423✔
425
                ctx,
423✔
426
                tx,
423✔
427
                executionInfo,
423✔
428
                versionHistories,
423✔
429
                workflowChecksum,
423✔
430
                startVersion,
423✔
431
                lastWriteVersion,
423✔
432
                shardID,
423✔
433
                parser); err != nil {
425✔
434
                return err
2✔
435
        }
2✔
436

437
        if err := applyTasks(
423✔
438
                ctx,
423✔
439
                tx,
423✔
440
                shardID,
423✔
441
                domainID,
423✔
442
                workflowID,
423✔
443
                runID,
423✔
444
                workflowSnapshot.TransferTasks,
423✔
445
                workflowSnapshot.CrossClusterTasks,
423✔
446
                workflowSnapshot.ReplicationTasks,
423✔
447
                workflowSnapshot.TimerTasks,
423✔
448
                parser,
423✔
449
        ); err != nil {
423✔
450
                return err
×
451
        }
×
452

453
        if err := updateActivityInfos(
423✔
454
                ctx,
423✔
455
                tx,
423✔
456
                workflowSnapshot.ActivityInfos,
423✔
457
                nil,
423✔
458
                shardID,
423✔
459
                domainID,
423✔
460
                workflowID,
423✔
461
                runID,
423✔
462
                parser); err != nil {
423✔
463
                return err
×
464
        }
×
465

466
        if err := updateTimerInfos(
423✔
467
                ctx,
423✔
468
                tx,
423✔
469
                workflowSnapshot.TimerInfos,
423✔
470
                nil,
423✔
471
                shardID,
423✔
472
                domainID,
423✔
473
                workflowID,
423✔
474
                runID,
423✔
475
                parser); err != nil {
423✔
476
                return err
×
477
        }
×
478

479
        if err := updateChildExecutionInfos(
423✔
480
                ctx,
423✔
481
                tx,
423✔
482
                workflowSnapshot.ChildExecutionInfos,
423✔
483
                nil,
423✔
484
                shardID,
423✔
485
                domainID,
423✔
486
                workflowID,
423✔
487
                runID,
423✔
488
                parser); err != nil {
423✔
489
                return err
×
490
        }
×
491

492
        if err := updateRequestCancelInfos(
423✔
493
                ctx,
423✔
494
                tx,
423✔
495
                workflowSnapshot.RequestCancelInfos,
423✔
496
                nil,
423✔
497
                shardID,
423✔
498
                domainID,
423✔
499
                workflowID,
423✔
500
                runID,
423✔
501
                parser); err != nil {
423✔
502
                return err
×
503
        }
×
504

505
        if err := updateSignalInfos(
423✔
506
                ctx,
423✔
507
                tx,
423✔
508
                workflowSnapshot.SignalInfos,
423✔
509
                nil,
423✔
510
                shardID,
423✔
511
                domainID,
423✔
512
                workflowID,
423✔
513
                runID,
423✔
514
                parser); err != nil {
423✔
515
                return err
×
516
        }
×
517

518
        return updateSignalsRequested(
423✔
519
                ctx,
423✔
520
                tx,
423✔
521
                workflowSnapshot.SignalRequestedIDs,
423✔
522
                nil,
423✔
523
                shardID,
423✔
524
                domainID,
423✔
525
                workflowID,
423✔
526
                runID)
423✔
527
}
528

529
func applyTasks(
530
        ctx context.Context,
531
        tx sqlplugin.Tx,
532
        shardID int,
533
        domainID serialization.UUID,
534
        workflowID string,
535
        runID serialization.UUID,
536
        transferTasks []p.Task,
537
        crossClusterTasks []p.Task,
538
        replicationTasks []p.Task,
539
        timerTasks []p.Task,
540
        parser serialization.Parser,
541
) error {
3,331✔
542

3,331✔
543
        if err := createTransferTasks(
3,331✔
544
                ctx,
3,331✔
545
                tx,
3,331✔
546
                transferTasks,
3,331✔
547
                shardID,
3,331✔
548
                domainID,
3,331✔
549
                workflowID,
3,331✔
550
                runID,
3,331✔
551
                parser); err != nil {
3,331✔
552
                return err
×
553
        }
×
554

555
        if err := createCrossClusterTasks(
3,331✔
556
                ctx,
3,331✔
557
                tx,
3,331✔
558
                crossClusterTasks,
3,331✔
559
                shardID,
3,331✔
560
                domainID,
3,331✔
561
                workflowID,
3,331✔
562
                runID,
3,331✔
563
                parser); err != nil {
3,331✔
564
                return err
×
565
        }
×
566

567
        if err := createReplicationTasks(
3,331✔
568
                ctx,
3,331✔
569
                tx,
3,331✔
570
                replicationTasks,
3,331✔
571
                shardID,
3,331✔
572
                domainID,
3,331✔
573
                workflowID,
3,331✔
574
                runID,
3,331✔
575
                parser,
3,331✔
576
        ); err != nil {
3,331✔
577
                return err
×
578
        }
×
579

580
        return createTimerTasks(
3,331✔
581
                ctx,
3,331✔
582
                tx,
3,331✔
583
                timerTasks,
3,331✔
584
                shardID,
3,331✔
585
                domainID,
3,331✔
586
                workflowID,
3,331✔
587
                runID,
3,331✔
588
                parser,
3,331✔
589
        )
3,331✔
590
}
591

592
// lockCurrentExecutionIfExists returns current execution or nil if none is found for the workflowID
593
// locking it in the DB
594
func lockCurrentExecutionIfExists(
595
        ctx context.Context,
596
        tx sqlplugin.Tx,
597
        shardID int,
598
        domainID serialization.UUID,
599
        workflowID string,
600
) (*sqlplugin.CurrentExecutionsRow, error) {
340✔
601

340✔
602
        rows, err := tx.LockCurrentExecutionsJoinExecutions(ctx, &sqlplugin.CurrentExecutionsFilter{
340✔
603
                ShardID:    int64(shardID),
340✔
604
                DomainID:   domainID,
340✔
605
                WorkflowID: workflowID,
340✔
606
        })
340✔
607
        if err != nil {
342✔
608
                if err != sql.ErrNoRows {
3✔
609
                        return nil, convertCommonErrors(tx, "lockCurrentExecutionIfExists", fmt.Sprintf("Failed to get current_executions row for (shard,domain,workflow) = (%v, %v, %v).", shardID, domainID, workflowID), err)
1✔
610
                }
1✔
611
        }
612
        size := len(rows)
339✔
613
        if size > 1 {
340✔
614
                return nil, &types.InternalServiceError{
1✔
615
                        Message: fmt.Sprintf("lockCurrentExecutionIfExists failed. Multiple current_executions rows for (shard,domain,workflow) = (%v, %v, %v).", shardID, domainID, workflowID),
1✔
616
                }
1✔
617
        }
1✔
618
        if size == 0 {
627✔
619
                return nil, nil
289✔
620
        }
289✔
621
        return &rows[0], nil
51✔
622
}
623

624
func createOrUpdateCurrentExecution(
625
        ctx context.Context,
626
        tx sqlplugin.Tx,
627
        createMode p.CreateWorkflowMode,
628
        shardID int,
629
        domainID serialization.UUID,
630
        workflowID string,
631
        runID serialization.UUID,
632
        state int,
633
        closeStatus int,
634
        createRequestID string,
635
        startVersion int64,
636
        lastWriteVersion int64,
637
) error {
315✔
638

315✔
639
        row := sqlplugin.CurrentExecutionsRow{
315✔
640
                ShardID:          int64(shardID),
315✔
641
                DomainID:         domainID,
315✔
642
                WorkflowID:       workflowID,
315✔
643
                RunID:            runID,
315✔
644
                CreateRequestID:  createRequestID,
315✔
645
                State:            state,
315✔
646
                CloseStatus:      closeStatus,
315✔
647
                StartVersion:     startVersion,
315✔
648
                LastWriteVersion: lastWriteVersion,
315✔
649
        }
315✔
650

315✔
651
        switch createMode {
315✔
652
        case p.CreateWorkflowModeContinueAsNew,
653
                p.CreateWorkflowModeWorkflowIDReuse:
25✔
654
                if err := updateCurrentExecution(
25✔
655
                        ctx,
25✔
656
                        tx,
25✔
657
                        shardID,
25✔
658
                        domainID,
25✔
659
                        workflowID,
25✔
660
                        runID,
25✔
661
                        createRequestID,
25✔
662
                        state,
25✔
663
                        closeStatus,
25✔
664
                        row.StartVersion,
25✔
665
                        row.LastWriteVersion); err != nil {
27✔
666
                        return err
2✔
667
                }
2✔
668
        case p.CreateWorkflowModeBrandNew:
290✔
669
                if _, err := tx.InsertIntoCurrentExecutions(ctx, &row); err != nil {
291✔
670
                        return convertCommonErrors(tx, "createOrUpdateCurrentExecution", "Failed to insert into current_executions table.", err)
1✔
671
                }
1✔
672
        case p.CreateWorkflowModeZombie:
3✔
673
                // noop
674
        default:
1✔
675
                return fmt.Errorf("createOrUpdateCurrentExecution failed. Unknown workflow creation mode: %v", createMode)
1✔
676
        }
677

678
        return nil
311✔
679
}
680

681
func lockAndCheckNextEventID(
682
        ctx context.Context,
683
        tx sqlplugin.Tx,
684
        shardID int,
685
        domainID serialization.UUID,
686
        workflowID string,
687
        runID serialization.UUID,
688
        condition int64,
689
) error {
2,913✔
690

2,913✔
691
        nextEventID, err := lockNextEventID(
2,913✔
692
                ctx,
2,913✔
693
                tx,
2,913✔
694
                shardID,
2,913✔
695
                domainID,
2,913✔
696
                workflowID,
2,913✔
697
                runID,
2,913✔
698
        )
2,913✔
699

2,913✔
700
        if err != nil {
2,914✔
701
                return err
1✔
702
        }
1✔
703
        if *nextEventID != condition {
2,913✔
704
                return &p.ConditionFailedError{
1✔
705
                        Msg: fmt.Sprintf("lockAndCheckNextEventID failed. Next_event_id was %v when it should have been %v.", nextEventID, condition),
1✔
706
                }
1✔
707
        }
1✔
708
        return nil
2,911✔
709
}
710

711
func lockNextEventID(
712
        ctx context.Context,
713
        tx sqlplugin.Tx,
714
        shardID int,
715
        domainID serialization.UUID,
716
        workflowID string,
717
        runID serialization.UUID,
718
) (*int64, error) {
2,913✔
719

2,913✔
720
        nextEventID, err := tx.WriteLockExecutions(ctx, &sqlplugin.ExecutionsFilter{
2,913✔
721
                ShardID:    shardID,
2,913✔
722
                DomainID:   domainID,
2,913✔
723
                WorkflowID: workflowID,
2,913✔
724
                RunID:      runID,
2,913✔
725
        })
2,913✔
726
        if err != nil {
2,914✔
727
                if err == sql.ErrNoRows {
2✔
728
                        return nil, &types.EntityNotExistsError{
1✔
729
                                Message: fmt.Sprintf(
1✔
730
                                        "lockNextEventID failed. Unable to lock executions row with (shard, domain, workflow, run) = (%v,%v,%v,%v) which does not exist.",
1✔
731
                                        shardID,
1✔
732
                                        domainID,
1✔
733
                                        workflowID,
1✔
734
                                        runID,
1✔
735
                                ),
1✔
736
                        }
1✔
737
                }
1✔
738
                return nil, convertCommonErrors(tx, "lockNextEventID", "", err)
×
739
        }
740
        result := int64(nextEventID)
2,912✔
741
        return &result, nil
2,912✔
742
}
743

744
func createCrossClusterTasks(
745
        ctx context.Context,
746
        tx sqlplugin.Tx,
747
        crossClusterTasks []p.Task,
748
        shardID int,
749
        domainID serialization.UUID,
750
        workflowID string,
751
        runID serialization.UUID,
752
        parser serialization.Parser,
753
) error {
3,333✔
754
        if len(crossClusterTasks) == 0 {
6,661✔
755
                return nil
3,328✔
756
        }
3,328✔
757

758
        crossClusterTasksRows := make([]sqlplugin.CrossClusterTasksRow, len(crossClusterTasks))
5✔
759
        for i, task := range crossClusterTasks {
17✔
760
                info := &serialization.CrossClusterTaskInfo{
12✔
761
                        DomainID:            domainID,
12✔
762
                        WorkflowID:          workflowID,
12✔
763
                        RunID:               runID,
12✔
764
                        TaskType:            int16(task.GetType()),
12✔
765
                        TargetDomainID:      domainID,
12✔
766
                        TargetWorkflowID:    p.TransferTaskTransferTargetWorkflowID,
12✔
767
                        TargetRunID:         serialization.UUID(p.CrossClusterTaskDefaultTargetRunID),
12✔
768
                        ScheduleID:          0,
12✔
769
                        Version:             task.GetVersion(),
12✔
770
                        VisibilityTimestamp: task.GetVisibilityTimestamp(),
12✔
771
                }
12✔
772

12✔
773
                crossClusterTasksRows[i].ShardID = shardID
12✔
774
                crossClusterTasksRows[i].TaskID = task.GetTaskID()
12✔
775

12✔
776
                switch task.GetType() {
12✔
777
                case p.CrossClusterTaskTypeStartChildExecution:
8✔
778
                        crossClusterTasksRows[i].TargetCluster = task.(*p.CrossClusterStartChildExecutionTask).TargetCluster
8✔
779
                        info.TargetDomainID = serialization.MustParseUUID(task.(*p.CrossClusterStartChildExecutionTask).TargetDomainID)
8✔
780
                        info.TargetWorkflowID = task.(*p.CrossClusterStartChildExecutionTask).TargetWorkflowID
8✔
781
                        info.ScheduleID = task.(*p.CrossClusterStartChildExecutionTask).InitiatedID
8✔
782

783
                case p.CrossClusterTaskTypeCancelExecution:
1✔
784
                        crossClusterTasksRows[i].TargetCluster = task.(*p.CrossClusterCancelExecutionTask).TargetCluster
1✔
785
                        info.TargetDomainID = serialization.MustParseUUID(task.(*p.CrossClusterCancelExecutionTask).TargetDomainID)
1✔
786
                        info.TargetWorkflowID = task.(*p.CrossClusterCancelExecutionTask).TargetWorkflowID
1✔
787
                        if targetRunID := task.(*p.CrossClusterCancelExecutionTask).TargetRunID; targetRunID != "" {
2✔
788
                                info.TargetRunID = serialization.MustParseUUID(targetRunID)
1✔
789
                        }
1✔
790
                        info.TargetChildWorkflowOnly = task.(*p.CrossClusterCancelExecutionTask).TargetChildWorkflowOnly
1✔
791
                        info.ScheduleID = task.(*p.CrossClusterCancelExecutionTask).InitiatedID
1✔
792

793
                case p.CrossClusterTaskTypeSignalExecution:
1✔
794
                        crossClusterTasksRows[i].TargetCluster = task.(*p.CrossClusterSignalExecutionTask).TargetCluster
1✔
795
                        info.TargetDomainID = serialization.MustParseUUID(task.(*p.CrossClusterSignalExecutionTask).TargetDomainID)
1✔
796
                        info.TargetWorkflowID = task.(*p.CrossClusterSignalExecutionTask).TargetWorkflowID
1✔
797
                        if targetRunID := task.(*p.CrossClusterSignalExecutionTask).TargetRunID; targetRunID != "" {
2✔
798
                                info.TargetRunID = serialization.MustParseUUID(targetRunID)
1✔
799
                        }
1✔
800
                        info.TargetChildWorkflowOnly = task.(*p.CrossClusterSignalExecutionTask).TargetChildWorkflowOnly
1✔
801
                        info.ScheduleID = task.(*p.CrossClusterSignalExecutionTask).InitiatedID
1✔
802

803
                case p.CrossClusterTaskTypeRecordChildExeuctionCompleted:
1✔
804
                        crossClusterTasksRows[i].TargetCluster = task.(*p.CrossClusterRecordChildExecutionCompletedTask).TargetCluster
1✔
805
                        info.TargetDomainID = serialization.MustParseUUID(task.(*p.CrossClusterRecordChildExecutionCompletedTask).TargetDomainID)
1✔
806
                        info.TargetWorkflowID = task.(*p.CrossClusterRecordChildExecutionCompletedTask).TargetWorkflowID
1✔
807
                        if targetRunID := task.(*p.CrossClusterRecordChildExecutionCompletedTask).TargetRunID; targetRunID != "" {
2✔
808
                                info.TargetRunID = serialization.MustParseUUID(targetRunID)
1✔
809
                        }
1✔
810

811
                case p.CrossClusterTaskTypeApplyParentClosePolicy:
1✔
812
                        crossClusterTasksRows[i].TargetCluster = task.(*p.CrossClusterApplyParentClosePolicyTask).TargetCluster
1✔
813
                        for domainID := range task.(*p.CrossClusterApplyParentClosePolicyTask).TargetDomainIDs {
2✔
814
                                info.TargetDomainIDs = append(info.TargetDomainIDs, serialization.MustParseUUID(domainID))
1✔
815
                        }
1✔
816

817
                default:
×
818
                        return &types.InternalServiceError{
×
819
                                Message: fmt.Sprintf("Unknown cross-cluster task type: %v", task.GetType()),
×
820
                        }
×
821
                }
822

823
                blob, err := parser.CrossClusterTaskInfoToBlob(info)
12✔
824
                if err != nil {
12✔
825
                        return err
×
826
                }
×
827
                crossClusterTasksRows[i].Data = blob.Data
12✔
828
                crossClusterTasksRows[i].DataEncoding = string(blob.Encoding)
12✔
829
        }
830

831
        result, err := tx.InsertIntoCrossClusterTasks(ctx, crossClusterTasksRows)
5✔
832
        if err != nil {
6✔
833
                return convertCommonErrors(tx, "createCrossClusterTasks", "", err)
1✔
834
        }
1✔
835

836
        rowsAffected, err := result.RowsAffected()
4✔
837
        if err != nil {
4✔
838
                return &types.InternalServiceError{
×
839
                        Message: fmt.Sprintf("createTransferTasks failed. Could not verify number of rows inserted. Error: %v", err),
×
840
                }
×
841
        }
×
842

843
        if int(rowsAffected) != len(crossClusterTasks) {
4✔
844
                return &types.InternalServiceError{
×
845
                        Message: fmt.Sprintf("createCrossClusterTasks failed. Inserted %v instead of %v rows into transfer_tasks. Error: %v", rowsAffected, len(crossClusterTasks), err),
×
846
                }
×
847
        }
×
848

849
        return nil
4✔
850
}
851

852
func createTransferTasks(
853
        ctx context.Context,
854
        tx sqlplugin.Tx,
855
        transferTasks []p.Task,
856
        shardID int,
857
        domainID serialization.UUID,
858
        workflowID string,
859
        runID serialization.UUID,
860
        parser serialization.Parser,
861
) error {
3,333✔
862

3,333✔
863
        if len(transferTasks) == 0 {
5,168✔
864
                return nil
1,835✔
865
        }
1,835✔
866

867
        transferTasksRows := make([]sqlplugin.TransferTasksRow, len(transferTasks))
1,500✔
868
        for i, task := range transferTasks {
3,423✔
869
                info := &serialization.TransferTaskInfo{
1,923✔
870
                        DomainID:            domainID,
1,923✔
871
                        WorkflowID:          workflowID,
1,923✔
872
                        RunID:               runID,
1,923✔
873
                        TaskType:            int16(task.GetType()),
1,923✔
874
                        TargetDomainID:      domainID,
1,923✔
875
                        TargetWorkflowID:    p.TransferTaskTransferTargetWorkflowID,
1,923✔
876
                        ScheduleID:          0,
1,923✔
877
                        Version:             task.GetVersion(),
1,923✔
878
                        VisibilityTimestamp: task.GetVisibilityTimestamp(),
1,923✔
879
                }
1,923✔
880

1,923✔
881
                transferTasksRows[i].ShardID = shardID
1,923✔
882
                transferTasksRows[i].TaskID = task.GetTaskID()
1,923✔
883

1,923✔
884
                switch task.GetType() {
1,923✔
885
                case p.TransferTaskTypeActivityTask:
273✔
886
                        info.TargetDomainID = serialization.MustParseUUID(task.(*p.ActivityTask).DomainID)
273✔
887
                        info.TaskList = task.(*p.ActivityTask).TaskList
273✔
888
                        info.ScheduleID = task.(*p.ActivityTask).ScheduleID
273✔
889

890
                case p.TransferTaskTypeDecisionTask:
884✔
891
                        info.TargetDomainID = serialization.MustParseUUID(task.(*p.DecisionTask).DomainID)
884✔
892
                        info.TaskList = task.(*p.DecisionTask).TaskList
884✔
893
                        info.ScheduleID = task.(*p.DecisionTask).ScheduleID
884✔
894

895
                case p.TransferTaskTypeCancelExecution:
7✔
896
                        info.TargetDomainID = serialization.MustParseUUID(task.(*p.CancelExecutionTask).TargetDomainID)
7✔
897
                        info.TargetWorkflowID = task.(*p.CancelExecutionTask).TargetWorkflowID
7✔
898
                        if targetRunID := task.(*p.CancelExecutionTask).TargetRunID; targetRunID != "" {
14✔
899
                                info.TargetRunID = serialization.MustParseUUID(targetRunID)
7✔
900
                        }
7✔
901
                        info.TargetChildWorkflowOnly = task.(*p.CancelExecutionTask).TargetChildWorkflowOnly
7✔
902
                        info.ScheduleID = task.(*p.CancelExecutionTask).InitiatedID
7✔
903

904
                case p.TransferTaskTypeSignalExecution:
11✔
905
                        info.TargetDomainID = serialization.MustParseUUID(task.(*p.SignalExecutionTask).TargetDomainID)
11✔
906
                        info.TargetWorkflowID = task.(*p.SignalExecutionTask).TargetWorkflowID
11✔
907
                        if targetRunID := task.(*p.SignalExecutionTask).TargetRunID; targetRunID != "" {
20✔
908
                                info.TargetRunID = serialization.MustParseUUID(targetRunID)
9✔
909
                        }
9✔
910
                        info.TargetChildWorkflowOnly = task.(*p.SignalExecutionTask).TargetChildWorkflowOnly
11✔
911
                        info.ScheduleID = task.(*p.SignalExecutionTask).InitiatedID
11✔
912

913
                case p.TransferTaskTypeStartChildExecution:
15✔
914
                        info.TargetDomainID = serialization.MustParseUUID(task.(*p.StartChildExecutionTask).TargetDomainID)
15✔
915
                        info.TargetWorkflowID = task.(*p.StartChildExecutionTask).TargetWorkflowID
15✔
916
                        info.ScheduleID = task.(*p.StartChildExecutionTask).InitiatedID
15✔
917

918
                case p.TransferTaskTypeRecordChildExecutionCompleted:
1✔
919
                        info.TargetDomainID = serialization.MustParseUUID(task.(*p.RecordChildExecutionCompletedTask).TargetDomainID)
1✔
920
                        info.TargetWorkflowID = task.(*p.RecordChildExecutionCompletedTask).TargetWorkflowID
1✔
921
                        if targetRunID := task.(*p.RecordChildExecutionCompletedTask).TargetRunID; targetRunID != "" {
2✔
922
                                info.TargetRunID = serialization.MustParseUUID(targetRunID)
1✔
923
                        }
1✔
924

925
                case p.TransferTaskTypeApplyParentClosePolicy:
1✔
926
                        for targetDomainID := range task.(*p.ApplyParentClosePolicyTask).TargetDomainIDs {
2✔
927
                                info.TargetDomainIDs = append(info.TargetDomainIDs, serialization.MustParseUUID(targetDomainID))
1✔
928
                        }
1✔
929

930
                case p.TransferTaskTypeCloseExecution,
931
                        p.TransferTaskTypeRecordWorkflowStarted,
932
                        p.TransferTaskTypeResetWorkflow,
933
                        p.TransferTaskTypeUpsertWorkflowSearchAttributes,
934
                        p.TransferTaskTypeRecordWorkflowClosed:
741✔
935
                        // No explicit property needs to be set
936

937
                default:
×
938
                        return &types.InternalServiceError{
×
939
                                Message: fmt.Sprintf("createTransferTasks failed. Unknown transfer type: %v", task.GetType()),
×
940
                        }
×
941
                }
942

943
                blob, err := parser.TransferTaskInfoToBlob(info)
1,923✔
944
                if err != nil {
1,923✔
945
                        return err
×
946
                }
×
947
                transferTasksRows[i].Data = blob.Data
1,923✔
948
                transferTasksRows[i].DataEncoding = string(blob.Encoding)
1,923✔
949
        }
950

951
        result, err := tx.InsertIntoTransferTasks(ctx, transferTasksRows)
1,500✔
952
        if err != nil {
1,501✔
953
                return convertCommonErrors(tx, "createTransferTasks", "", err)
1✔
954
        }
1✔
955

956
        rowsAffected, err := result.RowsAffected()
1,499✔
957
        if err != nil {
1,499✔
958
                return &types.InternalServiceError{
×
959
                        Message: fmt.Sprintf("createTransferTasks failed. Could not verify number of rows inserted. Error: %v", err),
×
960
                }
×
961
        }
×
962

963
        if int(rowsAffected) != len(transferTasks) {
1,499✔
964
                return &types.InternalServiceError{
×
965
                        Message: fmt.Sprintf("createTransferTasks failed. Inserted %v instead of %v rows into transfer_tasks. Error: %v", rowsAffected, len(transferTasks), err),
×
966
                }
×
967
        }
×
968

969
        return nil
1,499✔
970
}
971

972
func createReplicationTasks(
973
        ctx context.Context,
974
        tx sqlplugin.Tx,
975
        replicationTasks []p.Task,
976
        shardID int,
977
        domainID serialization.UUID,
978
        workflowID string,
979
        runID serialization.UUID,
980
        parser serialization.Parser,
981
) error {
3,332✔
982

3,332✔
983
        if len(replicationTasks) == 0 {
6,660✔
984
                return nil
3,328✔
985
        }
3,328✔
986
        replicationTasksRows := make([]sqlplugin.ReplicationTasksRow, len(replicationTasks))
4✔
987

4✔
988
        for i, task := range replicationTasks {
19✔
989

15✔
990
                firstEventID := common.EmptyEventID
15✔
991
                nextEventID := common.EmptyEventID
15✔
992
                version := common.EmptyVersion
15✔
993
                activityScheduleID := common.EmptyEventID
15✔
994
                var branchToken, newRunBranchToken []byte
15✔
995

15✔
996
                switch task.GetType() {
15✔
997
                case p.ReplicationTaskTypeHistory:
13✔
998
                        historyReplicationTask, ok := task.(*p.HistoryReplicationTask)
13✔
999
                        if !ok {
13✔
1000
                                return &types.InternalServiceError{
×
1001
                                        Message: fmt.Sprintf("createReplicationTasks failed. Failed to cast %v to HistoryReplicationTask", task),
×
1002
                                }
×
1003
                        }
×
1004
                        firstEventID = historyReplicationTask.FirstEventID
13✔
1005
                        nextEventID = historyReplicationTask.NextEventID
13✔
1006
                        version = task.GetVersion()
13✔
1007
                        branchToken = historyReplicationTask.BranchToken
13✔
1008
                        newRunBranchToken = historyReplicationTask.NewRunBranchToken
13✔
1009

1010
                case p.ReplicationTaskTypeSyncActivity:
1✔
1011
                        version = task.GetVersion()
1✔
1012
                        activityScheduleID = task.(*p.SyncActivityTask).ScheduledID
1✔
1013

1014
                case p.ReplicationTaskTypeFailoverMarker:
1✔
1015
                        version = task.GetVersion()
1✔
1016

1017
                default:
×
1018
                        return &types.InternalServiceError{
×
1019
                                Message: fmt.Sprintf("Unknown replication task: %v", task.GetType()),
×
1020
                        }
×
1021
                }
1022

1023
                blob, err := parser.ReplicationTaskInfoToBlob(&serialization.ReplicationTaskInfo{
15✔
1024
                        DomainID:                domainID,
15✔
1025
                        WorkflowID:              workflowID,
15✔
1026
                        RunID:                   runID,
15✔
1027
                        TaskType:                int16(task.GetType()),
15✔
1028
                        FirstEventID:            firstEventID,
15✔
1029
                        NextEventID:             nextEventID,
15✔
1030
                        Version:                 version,
15✔
1031
                        ScheduledID:             activityScheduleID,
15✔
1032
                        EventStoreVersion:       p.EventStoreVersion,
15✔
1033
                        NewRunEventStoreVersion: p.EventStoreVersion,
15✔
1034
                        BranchToken:             branchToken,
15✔
1035
                        NewRunBranchToken:       newRunBranchToken,
15✔
1036
                        CreationTimestamp:       task.GetVisibilityTimestamp(),
15✔
1037
                })
15✔
1038
                if err != nil {
15✔
1039
                        return err
×
1040
                }
×
1041
                replicationTasksRows[i].ShardID = shardID
15✔
1042
                replicationTasksRows[i].TaskID = task.GetTaskID()
15✔
1043
                replicationTasksRows[i].Data = blob.Data
15✔
1044
                replicationTasksRows[i].DataEncoding = string(blob.Encoding)
15✔
1045
        }
1046

1047
        result, err := tx.InsertIntoReplicationTasks(ctx, replicationTasksRows)
4✔
1048
        if err != nil {
4✔
1049
                return convertCommonErrors(tx, "createReplicationTasks", "", err)
×
1050
        }
×
1051

1052
        rowsAffected, err := result.RowsAffected()
4✔
1053
        if err != nil {
4✔
1054
                return &types.InternalServiceError{
×
1055
                        Message: fmt.Sprintf("createReplicationTasks failed. Could not verify number of rows inserted. Error: %v", err),
×
1056
                }
×
1057
        }
×
1058

1059
        if int(rowsAffected) != len(replicationTasks) {
4✔
1060
                return &types.InternalServiceError{
×
1061
                        Message: fmt.Sprintf("createReplicationTasks failed. Inserted %v instead of %v rows into transfer_tasks. Error: %v", rowsAffected, len(replicationTasks), err),
×
1062
                }
×
1063
        }
×
1064

1065
        return nil
4✔
1066
}
1067

1068
func createTimerTasks(
1069
        ctx context.Context,
1070
        tx sqlplugin.Tx,
1071
        timerTasks []p.Task,
1072
        shardID int,
1073
        domainID serialization.UUID,
1074
        workflowID string,
1075
        runID serialization.UUID,
1076
        parser serialization.Parser,
1077
) error {
3,333✔
1078

3,333✔
1079
        if len(timerTasks) == 0 {
4,635✔
1080
                return nil
1,302✔
1081
        }
1,302✔
1082

1083
        timerTasksRows := make([]sqlplugin.TimerTasksRow, len(timerTasks))
2,033✔
1084

2,033✔
1085
        for i, task := range timerTasks {
4,140✔
1086
                info := &serialization.TimerTaskInfo{
2,107✔
1087
                        DomainID:        domainID,
2,107✔
1088
                        WorkflowID:      workflowID,
2,107✔
1089
                        RunID:           runID,
2,107✔
1090
                        TaskType:        int16(task.GetType()),
2,107✔
1091
                        Version:         task.GetVersion(),
2,107✔
1092
                        EventID:         common.EmptyEventID,
2,107✔
1093
                        ScheduleAttempt: 0,
2,107✔
1094
                }
2,107✔
1095

2,107✔
1096
                switch t := task.(type) {
2,107✔
1097
                case *p.DecisionTimeoutTask:
812✔
1098
                        info.EventID = t.EventID
812✔
1099
                        info.TimeoutType = common.Int16Ptr(int16(t.TimeoutType))
812✔
1100
                        info.ScheduleAttempt = t.ScheduleAttempt
812✔
1101

1102
                case *p.ActivityTimeoutTask:
490✔
1103
                        info.EventID = t.EventID
490✔
1104
                        info.TimeoutType = common.Int16Ptr(int16(t.TimeoutType))
490✔
1105
                        info.ScheduleAttempt = t.Attempt
490✔
1106

1107
                case *p.UserTimerTask:
23✔
1108
                        info.EventID = t.EventID
23✔
1109

1110
                case *p.ActivityRetryTimerTask:
7✔
1111
                        info.EventID = t.EventID
7✔
1112
                        info.ScheduleAttempt = int64(t.Attempt)
7✔
1113

1114
                case *p.WorkflowBackoffTimerTask:
47✔
1115
                        info.EventID = t.EventID
47✔
1116
                        info.TimeoutType = common.Int16Ptr(int16(t.TimeoutType))
47✔
1117

1118
                case *p.WorkflowTimeoutTask:
423✔
1119
                        // noop
1120

1121
                case *p.DeleteHistoryEventTask:
315✔
1122
                        // noop
1123

1124
                default:
×
1125
                        return &types.InternalServiceError{
×
1126
                                Message: fmt.Sprintf("createTimerTasks failed. Unknown timer task: %v", task.GetType()),
×
1127
                        }
×
1128
                }
1129

1130
                blob, err := parser.TimerTaskInfoToBlob(info)
2,107✔
1131
                if err != nil {
2,107✔
1132
                        return err
×
1133
                }
×
1134

1135
                timerTasksRows[i].ShardID = shardID
2,107✔
1136
                timerTasksRows[i].VisibilityTimestamp = task.GetVisibilityTimestamp()
2,107✔
1137
                timerTasksRows[i].TaskID = task.GetTaskID()
2,107✔
1138
                timerTasksRows[i].Data = blob.Data
2,107✔
1139
                timerTasksRows[i].DataEncoding = string(blob.Encoding)
2,107✔
1140
        }
1141

1142
        result, err := tx.InsertIntoTimerTasks(ctx, timerTasksRows)
2,033✔
1143
        if err != nil {
2,034✔
1144
                return convertCommonErrors(tx, "createTimerTasks", "", err)
1✔
1145
        }
1✔
1146
        rowsAffected, err := result.RowsAffected()
2,032✔
1147
        if err != nil {
2,032✔
1148
                return &types.InternalServiceError{
×
1149
                        Message: fmt.Sprintf("createTimerTasks failed. Could not verify number of rows inserted. Error: %v", err),
×
1150
                }
×
1151
        }
×
1152

1153
        if int(rowsAffected) != len(timerTasks) {
2,032✔
1154
                return &types.InternalServiceError{
×
1155
                        Message: fmt.Sprintf("createTimerTasks failed. Inserted %v instead of %v rows into timer_tasks.", rowsAffected, len(timerTasks)),
×
1156
                }
×
1157
        }
×
1158

1159
        return nil
2,032✔
1160
}
1161

1162
func assertNotCurrentExecution(
1163
        ctx context.Context,
1164
        tx sqlplugin.Tx,
1165
        shardID int,
1166
        domainID serialization.UUID,
1167
        workflowID string,
1168
        runID serialization.UUID,
1169
) error {
6✔
1170
        currentRow, err := tx.LockCurrentExecutions(ctx, &sqlplugin.CurrentExecutionsFilter{
6✔
1171
                ShardID:    int64(shardID),
6✔
1172
                DomainID:   domainID,
6✔
1173
                WorkflowID: workflowID,
6✔
1174
        })
6✔
1175
        if err != nil {
8✔
1176
                if err == sql.ErrNoRows {
3✔
1177
                        // allow bypassing no current record
1✔
1178
                        return nil
1✔
1179
                }
1✔
1180
                return convertCommonErrors(tx, "assertCurrentExecution", "Unable to load current record.", err)
1✔
1181
        }
1182
        return assertRunIDMismatch(runID, currentRow.RunID)
4✔
1183
}
1184

1185
func assertRunIDAndUpdateCurrentExecution(
1186
        ctx context.Context,
1187
        tx sqlplugin.Tx,
1188
        shardID int,
1189
        domainID serialization.UUID,
1190
        workflowID string,
1191
        newRunID serialization.UUID,
1192
        previousRunID serialization.UUID,
1193
        createRequestID string,
1194
        state int,
1195
        closeStatus int,
1196
        startVersion int64,
1197
        lastWriteVersion int64,
1198
) error {
2,912✔
1199

2,912✔
1200
        assertFn := func(currentRow *sqlplugin.CurrentExecutionsRow) error {
5,823✔
1201
                if !bytes.Equal(currentRow.RunID, previousRunID) {
2,912✔
1202
                        return &p.ConditionFailedError{Msg: fmt.Sprintf(
1✔
1203
                                "assertRunIDAndUpdateCurrentExecution failed. Current run ID was %v, expected %v",
1✔
1204
                                currentRow.RunID,
1✔
1205
                                previousRunID,
1✔
1206
                        )}
1✔
1207
                }
1✔
1208
                return nil
2,910✔
1209
        }
1210
        if err := assertCurrentExecution(ctx, tx, shardID, domainID, workflowID, assertFn); err != nil {
2,914✔
1211
                return err
2✔
1212
        }
2✔
1213

1214
        return updateCurrentExecution(ctx, tx, shardID, domainID, workflowID, newRunID, createRequestID, state, closeStatus, startVersion, lastWriteVersion)
2,910✔
1215
}
1216

1217
func assertCurrentExecution(
1218
        ctx context.Context,
1219
        tx sqlplugin.Tx,
1220
        shardID int,
1221
        domainID serialization.UUID,
1222
        workflowID string,
1223
        assertFn func(currentRow *sqlplugin.CurrentExecutionsRow) error,
1224
) error {
2,912✔
1225

2,912✔
1226
        currentRow, err := tx.LockCurrentExecutions(ctx, &sqlplugin.CurrentExecutionsFilter{
2,912✔
1227
                ShardID:    int64(shardID),
2,912✔
1228
                DomainID:   domainID,
2,912✔
1229
                WorkflowID: workflowID,
2,912✔
1230
        })
2,912✔
1231
        if err != nil {
2,913✔
1232
                return convertCommonErrors(tx, "assertCurrentExecution", "Unable to load current record.", err)
1✔
1233
        }
1✔
1234
        return assertFn(currentRow)
2,911✔
1235
}
1236

1237
func assertRunIDMismatch(runID serialization.UUID, currentRunID serialization.UUID) error {
6✔
1238
        // zombie workflow creation with existence of current record, this is a noop
6✔
1239
        if bytes.Equal(currentRunID, runID) {
8✔
1240
                return &p.ConditionFailedError{Msg: fmt.Sprintf(
2✔
1241
                        "assertRunIDMismatch failed. Current run ID was %v, input %v",
2✔
1242
                        currentRunID,
2✔
1243
                        runID,
2✔
1244
                )}
2✔
1245
        }
2✔
1246
        return nil
4✔
1247
}
1248

1249
func updateCurrentExecution(
1250
        ctx context.Context,
1251
        tx sqlplugin.Tx,
1252
        shardID int,
1253
        domainID serialization.UUID,
1254
        workflowID string,
1255
        runID serialization.UUID,
1256
        createRequestID string,
1257
        state int,
1258
        closeStatus int,
1259
        startVersion int64,
1260
        lastWriteVersion int64,
1261
) error {
2,933✔
1262

2,933✔
1263
        result, err := tx.UpdateCurrentExecutions(ctx, &sqlplugin.CurrentExecutionsRow{
2,933✔
1264
                ShardID:          int64(shardID),
2,933✔
1265
                DomainID:         domainID,
2,933✔
1266
                WorkflowID:       workflowID,
2,933✔
1267
                RunID:            runID,
2,933✔
1268
                CreateRequestID:  createRequestID,
2,933✔
1269
                State:            state,
2,933✔
1270
                CloseStatus:      closeStatus,
2,933✔
1271
                StartVersion:     startVersion,
2,933✔
1272
                LastWriteVersion: lastWriteVersion,
2,933✔
1273
        })
2,933✔
1274
        if err != nil {
2,935✔
1275
                return convertCommonErrors(tx, "updateCurrentExecution", "", err)
2✔
1276
        }
2✔
1277
        rowsAffected, err := result.RowsAffected()
2,931✔
1278
        if err != nil {
2,931✔
1279
                return &types.InternalServiceError{
×
1280
                        Message: fmt.Sprintf("updateCurrentExecution failed. Failed to check number of rows updated in current_executions table. Error: %v", err),
×
1281
                }
×
1282
        }
×
1283
        if rowsAffected != 1 {
2,932✔
1284
                return &types.InternalServiceError{
1✔
1285
                        Message: fmt.Sprintf("updateCurrentExecution failed. %v rows of current_executions updated instead of 1.", rowsAffected),
1✔
1286
                }
1✔
1287
        }
1✔
1288
        return nil
2,930✔
1289
}
1290

1291
func buildExecutionRow(
1292
        executionInfo *p.InternalWorkflowExecutionInfo,
1293
        versionHistories *p.DataBlob,
1294
        workflowChecksum *p.DataBlob,
1295
        startVersion int64,
1296
        lastWriteVersion int64,
1297
        shardID int,
1298
        parser serialization.Parser,
1299
) (row *sqlplugin.ExecutionsRow, err error) {
3,335✔
1300

3,335✔
1301
        info := serialization.FromInternalWorkflowExecutionInfo(executionInfo)
3,335✔
1302

3,335✔
1303
        info.StartVersion = startVersion
3,335✔
1304
        if versionHistories == nil {
3,338✔
1305
                // this is allowed
3✔
1306
        } else {
3,335✔
1307
                info.VersionHistories = versionHistories.Data
3,332✔
1308
                info.VersionHistoriesEncoding = string(versionHistories.GetEncoding())
3,332✔
1309
        }
3,332✔
1310
        if workflowChecksum != nil {
3,335✔
UNCOV
1311
                info.Checksum = workflowChecksum.Data
×
UNCOV
1312
                info.ChecksumEncoding = string(workflowChecksum.GetEncoding())
×
UNCOV
1313
        }
×
1314

1315
        blob, err := parser.WorkflowExecutionInfoToBlob(info)
3,335✔
1316
        if err != nil {
3,335✔
1317
                return nil, err
×
1318
        }
×
1319

1320
        return &sqlplugin.ExecutionsRow{
3,335✔
1321
                ShardID:          shardID,
3,335✔
1322
                DomainID:         serialization.MustParseUUID(executionInfo.DomainID),
3,335✔
1323
                WorkflowID:       executionInfo.WorkflowID,
3,335✔
1324
                RunID:            serialization.MustParseUUID(executionInfo.RunID),
3,335✔
1325
                NextEventID:      int64(executionInfo.NextEventID),
3,335✔
1326
                LastWriteVersion: lastWriteVersion,
3,335✔
1327
                Data:             blob.Data,
3,335✔
1328
                DataEncoding:     string(blob.Encoding),
3,335✔
1329
        }, nil
3,335✔
1330
}
1331

1332
func createExecution(
1333
        ctx context.Context,
1334
        tx sqlplugin.Tx,
1335
        executionInfo *p.InternalWorkflowExecutionInfo,
1336
        versionHistories *p.DataBlob,
1337
        workflowChecksum *p.DataBlob,
1338
        startVersion int64,
1339
        lastWriteVersion int64,
1340
        shardID int,
1341
        parser serialization.Parser,
1342
) error {
425✔
1343

425✔
1344
        // validate workflow state & close status
425✔
1345
        if err := p.ValidateCreateWorkflowStateCloseStatus(
425✔
1346
                executionInfo.State,
425✔
1347
                executionInfo.CloseStatus); err != nil {
425✔
1348
                return err
×
1349
        }
×
1350

1351
        now := time.Now()
425✔
1352
        // TODO: this case seems to be always false
425✔
1353
        if executionInfo.StartTimestamp.IsZero() {
428✔
1354
                executionInfo.StartTimestamp = now
3✔
1355
        }
3✔
1356

1357
        row, err := buildExecutionRow(
425✔
1358
                executionInfo,
425✔
1359
                versionHistories,
425✔
1360
                workflowChecksum,
425✔
1361
                startVersion,
425✔
1362
                lastWriteVersion,
425✔
1363
                shardID,
425✔
1364
                parser,
425✔
1365
        )
425✔
1366
        if err != nil {
425✔
1367
                return err
×
1368
        }
×
1369
        result, err := tx.InsertIntoExecutions(ctx, row)
425✔
1370
        if err != nil {
428✔
1371
                if tx.IsDupEntryError(err) {
6✔
1372
                        return &p.WorkflowExecutionAlreadyStartedError{
3✔
1373
                                Msg:              fmt.Sprintf("Workflow execution already running. WorkflowId: %v", executionInfo.WorkflowID),
3✔
1374
                                StartRequestID:   executionInfo.CreateRequestID,
3✔
1375
                                RunID:            executionInfo.RunID,
3✔
1376
                                State:            executionInfo.State,
3✔
1377
                                CloseStatus:      executionInfo.CloseStatus,
3✔
1378
                                LastWriteVersion: row.LastWriteVersion,
3✔
1379
                        }
3✔
1380
                }
3✔
1381
                return convertCommonErrors(tx, "createExecution", "", err)
×
1382
        }
1383
        rowsAffected, err := result.RowsAffected()
424✔
1384
        if err != nil {
424✔
1385
                return &types.InternalServiceError{
×
1386
                        Message: fmt.Sprintf("createExecution failed. Failed to verify number of rows affected. Erorr: %v", err),
×
1387
                }
×
1388
        }
×
1389
        if rowsAffected != 1 {
424✔
1390
                return &types.EntityNotExistsError{
×
1391
                        Message: fmt.Sprintf("createExecution failed. Affected %v rows updated instead of 1.", rowsAffected),
×
1392
                }
×
1393
        }
×
1394

1395
        return nil
424✔
1396
}
1397

1398
func updateExecution(
1399
        ctx context.Context,
1400
        tx sqlplugin.Tx,
1401
        executionInfo *p.InternalWorkflowExecutionInfo,
1402
        versionHistories *p.DataBlob,
1403
        workflowChecksum *p.DataBlob,
1404
        startVersion int64,
1405
        lastWriteVersion int64,
1406
        shardID int,
1407
        parser serialization.Parser,
1408
) error {
2,912✔
1409

2,912✔
1410
        // validate workflow state & close status
2,912✔
1411
        if err := p.ValidateUpdateWorkflowStateCloseStatus(
2,912✔
1412
                executionInfo.State,
2,912✔
1413
                executionInfo.CloseStatus); err != nil {
2,912✔
1414
                return err
×
1415
        }
×
1416

1417
        row, err := buildExecutionRow(
2,912✔
1418
                executionInfo,
2,912✔
1419
                versionHistories,
2,912✔
1420
                workflowChecksum,
2,912✔
1421
                startVersion,
2,912✔
1422
                lastWriteVersion,
2,912✔
1423
                shardID,
2,912✔
1424
                parser,
2,912✔
1425
        )
2,912✔
1426
        if err != nil {
2,912✔
1427
                return err
×
1428
        }
×
1429
        result, err := tx.UpdateExecutions(ctx, row)
2,912✔
1430
        if err != nil {
2,913✔
1431
                return convertCommonErrors(tx, "updateExecution", "", err)
1✔
1432
        }
1✔
1433
        rowsAffected, err := result.RowsAffected()
2,911✔
1434
        if err != nil {
2,911✔
1435
                return &types.InternalServiceError{
×
1436
                        Message: fmt.Sprintf("updateExecution failed. Failed to verify number of rows affected. Erorr: %v", err),
×
1437
                }
×
1438
        }
×
1439
        if rowsAffected != 1 {
2,911✔
1440
                return &types.EntityNotExistsError{
×
1441
                        Message: fmt.Sprintf("updateExecution failed. Affected %v rows updated instead of 1.", rowsAffected),
×
1442
                }
×
1443
        }
×
1444

1445
        return nil
2,911✔
1446
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc