• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

uber / cadence / 018e1aca-4829-4e25-951b-23cb08010d99

07 Mar 2024 09:20PM UTC coverage: 63.21% (-0.7%) from 63.932%
018e1aca-4829-4e25-951b-23cb08010d99

push

buildkite

web-flow
Add unit tests for common/persistence/sql/factory.go (#5751)

92665 of 146599 relevant lines covered (63.21%)

2349.68 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

51.01
/common/persistence/sql/sql_execution_store_util.go
1
// Copyright (c) 2017-2020 Uber Technologies, Inc.
2
// Portions of the Software are attributed to Copyright (c) 2020 Temporal Technologies Inc.
3
//
4
// Permission is hereby granted, free of charge, to any person obtaining a copy
5
// of this software and associated documentation files (the "Software"), to deal
6
// in the Software without restriction, including without limitation the rights
7
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
8
// copies of the Software, and to permit persons to whom the Software is
9
// furnished to do so, subject to the following conditions:
10
//
11
// The above copyright notice and this permission notice shall be included in
12
// all copies or substantial portions of the Software.
13
//
14
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
15
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
16
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
17
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
18
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
19
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
20
// THE SOFTWARE.
21

22
package sql
23

24
import (
25
        "bytes"
26
        "context"
27
        "database/sql"
28
        "fmt"
29
        "runtime/debug"
30
        "time"
31

32
        "golang.org/x/sync/errgroup"
33

34
        "github.com/uber/cadence/common"
35
        p "github.com/uber/cadence/common/persistence"
36
        "github.com/uber/cadence/common/persistence/serialization"
37
        "github.com/uber/cadence/common/persistence/sql/sqlplugin"
38
        "github.com/uber/cadence/common/types"
39
)
40

41
func applyWorkflowMutationTx(
42
        ctx context.Context,
43
        tx sqlplugin.Tx,
44
        shardID int,
45
        workflowMutation *p.InternalWorkflowMutation,
46
        parser serialization.Parser,
47
) error {
2,898✔
48

2,898✔
49
        executionInfo := workflowMutation.ExecutionInfo
2,898✔
50
        versionHistories := workflowMutation.VersionHistories
2,898✔
51
        workflowChecksum := workflowMutation.ChecksumData
2,898✔
52
        startVersion := workflowMutation.StartVersion
2,898✔
53
        lastWriteVersion := workflowMutation.LastWriteVersion
2,898✔
54
        domainID := serialization.MustParseUUID(executionInfo.DomainID)
2,898✔
55
        workflowID := executionInfo.WorkflowID
2,898✔
56
        runID := serialization.MustParseUUID(executionInfo.RunID)
2,898✔
57

2,898✔
58
        // TODO Remove me if UPDATE holds the lock to the end of a transaction
2,898✔
59
        if err := lockAndCheckNextEventID(
2,898✔
60
                ctx,
2,898✔
61
                tx,
2,898✔
62
                shardID,
2,898✔
63
                domainID,
2,898✔
64
                workflowID,
2,898✔
65
                runID,
2,898✔
66
                workflowMutation.Condition); err != nil {
2,898✔
67
                return err
×
68
        }
×
69

70
        if err := updateExecution(
2,898✔
71
                ctx,
2,898✔
72
                tx,
2,898✔
73
                executionInfo,
2,898✔
74
                versionHistories,
2,898✔
75
                workflowChecksum,
2,898✔
76
                startVersion,
2,898✔
77
                lastWriteVersion,
2,898✔
78
                shardID,
2,898✔
79
                parser); err != nil {
2,898✔
80
                return err
×
81
        }
×
82

83
        if err := applyTasks(
2,898✔
84
                ctx,
2,898✔
85
                tx,
2,898✔
86
                shardID,
2,898✔
87
                domainID,
2,898✔
88
                workflowID,
2,898✔
89
                runID,
2,898✔
90
                workflowMutation.TransferTasks,
2,898✔
91
                workflowMutation.CrossClusterTasks,
2,898✔
92
                workflowMutation.ReplicationTasks,
2,898✔
93
                workflowMutation.TimerTasks,
2,898✔
94
                parser,
2,898✔
95
        ); err != nil {
2,898✔
96
                return err
×
97
        }
×
98

99
        if err := updateActivityInfos(
2,898✔
100
                ctx,
2,898✔
101
                tx,
2,898✔
102
                workflowMutation.UpsertActivityInfos,
2,898✔
103
                workflowMutation.DeleteActivityInfos,
2,898✔
104
                shardID,
2,898✔
105
                domainID,
2,898✔
106
                workflowID,
2,898✔
107
                runID,
2,898✔
108
                parser,
2,898✔
109
        ); err != nil {
2,898✔
110
                return err
×
111
        }
×
112

113
        if err := updateTimerInfos(
2,898✔
114
                ctx,
2,898✔
115
                tx,
2,898✔
116
                workflowMutation.UpsertTimerInfos,
2,898✔
117
                workflowMutation.DeleteTimerInfos,
2,898✔
118
                shardID,
2,898✔
119
                domainID,
2,898✔
120
                workflowID,
2,898✔
121
                runID,
2,898✔
122
                parser,
2,898✔
123
        ); err != nil {
2,898✔
124
                return err
×
125
        }
×
126

127
        if err := updateChildExecutionInfos(
2,898✔
128
                ctx,
2,898✔
129
                tx,
2,898✔
130
                workflowMutation.UpsertChildExecutionInfos,
2,898✔
131
                workflowMutation.DeleteChildExecutionInfos,
2,898✔
132
                shardID,
2,898✔
133
                domainID,
2,898✔
134
                workflowID,
2,898✔
135
                runID,
2,898✔
136
                parser,
2,898✔
137
        ); err != nil {
2,898✔
138
                return err
×
139
        }
×
140

141
        if err := updateRequestCancelInfos(
2,898✔
142
                ctx,
2,898✔
143
                tx,
2,898✔
144
                workflowMutation.UpsertRequestCancelInfos,
2,898✔
145
                workflowMutation.DeleteRequestCancelInfos,
2,898✔
146
                shardID,
2,898✔
147
                domainID,
2,898✔
148
                workflowID,
2,898✔
149
                runID,
2,898✔
150
                parser,
2,898✔
151
        ); err != nil {
2,898✔
152
                return err
×
153
        }
×
154

155
        if err := updateSignalInfos(
2,898✔
156
                ctx,
2,898✔
157
                tx,
2,898✔
158
                workflowMutation.UpsertSignalInfos,
2,898✔
159
                workflowMutation.DeleteSignalInfos,
2,898✔
160
                shardID,
2,898✔
161
                domainID,
2,898✔
162
                workflowID,
2,898✔
163
                runID,
2,898✔
164
                parser,
2,898✔
165
        ); err != nil {
2,898✔
166
                return err
×
167
        }
×
168

169
        if err := updateSignalsRequested(
2,898✔
170
                ctx,
2,898✔
171
                tx,
2,898✔
172
                workflowMutation.UpsertSignalRequestedIDs,
2,898✔
173
                workflowMutation.DeleteSignalRequestedIDs,
2,898✔
174
                shardID,
2,898✔
175
                domainID,
2,898✔
176
                workflowID,
2,898✔
177
                runID,
2,898✔
178
        ); err != nil {
2,898✔
179
                return err
×
180
        }
×
181

182
        if workflowMutation.ClearBufferedEvents {
2,924✔
183
                if err := deleteBufferedEvents(
26✔
184
                        ctx,
26✔
185
                        tx,
26✔
186
                        shardID,
26✔
187
                        domainID,
26✔
188
                        workflowID,
26✔
189
                        runID,
26✔
190
                ); err != nil {
26✔
191
                        return err
×
192
                }
×
193
        }
194

195
        return updateBufferedEvents(
2,898✔
196
                ctx,
2,898✔
197
                tx,
2,898✔
198
                workflowMutation.NewBufferedEvents,
2,898✔
199
                shardID,
2,898✔
200
                domainID,
2,898✔
201
                workflowID,
2,898✔
202
                runID,
2,898✔
203
        )
2,898✔
204
}
205

206
func applyWorkflowSnapshotTxAsReset(
207
        ctx context.Context,
208
        tx sqlplugin.Tx,
209
        shardID int,
210
        workflowSnapshot *p.InternalWorkflowSnapshot,
211
        parser serialization.Parser,
212
) error {
2✔
213

2✔
214
        executionInfo := workflowSnapshot.ExecutionInfo
2✔
215
        versionHistories := workflowSnapshot.VersionHistories
2✔
216
        workflowChecksum := workflowSnapshot.ChecksumData
2✔
217
        startVersion := workflowSnapshot.StartVersion
2✔
218
        lastWriteVersion := workflowSnapshot.LastWriteVersion
2✔
219
        domainID := serialization.MustParseUUID(executionInfo.DomainID)
2✔
220
        workflowID := executionInfo.WorkflowID
2✔
221
        runID := serialization.MustParseUUID(executionInfo.RunID)
2✔
222

2✔
223
        // TODO Is there a way to modify the various map tables without fear of other people adding rows after we delete, without locking the executions row?
2✔
224
        if err := lockAndCheckNextEventID(
2✔
225
                ctx,
2✔
226
                tx,
2✔
227
                shardID,
2✔
228
                domainID,
2✔
229
                workflowID,
2✔
230
                runID,
2✔
231
                workflowSnapshot.Condition); err != nil {
2✔
232
                return err
×
233
        }
×
234

235
        if err := updateExecution(
2✔
236
                ctx,
2✔
237
                tx,
2✔
238
                executionInfo,
2✔
239
                versionHistories,
2✔
240
                workflowChecksum,
2✔
241
                startVersion,
2✔
242
                lastWriteVersion,
2✔
243
                shardID,
2✔
244
                parser); err != nil {
2✔
245
                return err
×
246
        }
×
247

248
        if err := applyTasks(
2✔
249
                ctx,
2✔
250
                tx,
2✔
251
                shardID,
2✔
252
                domainID,
2✔
253
                workflowID,
2✔
254
                runID,
2✔
255
                workflowSnapshot.TransferTasks,
2✔
256
                workflowSnapshot.CrossClusterTasks,
2✔
257
                workflowSnapshot.ReplicationTasks,
2✔
258
                workflowSnapshot.TimerTasks,
2✔
259
                parser,
2✔
260
        ); err != nil {
2✔
261
                return err
×
262
        }
×
263

264
        if err := deleteActivityInfoMap(
2✔
265
                ctx,
2✔
266
                tx,
2✔
267
                shardID,
2✔
268
                domainID,
2✔
269
                workflowID,
2✔
270
                runID); err != nil {
2✔
271
                return err
×
272
        }
×
273

274
        if err := updateActivityInfos(
2✔
275
                ctx,
2✔
276
                tx,
2✔
277
                workflowSnapshot.ActivityInfos,
2✔
278
                nil,
2✔
279
                shardID,
2✔
280
                domainID,
2✔
281
                workflowID,
2✔
282
                runID,
2✔
283
                parser); err != nil {
2✔
284
                return err
×
285
        }
×
286

287
        if err := deleteTimerInfoMap(
2✔
288
                ctx,
2✔
289
                tx,
2✔
290
                shardID,
2✔
291
                domainID,
2✔
292
                workflowID,
2✔
293
                runID); err != nil {
2✔
294
                return err
×
295
        }
×
296

297
        if err := updateTimerInfos(
2✔
298
                ctx,
2✔
299
                tx,
2✔
300
                workflowSnapshot.TimerInfos,
2✔
301
                nil,
2✔
302
                shardID,
2✔
303
                domainID,
2✔
304
                workflowID,
2✔
305
                runID,
2✔
306
                parser); err != nil {
2✔
307
                return err
×
308
        }
×
309

310
        if err := deleteChildExecutionInfoMap(
2✔
311
                ctx,
2✔
312
                tx,
2✔
313
                shardID,
2✔
314
                domainID,
2✔
315
                workflowID,
2✔
316
                runID); err != nil {
2✔
317
                return err
×
318
        }
×
319

320
        if err := updateChildExecutionInfos(
2✔
321
                ctx,
2✔
322
                tx,
2✔
323
                workflowSnapshot.ChildExecutionInfos,
2✔
324
                nil,
2✔
325
                shardID,
2✔
326
                domainID,
2✔
327
                workflowID,
2✔
328
                runID,
2✔
329
                parser); err != nil {
2✔
330
                return err
×
331
        }
×
332

333
        if err := deleteRequestCancelInfoMap(
2✔
334
                ctx,
2✔
335
                tx,
2✔
336
                shardID,
2✔
337
                domainID,
2✔
338
                workflowID,
2✔
339
                runID); err != nil {
2✔
340
                return err
×
341
        }
×
342

343
        if err := updateRequestCancelInfos(
2✔
344
                ctx,
2✔
345
                tx,
2✔
346
                workflowSnapshot.RequestCancelInfos,
2✔
347
                nil,
2✔
348
                shardID,
2✔
349
                domainID,
2✔
350
                workflowID,
2✔
351
                runID,
2✔
352
                parser); err != nil {
2✔
353
                return err
×
354
        }
×
355

356
        if err := deleteSignalInfoMap(
2✔
357
                ctx,
2✔
358
                tx,
2✔
359
                shardID,
2✔
360
                domainID,
2✔
361
                workflowID,
2✔
362
                runID); err != nil {
2✔
363
                return err
×
364
        }
×
365

366
        if err := updateSignalInfos(
2✔
367
                ctx,
2✔
368
                tx,
2✔
369
                workflowSnapshot.SignalInfos,
2✔
370
                nil,
2✔
371
                shardID,
2✔
372
                domainID,
2✔
373
                workflowID,
2✔
374
                runID,
2✔
375
                parser); err != nil {
2✔
376
                return err
×
377
        }
×
378

379
        if err := deleteSignalsRequestedSet(
2✔
380
                ctx,
2✔
381
                tx,
2✔
382
                shardID,
2✔
383
                domainID,
2✔
384
                workflowID,
2✔
385
                runID); err != nil {
2✔
386
                return err
×
387
        }
×
388

389
        if err := updateSignalsRequested(
2✔
390
                ctx,
2✔
391
                tx,
2✔
392
                workflowSnapshot.SignalRequestedIDs,
2✔
393
                nil,
2✔
394
                shardID,
2✔
395
                domainID,
2✔
396
                workflowID,
2✔
397
                runID); err != nil {
2✔
398
                return err
×
399
        }
×
400

401
        return deleteBufferedEvents(
2✔
402
                ctx,
2✔
403
                tx,
2✔
404
                shardID,
2✔
405
                domainID,
2✔
406
                workflowID,
2✔
407
                runID)
2✔
408
}
409

410
func applyWorkflowMutationAsyncTx(
411
        ctx context.Context,
412
        tx sqlplugin.Tx,
413
        shardID int,
414
        workflowMutation *p.InternalWorkflowMutation,
415
        parser serialization.Parser,
416
) error {
×
417

×
418
        executionInfo := workflowMutation.ExecutionInfo
×
419
        versionHistories := workflowMutation.VersionHistories
×
420
        workflowChecksum := workflowMutation.ChecksumData
×
421
        startVersion := workflowMutation.StartVersion
×
422
        lastWriteVersion := workflowMutation.LastWriteVersion
×
423
        domainID := serialization.MustParseUUID(executionInfo.DomainID)
×
424
        workflowID := executionInfo.WorkflowID
×
425
        runID := serialization.MustParseUUID(executionInfo.RunID)
×
426

×
427
        recoverPanic := func(recovered interface{}, err *error) {
×
428
                // revive:disable-next-line:defer Func is being called using defer().
×
429
                if recovered != nil {
×
430
                        *err = fmt.Errorf("DB operation panicked: %v %s", recovered, debug.Stack())
×
431
                }
×
432
        }
433

434
        // TODO Remove me if UPDATE holds the lock to the end of a transaction
435
        if err := lockAndCheckNextEventID(
×
436
                ctx,
×
437
                tx,
×
438
                shardID,
×
439
                domainID,
×
440
                workflowID,
×
441
                runID,
×
442
                workflowMutation.Condition); err != nil {
×
443
                return err
×
444
        }
×
445

446
        if err := updateExecution(
×
447
                ctx,
×
448
                tx,
×
449
                executionInfo,
×
450
                versionHistories,
×
451
                workflowChecksum,
×
452
                startVersion,
×
453
                lastWriteVersion,
×
454
                shardID,
×
455
                parser); err != nil {
×
456
                return err
×
457
        }
×
458

459
        g, ctx := errgroup.WithContext(ctx)
×
460

×
461
        g.Go(func() (e error) {
×
462
                defer func() { recoverPanic(recover(), &e) }()
×
463
                e = createTransferTasks(
×
464
                        ctx,
×
465
                        tx,
×
466
                        workflowMutation.TransferTasks,
×
467
                        shardID,
×
468
                        domainID,
×
469
                        workflowID,
×
470
                        runID,
×
471
                        parser)
×
472
                return e
×
473
        })
474

475
        g.Go(func() (e error) {
×
476
                defer func() { recoverPanic(recover(), &e) }()
×
477
                e = createCrossClusterTasks(
×
478
                        ctx,
×
479
                        tx,
×
480
                        workflowMutation.CrossClusterTasks,
×
481
                        shardID,
×
482
                        domainID,
×
483
                        workflowID,
×
484
                        runID,
×
485
                        parser)
×
486
                return e
×
487
        })
488

489
        g.Go(func() (e error) {
×
490
                defer func() { recoverPanic(recover(), &e) }()
×
491
                e = createReplicationTasks(
×
492
                        ctx,
×
493
                        tx,
×
494
                        workflowMutation.ReplicationTasks,
×
495
                        shardID,
×
496
                        domainID,
×
497
                        workflowID,
×
498
                        runID,
×
499
                        parser)
×
500
                return e
×
501
        })
502

503
        g.Go(func() (e error) {
×
504
                defer func() { recoverPanic(recover(), &e) }()
×
505
                e = createTimerTasks(
×
506
                        ctx,
×
507
                        tx,
×
508
                        workflowMutation.TimerTasks,
×
509
                        shardID,
×
510
                        domainID,
×
511
                        workflowID,
×
512
                        runID,
×
513
                        parser)
×
514
                return e
×
515
        })
516

517
        g.Go(func() (e error) {
×
518
                defer func() { recoverPanic(recover(), &e) }()
×
519
                e = updateActivityInfos(
×
520
                        ctx,
×
521
                        tx,
×
522
                        workflowMutation.UpsertActivityInfos,
×
523
                        workflowMutation.DeleteActivityInfos,
×
524
                        shardID,
×
525
                        domainID,
×
526
                        workflowID,
×
527
                        runID,
×
528
                        parser)
×
529
                return e
×
530
        })
531

532
        g.Go(func() (e error) {
×
533
                defer func() { recoverPanic(recover(), &e) }()
×
534
                e = updateTimerInfos(
×
535
                        ctx,
×
536
                        tx,
×
537
                        workflowMutation.UpsertTimerInfos,
×
538
                        workflowMutation.DeleteTimerInfos,
×
539
                        shardID,
×
540
                        domainID,
×
541
                        workflowID,
×
542
                        runID,
×
543
                        parser,
×
544
                )
×
545
                return e
×
546
        })
547

548
        g.Go(func() (e error) {
×
549
                defer func() { recoverPanic(recover(), &e) }()
×
550
                e = updateChildExecutionInfos(
×
551
                        ctx,
×
552
                        tx,
×
553
                        workflowMutation.UpsertChildExecutionInfos,
×
554
                        workflowMutation.DeleteChildExecutionInfos,
×
555
                        shardID,
×
556
                        domainID,
×
557
                        workflowID,
×
558
                        runID,
×
559
                        parser,
×
560
                )
×
561
                return e
×
562
        })
563

564
        g.Go(func() (e error) {
×
565
                defer func() { recoverPanic(recover(), &e) }()
×
566
                e = updateRequestCancelInfos(
×
567
                        ctx,
×
568
                        tx,
×
569
                        workflowMutation.UpsertRequestCancelInfos,
×
570
                        workflowMutation.DeleteRequestCancelInfos,
×
571
                        shardID,
×
572
                        domainID,
×
573
                        workflowID,
×
574
                        runID,
×
575
                        parser,
×
576
                )
×
577
                return e
×
578
        })
579

580
        g.Go(func() (e error) {
×
581
                defer func() { recoverPanic(recover(), &e) }()
×
582
                e = updateSignalInfos(
×
583
                        ctx,
×
584
                        tx,
×
585
                        workflowMutation.UpsertSignalInfos,
×
586
                        workflowMutation.DeleteSignalInfos,
×
587
                        shardID,
×
588
                        domainID,
×
589
                        workflowID,
×
590
                        runID,
×
591
                        parser,
×
592
                )
×
593
                return e
×
594
        })
595

596
        g.Go(func() (e error) {
×
597
                defer func() { recoverPanic(recover(), &e) }()
×
598
                e = updateSignalsRequested(
×
599
                        ctx,
×
600
                        tx,
×
601
                        workflowMutation.UpsertSignalRequestedIDs,
×
602
                        workflowMutation.DeleteSignalRequestedIDs,
×
603
                        shardID,
×
604
                        domainID,
×
605
                        workflowID,
×
606
                        runID,
×
607
                )
×
608
                return e
×
609
        })
610

611
        g.Go(func() (e error) {
×
612
                defer func() { recoverPanic(recover(), &e) }()
×
613
                if workflowMutation.ClearBufferedEvents {
×
614
                        if e = deleteBufferedEvents(
×
615
                                ctx,
×
616
                                tx,
×
617
                                shardID,
×
618
                                domainID,
×
619
                                workflowID,
×
620
                                runID,
×
621
                        ); e != nil {
×
622
                                return e
×
623
                        }
×
624
                }
625

626
                e = updateBufferedEvents(
×
627
                        ctx,
×
628
                        tx,
×
629
                        workflowMutation.NewBufferedEvents,
×
630
                        shardID,
×
631
                        domainID,
×
632
                        workflowID,
×
633
                        runID,
×
634
                )
×
635
                return e
×
636
        })
637
        return g.Wait()
×
638
}
639

640
func applyWorkflowSnapshotTxAsNew(
641
        ctx context.Context,
642
        tx sqlplugin.Tx,
643
        shardID int,
644
        workflowSnapshot *p.InternalWorkflowSnapshot,
645
        parser serialization.Parser,
646
) error {
422✔
647

422✔
648
        executionInfo := workflowSnapshot.ExecutionInfo
422✔
649
        versionHistories := workflowSnapshot.VersionHistories
422✔
650
        workflowChecksum := workflowSnapshot.ChecksumData
422✔
651
        startVersion := workflowSnapshot.StartVersion
422✔
652
        lastWriteVersion := workflowSnapshot.LastWriteVersion
422✔
653
        domainID := serialization.MustParseUUID(executionInfo.DomainID)
422✔
654
        workflowID := executionInfo.WorkflowID
422✔
655
        runID := serialization.MustParseUUID(executionInfo.RunID)
422✔
656

422✔
657
        if err := createExecution(
422✔
658
                ctx,
422✔
659
                tx,
422✔
660
                executionInfo,
422✔
661
                versionHistories,
422✔
662
                workflowChecksum,
422✔
663
                startVersion,
422✔
664
                lastWriteVersion,
422✔
665
                shardID,
422✔
666
                parser); err != nil {
424✔
667
                return err
2✔
668
        }
2✔
669

670
        if err := applyTasks(
422✔
671
                ctx,
422✔
672
                tx,
422✔
673
                shardID,
422✔
674
                domainID,
422✔
675
                workflowID,
422✔
676
                runID,
422✔
677
                workflowSnapshot.TransferTasks,
422✔
678
                workflowSnapshot.CrossClusterTasks,
422✔
679
                workflowSnapshot.ReplicationTasks,
422✔
680
                workflowSnapshot.TimerTasks,
422✔
681
                parser,
422✔
682
        ); err != nil {
422✔
683
                return err
×
684
        }
×
685

686
        if err := updateActivityInfos(
422✔
687
                ctx,
422✔
688
                tx,
422✔
689
                workflowSnapshot.ActivityInfos,
422✔
690
                nil,
422✔
691
                shardID,
422✔
692
                domainID,
422✔
693
                workflowID,
422✔
694
                runID,
422✔
695
                parser); err != nil {
422✔
696
                return err
×
697
        }
×
698

699
        if err := updateTimerInfos(
422✔
700
                ctx,
422✔
701
                tx,
422✔
702
                workflowSnapshot.TimerInfos,
422✔
703
                nil,
422✔
704
                shardID,
422✔
705
                domainID,
422✔
706
                workflowID,
422✔
707
                runID,
422✔
708
                parser); err != nil {
422✔
709
                return err
×
710
        }
×
711

712
        if err := updateChildExecutionInfos(
422✔
713
                ctx,
422✔
714
                tx,
422✔
715
                workflowSnapshot.ChildExecutionInfos,
422✔
716
                nil,
422✔
717
                shardID,
422✔
718
                domainID,
422✔
719
                workflowID,
422✔
720
                runID,
422✔
721
                parser); err != nil {
422✔
722
                return err
×
723
        }
×
724

725
        if err := updateRequestCancelInfos(
422✔
726
                ctx,
422✔
727
                tx,
422✔
728
                workflowSnapshot.RequestCancelInfos,
422✔
729
                nil,
422✔
730
                shardID,
422✔
731
                domainID,
422✔
732
                workflowID,
422✔
733
                runID,
422✔
734
                parser); err != nil {
422✔
735
                return err
×
736
        }
×
737

738
        if err := updateSignalInfos(
422✔
739
                ctx,
422✔
740
                tx,
422✔
741
                workflowSnapshot.SignalInfos,
422✔
742
                nil,
422✔
743
                shardID,
422✔
744
                domainID,
422✔
745
                workflowID,
422✔
746
                runID,
422✔
747
                parser); err != nil {
422✔
748
                return err
×
749
        }
×
750

751
        return updateSignalsRequested(
422✔
752
                ctx,
422✔
753
                tx,
422✔
754
                workflowSnapshot.SignalRequestedIDs,
422✔
755
                nil,
422✔
756
                shardID,
422✔
757
                domainID,
422✔
758
                workflowID,
422✔
759
                runID)
422✔
760
}
761

762
func (m *sqlExecutionStore) applyWorkflowSnapshotAsyncTxAsNew(
763
        ctx context.Context,
764
        tx sqlplugin.Tx,
765
        shardID int,
766
        workflowSnapshot *p.InternalWorkflowSnapshot,
767
        parser serialization.Parser,
768
) error {
×
769

×
770
        executionInfo := workflowSnapshot.ExecutionInfo
×
771
        versionHistories := workflowSnapshot.VersionHistories
×
772
        workflowChecksum := workflowSnapshot.ChecksumData
×
773
        startVersion := workflowSnapshot.StartVersion
×
774
        lastWriteVersion := workflowSnapshot.LastWriteVersion
×
775
        domainID := serialization.MustParseUUID(executionInfo.DomainID)
×
776
        workflowID := executionInfo.WorkflowID
×
777
        runID := serialization.MustParseUUID(executionInfo.RunID)
×
778

×
779
        recoverPanic := func(recovered interface{}, err *error) {
×
780
                if recovered != nil {
×
781
                        *err = fmt.Errorf("DB operation panicked: %v %s", recovered, debug.Stack())
×
782
                }
×
783
        }
784
        g, ctx := errgroup.WithContext(ctx)
×
785

×
786
        g.Go(func() (e error) {
×
787
                defer func() { recoverPanic(recover(), &e) }()
×
788
                e = createExecution(
×
789
                        ctx,
×
790
                        tx,
×
791
                        executionInfo,
×
792
                        versionHistories,
×
793
                        workflowChecksum,
×
794
                        startVersion,
×
795
                        lastWriteVersion,
×
796
                        shardID,
×
797
                        parser)
×
798
                return e
×
799
        })
800

801
        g.Go(func() (e error) {
×
802
                defer func() { recoverPanic(recover(), &e) }()
×
803
                e = createTransferTasks(
×
804
                        ctx,
×
805
                        tx,
×
806
                        workflowSnapshot.TransferTasks,
×
807
                        shardID,
×
808
                        domainID,
×
809
                        workflowID,
×
810
                        runID,
×
811
                        parser)
×
812
                return e
×
813
        })
814

815
        g.Go(func() (e error) {
×
816
                defer func() { recoverPanic(recover(), &e) }()
×
817
                e = createCrossClusterTasks(
×
818
                        ctx,
×
819
                        tx,
×
820
                        workflowSnapshot.CrossClusterTasks,
×
821
                        shardID,
×
822
                        domainID,
×
823
                        workflowID,
×
824
                        runID,
×
825
                        parser)
×
826
                return e
×
827
        })
828

829
        g.Go(func() (e error) {
×
830
                defer func() { recoverPanic(recover(), &e) }()
×
831
                e = createReplicationTasks(
×
832
                        ctx,
×
833
                        tx,
×
834
                        workflowSnapshot.ReplicationTasks,
×
835
                        shardID,
×
836
                        domainID,
×
837
                        workflowID,
×
838
                        runID,
×
839
                        parser)
×
840
                return e
×
841
        })
842

843
        g.Go(func() (e error) {
×
844
                defer func() { recoverPanic(recover(), &e) }()
×
845
                e = createTimerTasks(
×
846
                        ctx,
×
847
                        tx,
×
848
                        workflowSnapshot.TimerTasks,
×
849
                        shardID,
×
850
                        domainID,
×
851
                        workflowID,
×
852
                        runID,
×
853
                        parser)
×
854
                return e
×
855
        })
856

857
        g.Go(func() (e error) {
×
858
                defer func() { recoverPanic(recover(), &e) }()
×
859
                e = updateActivityInfos(
×
860
                        ctx,
×
861
                        tx,
×
862
                        workflowSnapshot.ActivityInfos,
×
863
                        nil,
×
864
                        shardID,
×
865
                        domainID,
×
866
                        workflowID,
×
867
                        runID,
×
868
                        parser)
×
869
                return e
×
870
        })
871

872
        g.Go(func() (e error) {
×
873
                defer func() { recoverPanic(recover(), &e) }()
×
874
                e = updateTimerInfos(
×
875
                        ctx,
×
876
                        tx,
×
877
                        workflowSnapshot.TimerInfos,
×
878
                        nil,
×
879
                        shardID,
×
880
                        domainID,
×
881
                        workflowID,
×
882
                        runID,
×
883
                        parser)
×
884
                return e
×
885
        })
886

887
        g.Go(func() (e error) {
×
888
                defer func() { recoverPanic(recover(), &e) }()
×
889
                e = updateChildExecutionInfos(
×
890
                        ctx,
×
891
                        tx,
×
892
                        workflowSnapshot.ChildExecutionInfos,
×
893
                        nil,
×
894
                        shardID,
×
895
                        domainID,
×
896
                        workflowID,
×
897
                        runID,
×
898
                        parser)
×
899
                return e
×
900
        })
901

902
        g.Go(func() (e error) {
×
903
                defer func() { recoverPanic(recover(), &e) }()
×
904
                e = updateRequestCancelInfos(
×
905
                        ctx,
×
906
                        tx,
×
907
                        workflowSnapshot.RequestCancelInfos,
×
908
                        nil,
×
909
                        shardID,
×
910
                        domainID,
×
911
                        workflowID,
×
912
                        runID,
×
913
                        parser)
×
914
                return e
×
915
        })
916

917
        g.Go(func() (e error) {
×
918
                defer func() { recoverPanic(recover(), &e) }()
×
919
                e = updateSignalInfos(
×
920
                        ctx,
×
921
                        tx,
×
922
                        workflowSnapshot.SignalInfos,
×
923
                        nil,
×
924
                        shardID,
×
925
                        domainID,
×
926
                        workflowID,
×
927
                        runID,
×
928
                        parser)
×
929
                return e
×
930
        })
931

932
        g.Go(func() (e error) {
×
933
                defer func() { recoverPanic(recover(), &e) }()
×
934
                e = updateSignalsRequested(
×
935
                        ctx,
×
936
                        tx,
×
937
                        workflowSnapshot.SignalRequestedIDs,
×
938
                        nil,
×
939
                        shardID,
×
940
                        domainID,
×
941
                        workflowID,
×
942
                        runID)
×
943
                return e
×
944
        })
945
        return g.Wait()
×
946
}
947

948
func applyTasks(
949
        ctx context.Context,
950
        tx sqlplugin.Tx,
951
        shardID int,
952
        domainID serialization.UUID,
953
        workflowID string,
954
        runID serialization.UUID,
955
        transferTasks []p.Task,
956
        crossClusterTasks []p.Task,
957
        replicationTasks []p.Task,
958
        timerTasks []p.Task,
959
        parser serialization.Parser,
960
) error {
3,318✔
961

3,318✔
962
        if err := createTransferTasks(
3,318✔
963
                ctx,
3,318✔
964
                tx,
3,318✔
965
                transferTasks,
3,318✔
966
                shardID,
3,318✔
967
                domainID,
3,318✔
968
                workflowID,
3,318✔
969
                runID,
3,318✔
970
                parser); err != nil {
3,318✔
971
                return err
×
972
        }
×
973

974
        if err := createCrossClusterTasks(
3,318✔
975
                ctx,
3,318✔
976
                tx,
3,318✔
977
                crossClusterTasks,
3,318✔
978
                shardID,
3,318✔
979
                domainID,
3,318✔
980
                workflowID,
3,318✔
981
                runID,
3,318✔
982
                parser); err != nil {
3,318✔
983
                return err
×
984
        }
×
985

986
        if err := createReplicationTasks(
3,318✔
987
                ctx,
3,318✔
988
                tx,
3,318✔
989
                replicationTasks,
3,318✔
990
                shardID,
3,318✔
991
                domainID,
3,318✔
992
                workflowID,
3,318✔
993
                runID,
3,318✔
994
                parser,
3,318✔
995
        ); err != nil {
3,318✔
996
                return err
×
997
        }
×
998

999
        return createTimerTasks(
3,318✔
1000
                ctx,
3,318✔
1001
                tx,
3,318✔
1002
                timerTasks,
3,318✔
1003
                shardID,
3,318✔
1004
                domainID,
3,318✔
1005
                workflowID,
3,318✔
1006
                runID,
3,318✔
1007
                parser,
3,318✔
1008
        )
3,318✔
1009
}
1010

1011
// lockCurrentExecutionIfExists returns current execution or nil if none is found for the workflowID
1012
// locking it in the DB
1013
func lockCurrentExecutionIfExists(
1014
        ctx context.Context,
1015
        tx sqlplugin.Tx,
1016
        shardID int,
1017
        domainID serialization.UUID,
1018
        workflowID string,
1019
) (*sqlplugin.CurrentExecutionsRow, error) {
336✔
1020

336✔
1021
        rows, err := tx.LockCurrentExecutionsJoinExecutions(ctx, &sqlplugin.CurrentExecutionsFilter{
336✔
1022
                ShardID:    int64(shardID),
336✔
1023
                DomainID:   domainID,
336✔
1024
                WorkflowID: workflowID,
336✔
1025
        })
336✔
1026
        if err != nil {
336✔
1027
                if err != sql.ErrNoRows {
×
1028
                        return nil, convertCommonErrors(tx, "lockCurrentExecutionIfExists", fmt.Sprintf("Failed to get current_executions row for (shard,domain,workflow) = (%v, %v, %v).", shardID, domainID, workflowID), err)
×
1029
                }
×
1030
        }
1031
        size := len(rows)
336✔
1032
        if size > 1 {
336✔
1033
                return nil, &types.InternalServiceError{
×
1034
                        Message: fmt.Sprintf("lockCurrentExecutionIfExists failed. Multiple current_executions rows for (shard,domain,workflow) = (%v, %v, %v).", shardID, domainID, workflowID),
×
1035
                }
×
1036
        }
×
1037
        if size == 0 {
624✔
1038
                return nil, nil
288✔
1039
        }
288✔
1040
        return &rows[0], nil
50✔
1041
}
1042

1043
func createOrUpdateCurrentExecution(
1044
        ctx context.Context,
1045
        tx sqlplugin.Tx,
1046
        createMode p.CreateWorkflowMode,
1047
        shardID int,
1048
        domainID serialization.UUID,
1049
        workflowID string,
1050
        runID serialization.UUID,
1051
        state int,
1052
        closeStatus int,
1053
        createRequestID string,
1054
        startVersion int64,
1055
        lastWriteVersion int64,
1056
) error {
308✔
1057

308✔
1058
        row := sqlplugin.CurrentExecutionsRow{
308✔
1059
                ShardID:          int64(shardID),
308✔
1060
                DomainID:         domainID,
308✔
1061
                WorkflowID:       workflowID,
308✔
1062
                RunID:            runID,
308✔
1063
                CreateRequestID:  createRequestID,
308✔
1064
                State:            state,
308✔
1065
                CloseStatus:      closeStatus,
308✔
1066
                StartVersion:     startVersion,
308✔
1067
                LastWriteVersion: lastWriteVersion,
308✔
1068
        }
308✔
1069

308✔
1070
        switch createMode {
308✔
1071
        case p.CreateWorkflowModeContinueAsNew,
1072
                p.CreateWorkflowModeWorkflowIDReuse:
22✔
1073
                if err := updateCurrentExecution(
22✔
1074
                        ctx,
22✔
1075
                        tx,
22✔
1076
                        shardID,
22✔
1077
                        domainID,
22✔
1078
                        workflowID,
22✔
1079
                        runID,
22✔
1080
                        createRequestID,
22✔
1081
                        state,
22✔
1082
                        closeStatus,
22✔
1083
                        row.StartVersion,
22✔
1084
                        row.LastWriteVersion); err != nil {
22✔
1085
                        return err
×
1086
                }
×
1087
        case p.CreateWorkflowModeBrandNew:
288✔
1088
                if _, err := tx.InsertIntoCurrentExecutions(ctx, &row); err != nil {
288✔
1089
                        return convertCommonErrors(tx, "createOrUpdateCurrentExecution", "Failed to insert into current_executions table.", err)
×
1090
                }
×
1091
        case p.CreateWorkflowModeZombie:
2✔
1092
                // noop
1093
        default:
×
1094
                return fmt.Errorf("createOrUpdateCurrentExecution failed. Unknown workflow creation mode: %v", createMode)
×
1095
        }
1096

1097
        return nil
308✔
1098
}
1099

1100
func lockAndCheckNextEventID(
1101
        ctx context.Context,
1102
        tx sqlplugin.Tx,
1103
        shardID int,
1104
        domainID serialization.UUID,
1105
        workflowID string,
1106
        runID serialization.UUID,
1107
        condition int64,
1108
) error {
2,898✔
1109

2,898✔
1110
        nextEventID, err := lockNextEventID(
2,898✔
1111
                ctx,
2,898✔
1112
                tx,
2,898✔
1113
                shardID,
2,898✔
1114
                domainID,
2,898✔
1115
                workflowID,
2,898✔
1116
                runID,
2,898✔
1117
        )
2,898✔
1118

2,898✔
1119
        if err != nil {
2,898✔
1120
                return err
×
1121
        }
×
1122
        if *nextEventID != condition {
2,898✔
1123
                return &p.ConditionFailedError{
×
1124
                        Msg: fmt.Sprintf("lockAndCheckNextEventID failed. Next_event_id was %v when it should have been %v.", nextEventID, condition),
×
1125
                }
×
1126
        }
×
1127
        return nil
2,898✔
1128
}
1129

1130
func lockNextEventID(
1131
        ctx context.Context,
1132
        tx sqlplugin.Tx,
1133
        shardID int,
1134
        domainID serialization.UUID,
1135
        workflowID string,
1136
        runID serialization.UUID,
1137
) (*int64, error) {
2,898✔
1138

2,898✔
1139
        nextEventID, err := tx.WriteLockExecutions(ctx, &sqlplugin.ExecutionsFilter{
2,898✔
1140
                ShardID:    shardID,
2,898✔
1141
                DomainID:   domainID,
2,898✔
1142
                WorkflowID: workflowID,
2,898✔
1143
                RunID:      runID,
2,898✔
1144
        })
2,898✔
1145
        if err != nil {
2,898✔
1146
                if err == sql.ErrNoRows {
×
1147
                        return nil, &types.EntityNotExistsError{
×
1148
                                Message: fmt.Sprintf(
×
1149
                                        "lockNextEventID failed. Unable to lock executions row with (shard, domain, workflow, run) = (%v,%v,%v,%v) which does not exist.",
×
1150
                                        shardID,
×
1151
                                        domainID,
×
1152
                                        workflowID,
×
1153
                                        runID,
×
1154
                                ),
×
1155
                        }
×
1156
                }
×
1157
                return nil, convertCommonErrors(tx, "lockNextEventID", "", err)
×
1158
        }
1159
        result := int64(nextEventID)
2,898✔
1160
        return &result, nil
2,898✔
1161
}
1162

1163
func createCrossClusterTasks(
1164
        ctx context.Context,
1165
        tx sqlplugin.Tx,
1166
        crossClusterTasks []p.Task,
1167
        shardID int,
1168
        domainID serialization.UUID,
1169
        workflowID string,
1170
        runID serialization.UUID,
1171
        parser serialization.Parser,
1172
) error {
3,318✔
1173
        if len(crossClusterTasks) == 0 {
6,636✔
1174
                return nil
3,318✔
1175
        }
3,318✔
1176

1177
        crossClusterTasksRows := make([]sqlplugin.CrossClusterTasksRow, len(crossClusterTasks))
×
1178
        for i, task := range crossClusterTasks {
×
1179
                info := &serialization.CrossClusterTaskInfo{
×
1180
                        DomainID:            domainID,
×
1181
                        WorkflowID:          workflowID,
×
1182
                        RunID:               runID,
×
1183
                        TaskType:            int16(task.GetType()),
×
1184
                        TargetDomainID:      domainID,
×
1185
                        TargetWorkflowID:    p.TransferTaskTransferTargetWorkflowID,
×
1186
                        TargetRunID:         serialization.UUID(p.CrossClusterTaskDefaultTargetRunID),
×
1187
                        ScheduleID:          0,
×
1188
                        Version:             task.GetVersion(),
×
1189
                        VisibilityTimestamp: task.GetVisibilityTimestamp(),
×
1190
                }
×
1191

×
1192
                crossClusterTasksRows[i].ShardID = shardID
×
1193
                crossClusterTasksRows[i].TaskID = task.GetTaskID()
×
1194

×
1195
                switch task.GetType() {
×
1196
                case p.CrossClusterTaskTypeStartChildExecution:
×
1197
                        crossClusterTasksRows[i].TargetCluster = task.(*p.CrossClusterStartChildExecutionTask).TargetCluster
×
1198
                        info.TargetDomainID = serialization.MustParseUUID(task.(*p.CrossClusterStartChildExecutionTask).TargetDomainID)
×
1199
                        info.TargetWorkflowID = task.(*p.CrossClusterStartChildExecutionTask).TargetWorkflowID
×
1200
                        info.ScheduleID = task.(*p.CrossClusterStartChildExecutionTask).InitiatedID
×
1201

1202
                case p.CrossClusterTaskTypeCancelExecution:
×
1203
                        crossClusterTasksRows[i].TargetCluster = task.(*p.CrossClusterCancelExecutionTask).TargetCluster
×
1204
                        info.TargetDomainID = serialization.MustParseUUID(task.(*p.CrossClusterCancelExecutionTask).TargetDomainID)
×
1205
                        info.TargetWorkflowID = task.(*p.CrossClusterCancelExecutionTask).TargetWorkflowID
×
1206
                        if targetRunID := task.(*p.CrossClusterCancelExecutionTask).TargetRunID; targetRunID != "" {
×
1207
                                info.TargetRunID = serialization.MustParseUUID(targetRunID)
×
1208
                        }
×
1209
                        info.TargetChildWorkflowOnly = task.(*p.CrossClusterCancelExecutionTask).TargetChildWorkflowOnly
×
1210
                        info.ScheduleID = task.(*p.CrossClusterCancelExecutionTask).InitiatedID
×
1211

1212
                case p.CrossClusterTaskTypeSignalExecution:
×
1213
                        crossClusterTasksRows[i].TargetCluster = task.(*p.CrossClusterSignalExecutionTask).TargetCluster
×
1214
                        info.TargetDomainID = serialization.MustParseUUID(task.(*p.CrossClusterSignalExecutionTask).TargetDomainID)
×
1215
                        info.TargetWorkflowID = task.(*p.CrossClusterSignalExecutionTask).TargetWorkflowID
×
1216
                        if targetRunID := task.(*p.CrossClusterSignalExecutionTask).TargetRunID; targetRunID != "" {
×
1217
                                info.TargetRunID = serialization.MustParseUUID(targetRunID)
×
1218
                        }
×
1219
                        info.TargetChildWorkflowOnly = task.(*p.CrossClusterSignalExecutionTask).TargetChildWorkflowOnly
×
1220
                        info.ScheduleID = task.(*p.CrossClusterSignalExecutionTask).InitiatedID
×
1221

1222
                case p.CrossClusterTaskTypeRecordChildExeuctionCompleted:
×
1223
                        crossClusterTasksRows[i].TargetCluster = task.(*p.CrossClusterRecordChildExecutionCompletedTask).TargetCluster
×
1224
                        info.TargetDomainID = serialization.MustParseUUID(task.(*p.CrossClusterRecordChildExecutionCompletedTask).TargetDomainID)
×
1225
                        info.TargetWorkflowID = task.(*p.CrossClusterRecordChildExecutionCompletedTask).TargetWorkflowID
×
1226
                        if targetRunID := task.(*p.CrossClusterRecordChildExecutionCompletedTask).TargetRunID; targetRunID != "" {
×
1227
                                info.TargetRunID = serialization.MustParseUUID(targetRunID)
×
1228
                        }
×
1229

1230
                case p.CrossClusterTaskTypeApplyParentClosePolicy:
×
1231
                        crossClusterTasksRows[i].TargetCluster = task.(*p.CrossClusterApplyParentClosePolicyTask).TargetCluster
×
1232
                        for domainID := range task.(*p.CrossClusterApplyParentClosePolicyTask).TargetDomainIDs {
×
1233
                                info.TargetDomainIDs = append(info.TargetDomainIDs, serialization.MustParseUUID(domainID))
×
1234
                        }
×
1235

1236
                default:
×
1237
                        return &types.InternalServiceError{
×
1238
                                Message: fmt.Sprintf("Unknown cross-cluster task type: %v", task.GetType()),
×
1239
                        }
×
1240
                }
1241

1242
                blob, err := parser.CrossClusterTaskInfoToBlob(info)
×
1243
                if err != nil {
×
1244
                        return err
×
1245
                }
×
1246
                crossClusterTasksRows[i].Data = blob.Data
×
1247
                crossClusterTasksRows[i].DataEncoding = string(blob.Encoding)
×
1248
        }
1249

1250
        result, err := tx.InsertIntoCrossClusterTasks(ctx, crossClusterTasksRows)
×
1251
        if err != nil {
×
1252
                return convertCommonErrors(tx, "createCrossClusterTasks", "", err)
×
1253
        }
×
1254

1255
        rowsAffected, err := result.RowsAffected()
×
1256
        if err != nil {
×
1257
                return &types.InternalServiceError{
×
1258
                        Message: fmt.Sprintf("createTransferTasks failed. Could not verify number of rows inserted. Error: %v", err),
×
1259
                }
×
1260
        }
×
1261

1262
        if int(rowsAffected) != len(crossClusterTasks) {
×
1263
                return &types.InternalServiceError{
×
1264
                        Message: fmt.Sprintf("createCrossClusterTasks failed. Inserted %v instead of %v rows into transfer_tasks. Error: %v", rowsAffected, len(crossClusterTasks), err),
×
1265
                }
×
1266
        }
×
1267

1268
        return nil
×
1269
}
1270

1271
func createTransferTasks(
1272
        ctx context.Context,
1273
        tx sqlplugin.Tx,
1274
        transferTasks []p.Task,
1275
        shardID int,
1276
        domainID serialization.UUID,
1277
        workflowID string,
1278
        runID serialization.UUID,
1279
        parser serialization.Parser,
1280
) error {
3,318✔
1281

3,318✔
1282
        if len(transferTasks) == 0 {
5,146✔
1283
                return nil
1,828✔
1284
        }
1,828✔
1285

1286
        transferTasksRows := make([]sqlplugin.TransferTasksRow, len(transferTasks))
1,492✔
1287
        for i, task := range transferTasks {
3,400✔
1288
                info := &serialization.TransferTaskInfo{
1,908✔
1289
                        DomainID:            domainID,
1,908✔
1290
                        WorkflowID:          workflowID,
1,908✔
1291
                        RunID:               runID,
1,908✔
1292
                        TaskType:            int16(task.GetType()),
1,908✔
1293
                        TargetDomainID:      domainID,
1,908✔
1294
                        TargetWorkflowID:    p.TransferTaskTransferTargetWorkflowID,
1,908✔
1295
                        ScheduleID:          0,
1,908✔
1296
                        Version:             task.GetVersion(),
1,908✔
1297
                        VisibilityTimestamp: task.GetVisibilityTimestamp(),
1,908✔
1298
                }
1,908✔
1299

1,908✔
1300
                transferTasksRows[i].ShardID = shardID
1,908✔
1301
                transferTasksRows[i].TaskID = task.GetTaskID()
1,908✔
1302

1,908✔
1303
                switch task.GetType() {
1,908✔
1304
                case p.TransferTaskTypeActivityTask:
268✔
1305
                        info.TargetDomainID = serialization.MustParseUUID(task.(*p.ActivityTask).DomainID)
268✔
1306
                        info.TaskList = task.(*p.ActivityTask).TaskList
268✔
1307
                        info.ScheduleID = task.(*p.ActivityTask).ScheduleID
268✔
1308

1309
                case p.TransferTaskTypeDecisionTask:
881✔
1310
                        info.TargetDomainID = serialization.MustParseUUID(task.(*p.DecisionTask).DomainID)
881✔
1311
                        info.TaskList = task.(*p.DecisionTask).TaskList
881✔
1312
                        info.ScheduleID = task.(*p.DecisionTask).ScheduleID
881✔
1313

1314
                case p.TransferTaskTypeCancelExecution:
6✔
1315
                        info.TargetDomainID = serialization.MustParseUUID(task.(*p.CancelExecutionTask).TargetDomainID)
6✔
1316
                        info.TargetWorkflowID = task.(*p.CancelExecutionTask).TargetWorkflowID
6✔
1317
                        if targetRunID := task.(*p.CancelExecutionTask).TargetRunID; targetRunID != "" {
12✔
1318
                                info.TargetRunID = serialization.MustParseUUID(targetRunID)
6✔
1319
                        }
6✔
1320
                        info.TargetChildWorkflowOnly = task.(*p.CancelExecutionTask).TargetChildWorkflowOnly
6✔
1321
                        info.ScheduleID = task.(*p.CancelExecutionTask).InitiatedID
6✔
1322

1323
                case p.TransferTaskTypeSignalExecution:
10✔
1324
                        info.TargetDomainID = serialization.MustParseUUID(task.(*p.SignalExecutionTask).TargetDomainID)
10✔
1325
                        info.TargetWorkflowID = task.(*p.SignalExecutionTask).TargetWorkflowID
10✔
1326
                        if targetRunID := task.(*p.SignalExecutionTask).TargetRunID; targetRunID != "" {
18✔
1327
                                info.TargetRunID = serialization.MustParseUUID(targetRunID)
8✔
1328
                        }
8✔
1329
                        info.TargetChildWorkflowOnly = task.(*p.SignalExecutionTask).TargetChildWorkflowOnly
10✔
1330
                        info.ScheduleID = task.(*p.SignalExecutionTask).InitiatedID
10✔
1331

1332
                case p.TransferTaskTypeStartChildExecution:
14✔
1333
                        info.TargetDomainID = serialization.MustParseUUID(task.(*p.StartChildExecutionTask).TargetDomainID)
14✔
1334
                        info.TargetWorkflowID = task.(*p.StartChildExecutionTask).TargetWorkflowID
14✔
1335
                        info.ScheduleID = task.(*p.StartChildExecutionTask).InitiatedID
14✔
1336

1337
                case p.TransferTaskTypeRecordChildExecutionCompleted:
×
1338
                        info.TargetDomainID = serialization.MustParseUUID(task.(*p.RecordChildExecutionCompletedTask).TargetDomainID)
×
1339
                        info.TargetWorkflowID = task.(*p.RecordChildExecutionCompletedTask).TargetWorkflowID
×
1340
                        if targetRunID := task.(*p.RecordChildExecutionCompletedTask).TargetRunID; targetRunID != "" {
×
1341
                                info.TargetRunID = serialization.MustParseUUID(targetRunID)
×
1342
                        }
×
1343

1344
                case p.TransferTaskTypeApplyParentClosePolicy:
×
1345
                        for targetDomainID := range task.(*p.ApplyParentClosePolicyTask).TargetDomainIDs {
×
1346
                                info.TargetDomainIDs = append(info.TargetDomainIDs, serialization.MustParseUUID(targetDomainID))
×
1347
                        }
×
1348

1349
                case p.TransferTaskTypeCloseExecution,
1350
                        p.TransferTaskTypeRecordWorkflowStarted,
1351
                        p.TransferTaskTypeResetWorkflow,
1352
                        p.TransferTaskTypeUpsertWorkflowSearchAttributes,
1353
                        p.TransferTaskTypeRecordWorkflowClosed:
739✔
1354
                        // No explicit property needs to be set
1355

1356
                default:
×
1357
                        return &types.InternalServiceError{
×
1358
                                Message: fmt.Sprintf("createTransferTasks failed. Unknown transfer type: %v", task.GetType()),
×
1359
                        }
×
1360
                }
1361

1362
                blob, err := parser.TransferTaskInfoToBlob(info)
1,908✔
1363
                if err != nil {
1,908✔
1364
                        return err
×
1365
                }
×
1366
                transferTasksRows[i].Data = blob.Data
1,908✔
1367
                transferTasksRows[i].DataEncoding = string(blob.Encoding)
1,908✔
1368
        }
1369

1370
        result, err := tx.InsertIntoTransferTasks(ctx, transferTasksRows)
1,492✔
1371
        if err != nil {
1,492✔
1372
                return convertCommonErrors(tx, "createTransferTasks", "", err)
×
1373
        }
×
1374

1375
        rowsAffected, err := result.RowsAffected()
1,492✔
1376
        if err != nil {
1,492✔
1377
                return &types.InternalServiceError{
×
1378
                        Message: fmt.Sprintf("createTransferTasks failed. Could not verify number of rows inserted. Error: %v", err),
×
1379
                }
×
1380
        }
×
1381

1382
        if int(rowsAffected) != len(transferTasks) {
1,492✔
1383
                return &types.InternalServiceError{
×
1384
                        Message: fmt.Sprintf("createTransferTasks failed. Inserted %v instead of %v rows into transfer_tasks. Error: %v", rowsAffected, len(transferTasks), err),
×
1385
                }
×
1386
        }
×
1387

1388
        return nil
1,492✔
1389
}
1390

1391
func createReplicationTasks(
1392
        ctx context.Context,
1393
        tx sqlplugin.Tx,
1394
        replicationTasks []p.Task,
1395
        shardID int,
1396
        domainID serialization.UUID,
1397
        workflowID string,
1398
        runID serialization.UUID,
1399
        parser serialization.Parser,
1400
) error {
3,318✔
1401

3,318✔
1402
        if len(replicationTasks) == 0 {
6,636✔
1403
                return nil
3,318✔
1404
        }
3,318✔
1405
        replicationTasksRows := make([]sqlplugin.ReplicationTasksRow, len(replicationTasks))
×
1406

×
1407
        for i, task := range replicationTasks {
×
1408

×
1409
                firstEventID := common.EmptyEventID
×
1410
                nextEventID := common.EmptyEventID
×
1411
                version := common.EmptyVersion
×
1412
                activityScheduleID := common.EmptyEventID
×
1413
                var branchToken, newRunBranchToken []byte
×
1414

×
1415
                switch task.GetType() {
×
1416
                case p.ReplicationTaskTypeHistory:
×
1417
                        historyReplicationTask, ok := task.(*p.HistoryReplicationTask)
×
1418
                        if !ok {
×
1419
                                return &types.InternalServiceError{
×
1420
                                        Message: fmt.Sprintf("createReplicationTasks failed. Failed to cast %v to HistoryReplicationTask", task),
×
1421
                                }
×
1422
                        }
×
1423
                        firstEventID = historyReplicationTask.FirstEventID
×
1424
                        nextEventID = historyReplicationTask.NextEventID
×
1425
                        version = task.GetVersion()
×
1426
                        branchToken = historyReplicationTask.BranchToken
×
1427
                        newRunBranchToken = historyReplicationTask.NewRunBranchToken
×
1428

1429
                case p.ReplicationTaskTypeSyncActivity:
×
1430
                        version = task.GetVersion()
×
1431
                        activityScheduleID = task.(*p.SyncActivityTask).ScheduledID
×
1432

1433
                case p.ReplicationTaskTypeFailoverMarker:
×
1434
                        version = task.GetVersion()
×
1435

1436
                default:
×
1437
                        return &types.InternalServiceError{
×
1438
                                Message: fmt.Sprintf("Unknown replication task: %v", task.GetType()),
×
1439
                        }
×
1440
                }
1441

1442
                blob, err := parser.ReplicationTaskInfoToBlob(&serialization.ReplicationTaskInfo{
×
1443
                        DomainID:                domainID,
×
1444
                        WorkflowID:              workflowID,
×
1445
                        RunID:                   runID,
×
1446
                        TaskType:                int16(task.GetType()),
×
1447
                        FirstEventID:            firstEventID,
×
1448
                        NextEventID:             nextEventID,
×
1449
                        Version:                 version,
×
1450
                        ScheduledID:             activityScheduleID,
×
1451
                        EventStoreVersion:       p.EventStoreVersion,
×
1452
                        NewRunEventStoreVersion: p.EventStoreVersion,
×
1453
                        BranchToken:             branchToken,
×
1454
                        NewRunBranchToken:       newRunBranchToken,
×
1455
                        CreationTimestamp:       task.GetVisibilityTimestamp(),
×
1456
                })
×
1457
                if err != nil {
×
1458
                        return err
×
1459
                }
×
1460
                replicationTasksRows[i].ShardID = shardID
×
1461
                replicationTasksRows[i].TaskID = task.GetTaskID()
×
1462
                replicationTasksRows[i].Data = blob.Data
×
1463
                replicationTasksRows[i].DataEncoding = string(blob.Encoding)
×
1464
        }
1465

1466
        result, err := tx.InsertIntoReplicationTasks(ctx, replicationTasksRows)
×
1467
        if err != nil {
×
1468
                return convertCommonErrors(tx, "createReplicationTasks", "", err)
×
1469
        }
×
1470

1471
        rowsAffected, err := result.RowsAffected()
×
1472
        if err != nil {
×
1473
                return &types.InternalServiceError{
×
1474
                        Message: fmt.Sprintf("createReplicationTasks failed. Could not verify number of rows inserted. Error: %v", err),
×
1475
                }
×
1476
        }
×
1477

1478
        if int(rowsAffected) != len(replicationTasks) {
×
1479
                return &types.InternalServiceError{
×
1480
                        Message: fmt.Sprintf("createReplicationTasks failed. Inserted %v instead of %v rows into transfer_tasks. Error: %v", rowsAffected, len(replicationTasks), err),
×
1481
                }
×
1482
        }
×
1483

1484
        return nil
×
1485
}
1486

1487
func createTimerTasks(
1488
        ctx context.Context,
1489
        tx sqlplugin.Tx,
1490
        timerTasks []p.Task,
1491
        shardID int,
1492
        domainID serialization.UUID,
1493
        workflowID string,
1494
        runID serialization.UUID,
1495
        parser serialization.Parser,
1496
) error {
3,318✔
1497

3,318✔
1498
        if len(timerTasks) == 0 {
4,615✔
1499
                return nil
1,297✔
1500
        }
1,297✔
1501

1502
        timerTasksRows := make([]sqlplugin.TimerTasksRow, len(timerTasks))
2,023✔
1503

2,023✔
1504
        for i, task := range timerTasks {
4,108✔
1505
                info := &serialization.TimerTaskInfo{
2,085✔
1506
                        DomainID:        domainID,
2,085✔
1507
                        WorkflowID:      workflowID,
2,085✔
1508
                        RunID:           runID,
2,085✔
1509
                        TaskType:        int16(task.GetType()),
2,085✔
1510
                        Version:         task.GetVersion(),
2,085✔
1511
                        EventID:         common.EmptyEventID,
2,085✔
1512
                        ScheduleAttempt: 0,
2,085✔
1513
                }
2,085✔
1514

2,085✔
1515
                switch t := task.(type) {
2,085✔
1516
                case *p.DecisionTimeoutTask:
799✔
1517
                        info.EventID = t.EventID
799✔
1518
                        info.TimeoutType = common.Int16Ptr(int16(t.TimeoutType))
799✔
1519
                        info.ScheduleAttempt = t.ScheduleAttempt
799✔
1520

1521
                case *p.ActivityTimeoutTask:
487✔
1522
                        info.EventID = t.EventID
487✔
1523
                        info.TimeoutType = common.Int16Ptr(int16(t.TimeoutType))
487✔
1524
                        info.ScheduleAttempt = t.Attempt
487✔
1525

1526
                case *p.UserTimerTask:
22✔
1527
                        info.EventID = t.EventID
22✔
1528

1529
                case *p.ActivityRetryTimerTask:
6✔
1530
                        info.EventID = t.EventID
6✔
1531
                        info.ScheduleAttempt = int64(t.Attempt)
6✔
1532

1533
                case *p.WorkflowBackoffTimerTask:
46✔
1534
                        info.EventID = t.EventID
46✔
1535
                        info.TimeoutType = common.Int16Ptr(int16(t.TimeoutType))
46✔
1536

1537
                case *p.WorkflowTimeoutTask:
422✔
1538
                        // noop
1539

1540
                case *p.DeleteHistoryEventTask:
313✔
1541
                        // noop
1542

1543
                default:
×
1544
                        return &types.InternalServiceError{
×
1545
                                Message: fmt.Sprintf("createTimerTasks failed. Unknown timer task: %v", task.GetType()),
×
1546
                        }
×
1547
                }
1548

1549
                blob, err := parser.TimerTaskInfoToBlob(info)
2,085✔
1550
                if err != nil {
2,085✔
1551
                        return err
×
1552
                }
×
1553

1554
                timerTasksRows[i].ShardID = shardID
2,085✔
1555
                timerTasksRows[i].VisibilityTimestamp = task.GetVisibilityTimestamp()
2,085✔
1556
                timerTasksRows[i].TaskID = task.GetTaskID()
2,085✔
1557
                timerTasksRows[i].Data = blob.Data
2,085✔
1558
                timerTasksRows[i].DataEncoding = string(blob.Encoding)
2,085✔
1559
        }
1560

1561
        result, err := tx.InsertIntoTimerTasks(ctx, timerTasksRows)
2,023✔
1562
        if err != nil {
2,023✔
1563
                return convertCommonErrors(tx, "createTimerTasks", "", err)
×
1564
        }
×
1565
        rowsAffected, err := result.RowsAffected()
2,023✔
1566
        if err != nil {
2,023✔
1567
                return &types.InternalServiceError{
×
1568
                        Message: fmt.Sprintf("createTimerTasks failed. Could not verify number of rows inserted. Error: %v", err),
×
1569
                }
×
1570
        }
×
1571

1572
        if int(rowsAffected) != len(timerTasks) {
2,023✔
1573
                return &types.InternalServiceError{
×
1574
                        Message: fmt.Sprintf("createTimerTasks failed. Inserted %v instead of %v rows into timer_tasks.", rowsAffected, len(timerTasks)),
×
1575
                }
×
1576
        }
×
1577

1578
        return nil
2,023✔
1579
}
1580

1581
func assertNotCurrentExecution(
1582
        ctx context.Context,
1583
        tx sqlplugin.Tx,
1584
        shardID int,
1585
        domainID serialization.UUID,
1586
        workflowID string,
1587
        runID serialization.UUID,
1588
) error {
2✔
1589
        currentRow, err := tx.LockCurrentExecutions(ctx, &sqlplugin.CurrentExecutionsFilter{
2✔
1590
                ShardID:    int64(shardID),
2✔
1591
                DomainID:   domainID,
2✔
1592
                WorkflowID: workflowID,
2✔
1593
        })
2✔
1594
        if err != nil {
2✔
1595
                if err == sql.ErrNoRows {
×
1596
                        // allow bypassing no current record
×
1597
                        return nil
×
1598
                }
×
1599
                return convertCommonErrors(tx, "assertCurrentExecution", "Unable to load current record.", err)
×
1600
        }
1601
        return assertRunIDMismatch(runID, currentRow.RunID)
2✔
1602
}
1603

1604
func assertRunIDAndUpdateCurrentExecution(
1605
        ctx context.Context,
1606
        tx sqlplugin.Tx,
1607
        shardID int,
1608
        domainID serialization.UUID,
1609
        workflowID string,
1610
        newRunID serialization.UUID,
1611
        previousRunID serialization.UUID,
1612
        createRequestID string,
1613
        state int,
1614
        closeStatus int,
1615
        startVersion int64,
1616
        lastWriteVersion int64,
1617
) error {
2,898✔
1618

2,898✔
1619
        assertFn := func(currentRow *sqlplugin.CurrentExecutionsRow) error {
5,796✔
1620
                if !bytes.Equal(currentRow.RunID, previousRunID) {
2,898✔
1621
                        return &p.ConditionFailedError{Msg: fmt.Sprintf(
×
1622
                                "assertRunIDAndUpdateCurrentExecution failed. Current run ID was %v, expected %v",
×
1623
                                currentRow.RunID,
×
1624
                                previousRunID,
×
1625
                        )}
×
1626
                }
×
1627
                return nil
2,898✔
1628
        }
1629
        if err := assertCurrentExecution(ctx, tx, shardID, domainID, workflowID, assertFn); err != nil {
2,898✔
1630
                return err
×
1631
        }
×
1632

1633
        return updateCurrentExecution(ctx, tx, shardID, domainID, workflowID, newRunID, createRequestID, state, closeStatus, startVersion, lastWriteVersion)
2,898✔
1634
}
1635

1636
func assertCurrentExecution(
1637
        ctx context.Context,
1638
        tx sqlplugin.Tx,
1639
        shardID int,
1640
        domainID serialization.UUID,
1641
        workflowID string,
1642
        assertFn func(currentRow *sqlplugin.CurrentExecutionsRow) error,
1643
) error {
2,898✔
1644

2,898✔
1645
        currentRow, err := tx.LockCurrentExecutions(ctx, &sqlplugin.CurrentExecutionsFilter{
2,898✔
1646
                ShardID:    int64(shardID),
2,898✔
1647
                DomainID:   domainID,
2,898✔
1648
                WorkflowID: workflowID,
2,898✔
1649
        })
2,898✔
1650
        if err != nil {
2,898✔
1651
                return convertCommonErrors(tx, "assertCurrentExecution", "Unable to load current record.", err)
×
1652
        }
×
1653
        return assertFn(currentRow)
2,898✔
1654
}
1655

1656
func assertRunIDMismatch(runID serialization.UUID, currentRunID serialization.UUID) error {
2✔
1657
        // zombie workflow creation with existence of current record, this is a noop
2✔
1658
        if bytes.Equal(currentRunID, runID) {
2✔
1659
                return &p.ConditionFailedError{Msg: fmt.Sprintf(
×
1660
                        "assertRunIDMismatch failed. Current run ID was %v, input %v",
×
1661
                        currentRunID,
×
1662
                        runID,
×
1663
                )}
×
1664
        }
×
1665
        return nil
2✔
1666
}
1667

1668
func updateCurrentExecution(
1669
        ctx context.Context,
1670
        tx sqlplugin.Tx,
1671
        shardID int,
1672
        domainID serialization.UUID,
1673
        workflowID string,
1674
        runID serialization.UUID,
1675
        createRequestID string,
1676
        state int,
1677
        closeStatus int,
1678
        startVersion int64,
1679
        lastWriteVersion int64,
1680
) error {
2,918✔
1681

2,918✔
1682
        result, err := tx.UpdateCurrentExecutions(ctx, &sqlplugin.CurrentExecutionsRow{
2,918✔
1683
                ShardID:          int64(shardID),
2,918✔
1684
                DomainID:         domainID,
2,918✔
1685
                WorkflowID:       workflowID,
2,918✔
1686
                RunID:            runID,
2,918✔
1687
                CreateRequestID:  createRequestID,
2,918✔
1688
                State:            state,
2,918✔
1689
                CloseStatus:      closeStatus,
2,918✔
1690
                StartVersion:     startVersion,
2,918✔
1691
                LastWriteVersion: lastWriteVersion,
2,918✔
1692
        })
2,918✔
1693
        if err != nil {
2,918✔
1694
                return convertCommonErrors(tx, "updateCurrentExecution", "", err)
×
1695
        }
×
1696
        rowsAffected, err := result.RowsAffected()
2,918✔
1697
        if err != nil {
2,918✔
1698
                return &types.InternalServiceError{
×
1699
                        Message: fmt.Sprintf("updateCurrentExecution failed. Failed to check number of rows updated in current_executions table. Error: %v", err),
×
1700
                }
×
1701
        }
×
1702
        if rowsAffected != 1 {
2,918✔
1703
                return &types.InternalServiceError{
×
1704
                        Message: fmt.Sprintf("updateCurrentExecution failed. %v rows of current_executions updated instead of 1.", rowsAffected),
×
1705
                }
×
1706
        }
×
1707
        return nil
2,918✔
1708
}
1709

1710
func buildExecutionRow(
1711
        executionInfo *p.InternalWorkflowExecutionInfo,
1712
        versionHistories *p.DataBlob,
1713
        workflowChecksum *p.DataBlob,
1714
        startVersion int64,
1715
        lastWriteVersion int64,
1716
        shardID int,
1717
        parser serialization.Parser,
1718
) (row *sqlplugin.ExecutionsRow, err error) {
3,318✔
1719

3,318✔
1720
        info := serialization.FromInternalWorkflowExecutionInfo(executionInfo)
3,318✔
1721

3,318✔
1722
        info.StartVersion = startVersion
3,318✔
1723
        if versionHistories == nil {
3,318✔
1724
                // this is allowed
×
1725
        } else {
3,318✔
1726
                info.VersionHistories = versionHistories.Data
3,318✔
1727
                info.VersionHistoriesEncoding = string(versionHistories.GetEncoding())
3,318✔
1728
        }
3,318✔
1729
        if workflowChecksum != nil {
6,636✔
1730
                info.Checksum = workflowChecksum.Data
3,318✔
1731
                info.ChecksumEncoding = string(workflowChecksum.GetEncoding())
3,318✔
1732
        }
3,318✔
1733

1734
        blob, err := parser.WorkflowExecutionInfoToBlob(info)
3,318✔
1735
        if err != nil {
3,318✔
1736
                return nil, err
×
1737
        }
×
1738

1739
        return &sqlplugin.ExecutionsRow{
3,318✔
1740
                ShardID:          shardID,
3,318✔
1741
                DomainID:         serialization.MustParseUUID(executionInfo.DomainID),
3,318✔
1742
                WorkflowID:       executionInfo.WorkflowID,
3,318✔
1743
                RunID:            serialization.MustParseUUID(executionInfo.RunID),
3,318✔
1744
                NextEventID:      int64(executionInfo.NextEventID),
3,318✔
1745
                LastWriteVersion: lastWriteVersion,
3,318✔
1746
                Data:             blob.Data,
3,318✔
1747
                DataEncoding:     string(blob.Encoding),
3,318✔
1748
        }, nil
3,318✔
1749
}
1750

1751
func createExecution(
1752
        ctx context.Context,
1753
        tx sqlplugin.Tx,
1754
        executionInfo *p.InternalWorkflowExecutionInfo,
1755
        versionHistories *p.DataBlob,
1756
        workflowChecksum *p.DataBlob,
1757
        startVersion int64,
1758
        lastWriteVersion int64,
1759
        shardID int,
1760
        parser serialization.Parser,
1761
) error {
422✔
1762

422✔
1763
        // validate workflow state & close status
422✔
1764
        if err := p.ValidateCreateWorkflowStateCloseStatus(
422✔
1765
                executionInfo.State,
422✔
1766
                executionInfo.CloseStatus); err != nil {
422✔
1767
                return err
×
1768
        }
×
1769

1770
        now := time.Now()
422✔
1771
        // TODO: this case seems to be always false
422✔
1772
        if executionInfo.StartTimestamp.IsZero() {
422✔
1773
                executionInfo.StartTimestamp = now
×
1774
        }
×
1775

1776
        row, err := buildExecutionRow(
422✔
1777
                executionInfo,
422✔
1778
                versionHistories,
422✔
1779
                workflowChecksum,
422✔
1780
                startVersion,
422✔
1781
                lastWriteVersion,
422✔
1782
                shardID,
422✔
1783
                parser,
422✔
1784
        )
422✔
1785
        if err != nil {
422✔
1786
                return err
×
1787
        }
×
1788
        result, err := tx.InsertIntoExecutions(ctx, row)
422✔
1789
        if err != nil {
424✔
1790
                if tx.IsDupEntryError(err) {
4✔
1791
                        return &p.WorkflowExecutionAlreadyStartedError{
2✔
1792
                                Msg:              fmt.Sprintf("Workflow execution already running. WorkflowId: %v", executionInfo.WorkflowID),
2✔
1793
                                StartRequestID:   executionInfo.CreateRequestID,
2✔
1794
                                RunID:            executionInfo.RunID,
2✔
1795
                                State:            executionInfo.State,
2✔
1796
                                CloseStatus:      executionInfo.CloseStatus,
2✔
1797
                                LastWriteVersion: row.LastWriteVersion,
2✔
1798
                        }
2✔
1799
                }
2✔
1800
                return convertCommonErrors(tx, "createExecution", "", err)
×
1801
        }
1802
        rowsAffected, err := result.RowsAffected()
422✔
1803
        if err != nil {
422✔
1804
                return &types.InternalServiceError{
×
1805
                        Message: fmt.Sprintf("createExecution failed. Failed to verify number of rows affected. Erorr: %v", err),
×
1806
                }
×
1807
        }
×
1808
        if rowsAffected != 1 {
422✔
1809
                return &types.EntityNotExistsError{
×
1810
                        Message: fmt.Sprintf("createExecution failed. Affected %v rows updated instead of 1.", rowsAffected),
×
1811
                }
×
1812
        }
×
1813

1814
        return nil
422✔
1815
}
1816

1817
func updateExecution(
1818
        ctx context.Context,
1819
        tx sqlplugin.Tx,
1820
        executionInfo *p.InternalWorkflowExecutionInfo,
1821
        versionHistories *p.DataBlob,
1822
        workflowChecksum *p.DataBlob,
1823
        startVersion int64,
1824
        lastWriteVersion int64,
1825
        shardID int,
1826
        parser serialization.Parser,
1827
) error {
2,898✔
1828

2,898✔
1829
        // validate workflow state & close status
2,898✔
1830
        if err := p.ValidateUpdateWorkflowStateCloseStatus(
2,898✔
1831
                executionInfo.State,
2,898✔
1832
                executionInfo.CloseStatus); err != nil {
2,898✔
1833
                return err
×
1834
        }
×
1835

1836
        row, err := buildExecutionRow(
2,898✔
1837
                executionInfo,
2,898✔
1838
                versionHistories,
2,898✔
1839
                workflowChecksum,
2,898✔
1840
                startVersion,
2,898✔
1841
                lastWriteVersion,
2,898✔
1842
                shardID,
2,898✔
1843
                parser,
2,898✔
1844
        )
2,898✔
1845
        if err != nil {
2,898✔
1846
                return err
×
1847
        }
×
1848
        result, err := tx.UpdateExecutions(ctx, row)
2,898✔
1849
        if err != nil {
2,898✔
1850
                return convertCommonErrors(tx, "updateExecution", "", err)
×
1851
        }
×
1852
        rowsAffected, err := result.RowsAffected()
2,898✔
1853
        if err != nil {
2,898✔
1854
                return &types.InternalServiceError{
×
1855
                        Message: fmt.Sprintf("updateExecution failed. Failed to verify number of rows affected. Erorr: %v", err),
×
1856
                }
×
1857
        }
×
1858
        if rowsAffected != 1 {
2,898✔
1859
                return &types.EntityNotExistsError{
×
1860
                        Message: fmt.Sprintf("updateExecution failed. Affected %v rows updated instead of 1.", rowsAffected),
×
1861
                }
×
1862
        }
×
1863

1864
        return nil
2,898✔
1865
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc