• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

uber / cadence / 0189416e-1e95-4b22-9e8a-4ad1a70a4ade

10 Jul 2023 08:11PM UTC coverage: 57.022% (-0.3%) from 57.338%
0189416e-1e95-4b22-9e8a-4ad1a70a4ade

push

buildkite

web-flow
Allow to configure HTTP settings using template (#5329)

87172 of 152873 relevant lines covered (57.02%)

2487.8 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

63.86
/common/persistence/sql/sqlExecutionStore.go
1
// Copyright (c) 2018 Uber Technologies, Inc.
2
//
3
// Permission is hereby granted, free of charge, to any person obtaining a copy
4
// of this software and associated documentation files (the "Software"), to deal
5
// in the Software without restriction, including without limitation the rights
6
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
7
// copies of the Software, and to permit persons to whom the Software is
8
// furnished to do so, subject to the following conditions:
9
//
10
// The above copyright notice and this permission notice shall be included in
11
// all copies or substantial portions of the Software.
12
//
13
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
14
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
18
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
19
// THE SOFTWARE.
20

21
package sql
22

23
import (
24
        "bytes"
25
        "context"
26
        "database/sql"
27
        "encoding/json"
28
        "fmt"
29
        "math"
30
        "runtime/debug"
31
        "time"
32

33
        "golang.org/x/sync/errgroup"
34

35
        "github.com/uber/cadence/common"
36
        "github.com/uber/cadence/common/collection"
37
        "github.com/uber/cadence/common/log"
38
        "github.com/uber/cadence/common/log/tag"
39
        p "github.com/uber/cadence/common/persistence"
40
        "github.com/uber/cadence/common/persistence/serialization"
41
        "github.com/uber/cadence/common/persistence/sql/sqlplugin"
42
        "github.com/uber/cadence/common/types"
43
)
44

45
// Placeholder identifiers shared by this package. Neither constant is
// referenced in the portion of the file shown here.
const (
	// emptyWorkflowID is the empty workflow ID string.
	emptyWorkflowID string = ""
	// emptyReplicationRunID is a fixed UUID-formatted string.
	// NOTE(review): presumably a sentinel run ID for replication-related rows — confirm against its users.
	emptyReplicationRunID string = "30000000-5000-f000-f000-000000000000"
)
49

50
type sqlExecutionStore struct {
51
        sqlStore
52
        shardID int
53
}
54

55
var _ p.ExecutionStore = (*sqlExecutionStore)(nil)
56

57
// NewSQLExecutionStore creates an instance of ExecutionStore
58
func NewSQLExecutionStore(
59
        db sqlplugin.DB,
60
        logger log.Logger,
61
        shardID int,
62
        parser serialization.Parser,
63
        dc *p.DynamicConfiguration,
64
) (p.ExecutionStore, error) {
42✔
65

42✔
66
        return &sqlExecutionStore{
42✔
67
                shardID: shardID,
42✔
68
                sqlStore: sqlStore{
42✔
69
                        db:     db,
42✔
70
                        logger: logger,
42✔
71
                        parser: parser,
42✔
72
                        dc:     dc,
42✔
73
                },
42✔
74
        }, nil
42✔
75
}
42✔
76

77
// txExecuteShardLocked executes f under transaction and with read lock on shard row
78
func (m *sqlExecutionStore) txExecuteShardLocked(
79
        ctx context.Context,
80
        dbShardID int,
81
        operation string,
82
        rangeID int64,
83
        fn func(tx sqlplugin.Tx) error,
84
) error {
3,268✔
85

3,268✔
86
        return m.txExecute(ctx, dbShardID, operation, func(tx sqlplugin.Tx) error {
6,536✔
87
                if err := readLockShard(ctx, tx, m.shardID, rangeID); err != nil {
3,268✔
88
                        return err
×
89
                }
×
90
                err := fn(tx)
3,268✔
91
                if err != nil {
3,298✔
92
                        return err
30✔
93
                }
30✔
94
                return nil
3,240✔
95
        })
96
}
97

98
func (m *sqlExecutionStore) GetShardID() int {
7,706✔
99
        return m.shardID
7,706✔
100
}
7,706✔
101

102
func (m *sqlExecutionStore) CreateWorkflowExecution(
103
        ctx context.Context,
104
        request *p.InternalCreateWorkflowExecutionRequest,
105
) (response *p.CreateWorkflowExecutionResponse, err error) {
336✔
106
        dbShardID := sqlplugin.GetDBShardIDFromHistoryShardID(m.shardID, m.db.GetTotalNumDBShards())
336✔
107

336✔
108
        err = m.txExecuteShardLocked(ctx, dbShardID, "CreateWorkflowExecution", request.RangeID, func(tx sqlplugin.Tx) error {
672✔
109
                response, err = m.createWorkflowExecutionTx(ctx, tx, request)
336✔
110
                return err
336✔
111
        })
336✔
112
        return
336✔
113
}
114

115
func (m *sqlExecutionStore) createWorkflowExecutionTx(
116
        ctx context.Context,
117
        tx sqlplugin.Tx,
118
        request *p.InternalCreateWorkflowExecutionRequest,
119
) (*p.CreateWorkflowExecutionResponse, error) {
336✔
120

336✔
121
        newWorkflow := request.NewWorkflowSnapshot
336✔
122
        executionInfo := newWorkflow.ExecutionInfo
336✔
123
        startVersion := newWorkflow.StartVersion
336✔
124
        lastWriteVersion := newWorkflow.LastWriteVersion
336✔
125
        shardID := m.shardID
336✔
126
        domainID := serialization.MustParseUUID(executionInfo.DomainID)
336✔
127
        workflowID := executionInfo.WorkflowID
336✔
128
        runID := serialization.MustParseUUID(executionInfo.RunID)
336✔
129

336✔
130
        if err := p.ValidateCreateWorkflowModeState(
336✔
131
                request.Mode,
336✔
132
                newWorkflow,
336✔
133
        ); err != nil {
336✔
134
                return nil, err
×
135
        }
×
136

137
        var err error
336✔
138
        var row *sqlplugin.CurrentExecutionsRow
336✔
139
        if row, err = lockCurrentExecutionIfExists(ctx, tx, m.shardID, domainID, workflowID); err != nil {
336✔
140
                return nil, err
×
141
        }
×
142

143
        // current workflow record check
144
        if row != nil {
386✔
145
                // current run ID, last write version, current workflow state check
50✔
146
                switch request.Mode {
50✔
147
                case p.CreateWorkflowModeBrandNew:
28✔
148
                        return nil, &p.WorkflowExecutionAlreadyStartedError{
28✔
149
                                Msg:              fmt.Sprintf("Workflow execution already running. WorkflowId: %v", row.WorkflowID),
28✔
150
                                StartRequestID:   row.CreateRequestID,
28✔
151
                                RunID:            row.RunID.String(),
28✔
152
                                State:            int(row.State),
28✔
153
                                CloseStatus:      int(row.CloseStatus),
28✔
154
                                LastWriteVersion: row.LastWriteVersion,
28✔
155
                        }
28✔
156

157
                case p.CreateWorkflowModeWorkflowIDReuse:
20✔
158
                        if request.PreviousLastWriteVersion != row.LastWriteVersion {
20✔
159
                                return nil, &p.CurrentWorkflowConditionFailedError{
×
160
                                        Msg: fmt.Sprintf("Workflow execution creation condition failed. WorkflowId: %v, "+
×
161
                                                "LastWriteVersion: %v, PreviousLastWriteVersion: %v",
×
162
                                                workflowID, row.LastWriteVersion, request.PreviousLastWriteVersion),
×
163
                                }
×
164
                        }
×
165
                        if row.State != p.WorkflowStateCompleted {
20✔
166
                                return nil, &p.CurrentWorkflowConditionFailedError{
×
167
                                        Msg: fmt.Sprintf("Workflow execution creation condition failed. WorkflowId: %v, "+
×
168
                                                "State: %v, Expected: %v",
×
169
                                                workflowID, row.State, p.WorkflowStateCompleted),
×
170
                                }
×
171
                        }
×
172
                        runIDStr := row.RunID.String()
20✔
173
                        if runIDStr != request.PreviousRunID {
20✔
174
                                return nil, &p.CurrentWorkflowConditionFailedError{
×
175
                                        Msg: fmt.Sprintf("Workflow execution creation condition failed. WorkflowId: %v, "+
×
176
                                                "RunID: %v, PreviousRunID: %v",
×
177
                                                workflowID, runIDStr, request.PreviousRunID),
×
178
                                }
×
179
                        }
×
180

181
                case p.CreateWorkflowModeZombie:
2✔
182
                        // zombie workflow creation with existence of current record, this is a noop
2✔
183
                        if err := assertRunIDMismatch(serialization.MustParseUUID(executionInfo.RunID), row.RunID); err != nil {
2✔
184
                                return nil, err
×
185
                        }
×
186

187
                case p.CreateWorkflowModeContinueAsNew:
2✔
188
                        // continueAsNew mode expects a current run exists
2✔
189
                        if err := assertRunIDMismatch(serialization.MustParseUUID(executionInfo.RunID), row.RunID); err != nil {
2✔
190
                                return nil, err
×
191
                        }
×
192

193
                default:
×
194
                        return nil, &types.InternalServiceError{
×
195
                                Message: fmt.Sprintf(
×
196
                                        "CreteWorkflowExecution: unknown mode: %v",
×
197
                                        request.Mode,
×
198
                                ),
×
199
                        }
×
200
                }
201
        }
202

203
        if err := createOrUpdateCurrentExecution(
308✔
204
                ctx,
308✔
205
                tx,
308✔
206
                request.Mode,
308✔
207
                m.shardID,
308✔
208
                domainID,
308✔
209
                workflowID,
308✔
210
                runID,
308✔
211
                executionInfo.State,
308✔
212
                executionInfo.CloseStatus,
308✔
213
                executionInfo.CreateRequestID,
308✔
214
                startVersion,
308✔
215
                lastWriteVersion); err != nil {
308✔
216
                return nil, err
×
217
        }
×
218

219
        if err := m.applyWorkflowSnapshotTxAsNew(ctx, tx, shardID, &request.NewWorkflowSnapshot, m.parser); err != nil {
310✔
220
                return nil, err
2✔
221
        }
2✔
222

223
        return &p.CreateWorkflowExecutionResponse{}, nil
308✔
224
}
225

226
func (m *sqlExecutionStore) getExecutions(
227
        ctx context.Context,
228
        request *p.InternalGetWorkflowExecutionRequest,
229
        domainID serialization.UUID,
230
        wfID string,
231
        runID serialization.UUID,
232
) ([]sqlplugin.ExecutionsRow, error) {
758✔
233
        executions, err := m.db.SelectFromExecutions(ctx, &sqlplugin.ExecutionsFilter{
758✔
234
                ShardID: m.shardID, DomainID: domainID, WorkflowID: wfID, RunID: runID})
758✔
235

758✔
236
        if err != nil {
1,024✔
237
                if err == sql.ErrNoRows {
532✔
238
                        return nil, &types.EntityNotExistsError{
266✔
239
                                Message: fmt.Sprintf(
266✔
240
                                        "Workflow execution not found.  WorkflowId: %v, RunId: %v",
266✔
241
                                        request.Execution.GetWorkflowID(),
266✔
242
                                        request.Execution.GetRunID(),
266✔
243
                                ),
266✔
244
                        }
266✔
245
                }
266✔
246
                return nil, convertCommonErrors(m.db, "GetWorkflowExecution", "", err)
×
247
        }
248

249
        if len(executions) == 0 {
494✔
250
                return nil, &types.EntityNotExistsError{
×
251
                        Message: fmt.Sprintf(
×
252
                                "Workflow execution not found.  WorkflowId: %v, RunId: %v",
×
253
                                request.Execution.GetWorkflowID(),
×
254
                                request.Execution.GetRunID(),
×
255
                        ),
×
256
                }
×
257
        }
×
258

259
        if len(executions) != 1 {
494✔
260
                return nil, &types.InternalServiceError{
×
261
                        Message: "GetWorkflowExecution return more than one results.",
×
262
                }
×
263
        }
×
264
        return executions, nil
494✔
265
}
266

267
func (m *sqlExecutionStore) GetWorkflowExecution(
268
        ctx context.Context,
269
        request *p.InternalGetWorkflowExecutionRequest,
270
) (resp *p.InternalGetWorkflowExecutionResponse, e error) {
758✔
271
        recoverPanic := func(recovered interface{}, err *error) {
6,808✔
272
                if recovered != nil {
6,050✔
273
                        *err = fmt.Errorf("DB operation panicked: %v %s", recovered, debug.Stack())
×
274
                }
×
275
        }
276

277
        domainID := serialization.MustParseUUID(request.DomainID)
758✔
278
        runID := serialization.MustParseUUID(request.Execution.RunID)
758✔
279
        wfID := request.Execution.WorkflowID
758✔
280

758✔
281
        var executions []sqlplugin.ExecutionsRow
758✔
282
        var activityInfos map[int64]*p.InternalActivityInfo
758✔
283
        var timerInfos map[string]*p.TimerInfo
758✔
284
        var childExecutionInfos map[int64]*p.InternalChildExecutionInfo
758✔
285
        var requestCancelInfos map[int64]*p.RequestCancelInfo
758✔
286
        var signalInfos map[int64]*p.SignalInfo
758✔
287
        var bufferedEvents []*p.DataBlob
758✔
288
        var signalsRequested map[string]struct{}
758✔
289

758✔
290
        g, ctx := errgroup.WithContext(ctx)
758✔
291

758✔
292
        g.Go(func() (e error) {
1,516✔
293
                defer func() { recoverPanic(recover(), &e) }()
1,516✔
294
                executions, e = m.getExecutions(ctx, request, domainID, wfID, runID)
758✔
295
                return e
758✔
296
        })
297

298
        g.Go(func() (e error) {
1,516✔
299
                defer func() { recoverPanic(recover(), &e) }()
1,516✔
300
                activityInfos, e = getActivityInfoMap(
758✔
301
                        ctx, m.db, m.shardID, domainID, wfID, runID, m.parser)
758✔
302
                return e
758✔
303
        })
304

305
        g.Go(func() (e error) {
1,516✔
306
                defer func() { recoverPanic(recover(), &e) }()
1,516✔
307
                timerInfos, e = getTimerInfoMap(
758✔
308
                        ctx, m.db, m.shardID, domainID, wfID, runID, m.parser)
758✔
309
                return e
758✔
310
        })
311

312
        g.Go(func() (e error) {
1,516✔
313
                defer func() { recoverPanic(recover(), &e) }()
1,516✔
314
                childExecutionInfos, e = getChildExecutionInfoMap(
758✔
315
                        ctx, m.db, m.shardID, domainID, wfID, runID, m.parser)
758✔
316
                return e
758✔
317
        })
318

319
        g.Go(func() (e error) {
1,516✔
320
                defer func() { recoverPanic(recover(), &e) }()
1,516✔
321
                requestCancelInfos, e = getRequestCancelInfoMap(
758✔
322
                        ctx, m.db, m.shardID, domainID, wfID, runID, m.parser)
758✔
323
                return e
758✔
324
        })
325

326
        g.Go(func() (e error) {
1,516✔
327
                defer func() { recoverPanic(recover(), &e) }()
1,516✔
328
                signalInfos, e = getSignalInfoMap(
758✔
329
                        ctx, m.db, m.shardID, domainID, wfID, runID, m.parser)
758✔
330
                return e
758✔
331
        })
332

333
        g.Go(func() (e error) {
1,516✔
334
                defer func() { recoverPanic(recover(), &e) }()
1,516✔
335
                bufferedEvents, e = getBufferedEvents(
758✔
336
                        ctx, m.db, m.shardID, domainID, wfID, runID)
758✔
337
                return e
758✔
338
        })
339

340
        g.Go(func() (e error) {
1,516✔
341
                defer func() { recoverPanic(recover(), &e) }()
1,516✔
342
                signalsRequested, e = getSignalsRequested(
758✔
343
                        ctx, m.db, m.shardID, domainID, wfID, runID)
758✔
344
                return e
758✔
345
        })
346

347
        err := g.Wait()
758✔
348
        if err != nil {
1,024✔
349
                return nil, err
266✔
350
        }
266✔
351

352
        state, err := m.populateWorkflowMutableState(executions[0])
494✔
353
        if err != nil {
494✔
354
                return nil, &types.InternalServiceError{
×
355
                        Message: fmt.Sprintf("GetWorkflowExecution: failed. Error: %v", err),
×
356
                }
×
357
        }
×
358
        state.ActivityInfos = activityInfos
494✔
359
        state.TimerInfos = timerInfos
494✔
360
        state.ChildExecutionInfos = childExecutionInfos
494✔
361
        state.RequestCancelInfos = requestCancelInfos
494✔
362
        state.SignalInfos = signalInfos
494✔
363
        state.BufferedEvents = bufferedEvents
494✔
364
        state.SignalRequestedIDs = signalsRequested
494✔
365

494✔
366
        return &p.InternalGetWorkflowExecutionResponse{State: state}, nil
494✔
367
}
368

369
func (m *sqlExecutionStore) UpdateWorkflowExecution(
370
        ctx context.Context,
371
        request *p.InternalUpdateWorkflowExecutionRequest,
372
) error {
2,934✔
373
        dbShardID := sqlplugin.GetDBShardIDFromHistoryShardID(m.shardID, m.db.GetTotalNumDBShards())
2,934✔
374
        return m.txExecuteShardLocked(ctx, dbShardID, "UpdateWorkflowExecution", request.RangeID, func(tx sqlplugin.Tx) error {
5,868✔
375
                return m.updateWorkflowExecutionTx(ctx, tx, request)
2,934✔
376
        })
2,934✔
377
}
378

379
func (m *sqlExecutionStore) updateWorkflowExecutionTx(
380
        ctx context.Context,
381
        tx sqlplugin.Tx,
382
        request *p.InternalUpdateWorkflowExecutionRequest,
383
) error {
2,934✔
384

2,934✔
385
        updateWorkflow := request.UpdateWorkflowMutation
2,934✔
386
        newWorkflow := request.NewWorkflowSnapshot
2,934✔
387

2,934✔
388
        executionInfo := updateWorkflow.ExecutionInfo
2,934✔
389
        domainID := serialization.MustParseUUID(executionInfo.DomainID)
2,934✔
390
        workflowID := executionInfo.WorkflowID
2,934✔
391
        runID := serialization.MustParseUUID(executionInfo.RunID)
2,934✔
392
        shardID := m.shardID
2,934✔
393

2,934✔
394
        if err := p.ValidateUpdateWorkflowModeState(
2,934✔
395
                request.Mode,
2,934✔
396
                updateWorkflow,
2,934✔
397
                newWorkflow,
2,934✔
398
        ); err != nil {
2,934✔
399
                return err
×
400
        }
×
401

402
        switch request.Mode {
2,934✔
403
        case p.UpdateWorkflowModeIgnoreCurrent:
×
404
                // no-op
405
        case p.UpdateWorkflowModeBypassCurrent:
2✔
406
                if err := assertNotCurrentExecution(
2✔
407
                        ctx,
2✔
408
                        tx,
2✔
409
                        shardID,
2✔
410
                        domainID,
2✔
411
                        workflowID,
2✔
412
                        runID); err != nil {
2✔
413
                        return err
×
414
                }
×
415

416
        case p.UpdateWorkflowModeUpdateCurrent:
2,934✔
417
                if newWorkflow != nil {
3,052✔
418
                        newExecutionInfo := newWorkflow.ExecutionInfo
118✔
419
                        startVersion := newWorkflow.StartVersion
118✔
420
                        lastWriteVersion := newWorkflow.LastWriteVersion
118✔
421
                        newDomainID := serialization.MustParseUUID(newExecutionInfo.DomainID)
118✔
422
                        newRunID := serialization.MustParseUUID(newExecutionInfo.RunID)
118✔
423

118✔
424
                        if !bytes.Equal(domainID, newDomainID) {
118✔
425
                                return &types.InternalServiceError{
×
426
                                        Message: "UpdateWorkflowExecution: cannot continue as new to another domain",
×
427
                                }
×
428
                        }
×
429

430
                        if err := assertRunIDAndUpdateCurrentExecution(
118✔
431
                                ctx,
118✔
432
                                tx,
118✔
433
                                shardID,
118✔
434
                                domainID,
118✔
435
                                workflowID,
118✔
436
                                newRunID,
118✔
437
                                runID,
118✔
438
                                newWorkflow.ExecutionInfo.CreateRequestID,
118✔
439
                                newWorkflow.ExecutionInfo.State,
118✔
440
                                newWorkflow.ExecutionInfo.CloseStatus,
118✔
441
                                startVersion,
118✔
442
                                lastWriteVersion); err != nil {
118✔
443
                                return err
×
444
                        }
×
445
                } else {
2,818✔
446
                        startVersion := updateWorkflow.StartVersion
2,818✔
447
                        lastWriteVersion := updateWorkflow.LastWriteVersion
2,818✔
448
                        // this is only to update the current record
2,818✔
449
                        if err := assertRunIDAndUpdateCurrentExecution(
2,818✔
450
                                ctx,
2,818✔
451
                                tx,
2,818✔
452
                                shardID,
2,818✔
453
                                domainID,
2,818✔
454
                                workflowID,
2,818✔
455
                                runID,
2,818✔
456
                                runID,
2,818✔
457
                                executionInfo.CreateRequestID,
2,818✔
458
                                executionInfo.State,
2,818✔
459
                                executionInfo.CloseStatus,
2,818✔
460
                                startVersion,
2,818✔
461
                                lastWriteVersion); err != nil {
2,818✔
462
                                return err
×
463
                        }
×
464
                }
465

466
        default:
×
467
                return &types.InternalServiceError{
×
468
                        Message: fmt.Sprintf("UpdateWorkflowExecution: unknown mode: %v", request.Mode),
×
469
                }
×
470
        }
471

472
        if m.useAsyncTransaction() { // async transaction is enabled
2,934✔
473
                // TODO: it's possible to merge some operations in the following 2 functions in a batch, should refactor the code later
×
474
                if err := applyWorkflowMutationAsyncTx(ctx, tx, shardID, &updateWorkflow, m.parser); err != nil {
×
475
                        return err
×
476
                }
×
477
                if newWorkflow != nil {
×
478
                        if err := m.applyWorkflowSnapshotAsyncTxAsNew(ctx, tx, shardID, newWorkflow, m.parser); err != nil {
×
479
                                return err
×
480
                        }
×
481
                }
482
                return nil
×
483
        }
484

485
        if err := applyWorkflowMutationTx(ctx, tx, shardID, &updateWorkflow, m.parser); err != nil {
2,934✔
486
                return err
×
487
        }
×
488
        if newWorkflow != nil {
3,052✔
489
                if err := m.applyWorkflowSnapshotTxAsNew(ctx, tx, shardID, newWorkflow, m.parser); err != nil {
118✔
490
                        return err
×
491
                }
×
492
        }
493
        return nil
2,934✔
494
}
495

496
func (m *sqlExecutionStore) ConflictResolveWorkflowExecution(
497
        ctx context.Context,
498
        request *p.InternalConflictResolveWorkflowExecutionRequest,
499
) error {
2✔
500
        dbShardID := sqlplugin.GetDBShardIDFromHistoryShardID(m.shardID, m.db.GetTotalNumDBShards())
2✔
501
        return m.txExecuteShardLocked(ctx, dbShardID, "ConflictResolveWorkflowExecution", request.RangeID, func(tx sqlplugin.Tx) error {
4✔
502
                return m.conflictResolveWorkflowExecutionTx(ctx, tx, request)
2✔
503
        })
2✔
504
}
505

506
func (m *sqlExecutionStore) conflictResolveWorkflowExecutionTx(
507
        ctx context.Context,
508
        tx sqlplugin.Tx,
509
        request *p.InternalConflictResolveWorkflowExecutionRequest,
510
) error {
2✔
511

2✔
512
        currentWorkflow := request.CurrentWorkflowMutation
2✔
513
        resetWorkflow := request.ResetWorkflowSnapshot
2✔
514
        newWorkflow := request.NewWorkflowSnapshot
2✔
515

2✔
516
        shardID := m.shardID
2✔
517

2✔
518
        domainID := serialization.MustParseUUID(resetWorkflow.ExecutionInfo.DomainID)
2✔
519
        workflowID := resetWorkflow.ExecutionInfo.WorkflowID
2✔
520

2✔
521
        if err := p.ValidateConflictResolveWorkflowModeState(
2✔
522
                request.Mode,
2✔
523
                resetWorkflow,
2✔
524
                newWorkflow,
2✔
525
                currentWorkflow,
2✔
526
        ); err != nil {
2✔
527
                return err
×
528
        }
×
529

530
        switch request.Mode {
2✔
531
        case p.ConflictResolveWorkflowModeBypassCurrent:
2✔
532
                if err := assertNotCurrentExecution(
2✔
533
                        ctx,
2✔
534
                        tx,
2✔
535
                        shardID,
2✔
536
                        domainID,
2✔
537
                        workflowID,
2✔
538
                        serialization.MustParseUUID(resetWorkflow.ExecutionInfo.RunID)); err != nil {
2✔
539
                        return err
×
540
                }
×
541

542
        case p.ConflictResolveWorkflowModeUpdateCurrent:
2✔
543
                executionInfo := resetWorkflow.ExecutionInfo
2✔
544
                startVersion := resetWorkflow.StartVersion
2✔
545
                lastWriteVersion := resetWorkflow.LastWriteVersion
2✔
546
                if newWorkflow != nil {
2✔
547
                        executionInfo = newWorkflow.ExecutionInfo
×
548
                        startVersion = newWorkflow.StartVersion
×
549
                        lastWriteVersion = newWorkflow.LastWriteVersion
×
550
                }
×
551
                runID := serialization.MustParseUUID(executionInfo.RunID)
2✔
552
                createRequestID := executionInfo.CreateRequestID
2✔
553
                state := executionInfo.State
2✔
554
                closeStatus := executionInfo.CloseStatus
2✔
555

2✔
556
                if currentWorkflow != nil {
2✔
557
                        prevRunID := serialization.MustParseUUID(currentWorkflow.ExecutionInfo.RunID)
×
558

×
559
                        if err := assertRunIDAndUpdateCurrentExecution(
×
560
                                ctx,
×
561
                                tx,
×
562
                                m.shardID,
×
563
                                domainID,
×
564
                                workflowID,
×
565
                                runID,
×
566
                                prevRunID,
×
567
                                createRequestID,
×
568
                                state,
×
569
                                closeStatus,
×
570
                                startVersion,
×
571
                                lastWriteVersion); err != nil {
×
572
                                return err
×
573
                        }
×
574
                } else {
2✔
575
                        // reset workflow is current
2✔
576
                        prevRunID := serialization.MustParseUUID(resetWorkflow.ExecutionInfo.RunID)
2✔
577

2✔
578
                        if err := assertRunIDAndUpdateCurrentExecution(
2✔
579
                                ctx,
2✔
580
                                tx,
2✔
581
                                m.shardID,
2✔
582
                                domainID,
2✔
583
                                workflowID,
2✔
584
                                runID,
2✔
585
                                prevRunID,
2✔
586
                                createRequestID,
2✔
587
                                state,
2✔
588
                                closeStatus,
2✔
589
                                startVersion,
2✔
590
                                lastWriteVersion); err != nil {
2✔
591
                                return err
×
592
                        }
×
593
                }
594

595
        default:
×
596
                return &types.InternalServiceError{
×
597
                        Message: fmt.Sprintf("ConflictResolveWorkflowExecution: unknown mode: %v", request.Mode),
×
598
                }
×
599
        }
600

601
        if err := applyWorkflowSnapshotTxAsReset(ctx, tx, shardID, &resetWorkflow, m.parser); err != nil {
2✔
602
                return err
×
603
        }
×
604
        if currentWorkflow != nil {
2✔
605
                if err := applyWorkflowMutationTx(ctx, tx, shardID, currentWorkflow, m.parser); err != nil {
×
606
                        return err
×
607
                }
×
608
        }
609
        if newWorkflow != nil {
2✔
610
                if err := m.applyWorkflowSnapshotTxAsNew(ctx, tx, shardID, newWorkflow, m.parser); err != nil {
×
611
                        return err
×
612
                }
×
613
        }
614
        return nil
2✔
615
}
616

617
func (m *sqlExecutionStore) DeleteWorkflowExecution(
618
        ctx context.Context,
619
        request *p.DeleteWorkflowExecutionRequest,
620
) error {
36✔
621
        recoverPanic := func(recovered interface{}, err *error) {
310✔
622
                if recovered != nil {
274✔
623
                        *err = fmt.Errorf("DB operation panicked: %v %s", recovered, debug.Stack())
×
624
                }
×
625
        }
626
        domainID := serialization.MustParseUUID(request.DomainID)
36✔
627
        runID := serialization.MustParseUUID(request.RunID)
36✔
628
        wfID := request.WorkflowID
36✔
629
        g, ctx := errgroup.WithContext(ctx)
36✔
630

36✔
631
        g.Go(func() (e error) {
72✔
632
                defer func() { recoverPanic(recover(), &e) }()
72✔
633
                _, e = m.db.DeleteFromExecutions(ctx, &sqlplugin.ExecutionsFilter{
36✔
634
                        ShardID:    m.shardID,
36✔
635
                        DomainID:   domainID,
36✔
636
                        WorkflowID: wfID,
36✔
637
                        RunID:      runID,
36✔
638
                })
36✔
639
                return e
36✔
640
        })
641

642
        g.Go(func() (e error) {
72✔
643
                defer func() { recoverPanic(recover(), &e) }()
72✔
644
                _, e = m.db.DeleteFromActivityInfoMaps(ctx, &sqlplugin.ActivityInfoMapsFilter{
36✔
645
                        ShardID:    int64(m.shardID),
36✔
646
                        DomainID:   domainID,
36✔
647
                        WorkflowID: wfID,
36✔
648
                        RunID:      runID,
36✔
649
                })
36✔
650
                return e
36✔
651
        })
652

653
        g.Go(func() (e error) {
72✔
654
                defer func() { recoverPanic(recover(), &e) }()
72✔
655
                _, e = m.db.DeleteFromTimerInfoMaps(ctx, &sqlplugin.TimerInfoMapsFilter{
36✔
656
                        ShardID:    int64(m.shardID),
36✔
657
                        DomainID:   domainID,
36✔
658
                        WorkflowID: wfID,
36✔
659
                        RunID:      runID,
36✔
660
                })
36✔
661
                return e
36✔
662
        })
663

664
        g.Go(func() (e error) {
72✔
665
                defer func() { recoverPanic(recover(), &e) }()
72✔
666
                _, e = m.db.DeleteFromChildExecutionInfoMaps(ctx, &sqlplugin.ChildExecutionInfoMapsFilter{
36✔
667
                        ShardID:    int64(m.shardID),
36✔
668
                        DomainID:   domainID,
36✔
669
                        WorkflowID: wfID,
36✔
670
                        RunID:      runID,
36✔
671
                })
36✔
672
                return e
36✔
673
        })
674

675
        g.Go(func() (e error) {
72✔
676
                defer func() { recoverPanic(recover(), &e) }()
72✔
677
                _, e = m.db.DeleteFromRequestCancelInfoMaps(ctx, &sqlplugin.RequestCancelInfoMapsFilter{
36✔
678
                        ShardID:    int64(m.shardID),
36✔
679
                        DomainID:   domainID,
36✔
680
                        WorkflowID: wfID,
36✔
681
                        RunID:      runID,
36✔
682
                })
36✔
683
                return e
36✔
684
        })
685

686
        g.Go(func() (e error) {
72✔
687
                defer func() { recoverPanic(recover(), &e) }()
72✔
688
                _, e = m.db.DeleteFromSignalInfoMaps(ctx, &sqlplugin.SignalInfoMapsFilter{
36✔
689
                        ShardID:    int64(m.shardID),
36✔
690
                        DomainID:   domainID,
36✔
691
                        WorkflowID: wfID,
36✔
692
                        RunID:      runID,
36✔
693
                })
36✔
694
                return e
36✔
695
        })
696

697
        g.Go(func() (e error) {
72✔
698
                defer func() { recoverPanic(recover(), &e) }()
72✔
699
                _, e = m.db.DeleteFromBufferedEvents(ctx, &sqlplugin.BufferedEventsFilter{
36✔
700
                        ShardID:    m.shardID,
36✔
701
                        DomainID:   domainID,
36✔
702
                        WorkflowID: wfID,
36✔
703
                        RunID:      runID,
36✔
704
                })
36✔
705
                return e
36✔
706
        })
707

708
        g.Go(func() (e error) {
72✔
709
                defer func() { recoverPanic(recover(), &e) }()
72✔
710
                _, e = m.db.DeleteFromSignalsRequestedSets(ctx, &sqlplugin.SignalsRequestedSetsFilter{
36✔
711
                        ShardID:    int64(m.shardID),
36✔
712
                        DomainID:   domainID,
36✔
713
                        WorkflowID: wfID,
36✔
714
                        RunID:      runID,
36✔
715
                })
36✔
716
                return e
36✔
717
        })
718
        return g.Wait()
36✔
719
}
720

721
// its possible for a new run of the same workflow to have started after the run we are deleting
722
// here was finished. In that case, current_executions table will have the same workflowID but different
723
// runID. The following code will delete the row from current_executions if and only if the runID is
724
// same as the one we are trying to delete here
725
func (m *sqlExecutionStore) DeleteCurrentWorkflowExecution(
726
        ctx context.Context,
727
        request *p.DeleteCurrentWorkflowExecutionRequest,
728
) error {
36✔
729

36✔
730
        domainID := serialization.MustParseUUID(request.DomainID)
36✔
731
        runID := serialization.MustParseUUID(request.RunID)
36✔
732
        _, err := m.db.DeleteFromCurrentExecutions(ctx, &sqlplugin.CurrentExecutionsFilter{
36✔
733
                ShardID:    int64(m.shardID),
36✔
734
                DomainID:   domainID,
36✔
735
                WorkflowID: request.WorkflowID,
36✔
736
                RunID:      runID,
36✔
737
        })
36✔
738
        if err != nil {
36✔
739
                return convertCommonErrors(m.db, "DeleteCurrentWorkflowExecution", "", err)
×
740
        }
×
741
        return nil
36✔
742
}
743

744
func (m *sqlExecutionStore) GetCurrentExecution(
745
        ctx context.Context,
746
        request *p.GetCurrentExecutionRequest,
747
) (*p.GetCurrentExecutionResponse, error) {
122✔
748

122✔
749
        row, err := m.db.SelectFromCurrentExecutions(ctx, &sqlplugin.CurrentExecutionsFilter{
122✔
750
                ShardID:    int64(m.shardID),
122✔
751
                DomainID:   serialization.MustParseUUID(request.DomainID),
122✔
752
                WorkflowID: request.WorkflowID,
122✔
753
        })
122✔
754
        if err != nil {
130✔
755
                return nil, convertCommonErrors(m.db, "GetCurrentExecution", "", err)
8✔
756
        }
8✔
757
        return &p.GetCurrentExecutionResponse{
116✔
758
                StartRequestID:   row.CreateRequestID,
116✔
759
                RunID:            row.RunID.String(),
116✔
760
                State:            int(row.State),
116✔
761
                CloseStatus:      int(row.CloseStatus),
116✔
762
                LastWriteVersion: row.LastWriteVersion,
116✔
763
        }, nil
116✔
764
}
765

766
func (m *sqlExecutionStore) ListCurrentExecutions(
767
        _ context.Context,
768
        _ *p.ListCurrentExecutionsRequest,
769
) (*p.ListCurrentExecutionsResponse, error) {
×
770
        return nil, &types.InternalServiceError{Message: "Not yet implemented"}
×
771
}
×
772

773
func (m *sqlExecutionStore) IsWorkflowExecutionExists(
774
        _ context.Context,
775
        _ *p.IsWorkflowExecutionExistsRequest,
776
) (*p.IsWorkflowExecutionExistsResponse, error) {
×
777
        return nil, &types.InternalServiceError{Message: "Not yet implemented"}
×
778
}
×
779

780
func (m *sqlExecutionStore) ListConcreteExecutions(
781
        ctx context.Context,
782
        request *p.ListConcreteExecutionsRequest,
783
) (*p.InternalListConcreteExecutionsResponse, error) {
×
784

×
785
        filter := &sqlplugin.ExecutionsFilter{}
×
786
        if len(request.PageToken) > 0 {
×
787
                err := gobDeserialize(request.PageToken, &filter)
×
788
                if err != nil {
×
789
                        return nil, &types.InternalServiceError{
×
790
                                Message: fmt.Sprintf("ListConcreteExecutions failed. Error: %v", err),
×
791
                        }
×
792
                }
×
793
        } else {
×
794
                filter = &sqlplugin.ExecutionsFilter{
×
795
                        ShardID:    m.shardID,
×
796
                        WorkflowID: "",
×
797
                }
×
798
        }
×
799
        filter.Size = request.PageSize
×
800

×
801
        executions, err := m.db.SelectFromExecutions(ctx, filter)
×
802
        if err != nil {
×
803
                if err == sql.ErrNoRows {
×
804
                        return &p.InternalListConcreteExecutionsResponse{}, nil
×
805
                }
×
806
                return nil, convertCommonErrors(m.db, "ListConcreteExecutions", "", err)
×
807
        }
808

809
        if len(executions) == 0 {
×
810
                return &p.InternalListConcreteExecutionsResponse{}, nil
×
811
        }
×
812
        lastExecution := executions[len(executions)-1]
×
813
        nextFilter := &sqlplugin.ExecutionsFilter{
×
814
                ShardID:    m.shardID,
×
815
                WorkflowID: lastExecution.WorkflowID,
×
816
        }
×
817
        token, err := gobSerialize(nextFilter)
×
818
        if err != nil {
×
819
                return nil, &types.InternalServiceError{
×
820
                        Message: fmt.Sprintf("ListConcreteExecutions failed. Error: %v", err),
×
821
                }
×
822
        }
×
823
        concreteExecutions, err := m.populateInternalListConcreteExecutions(executions)
×
824
        if err != nil {
×
825
                return nil, &types.InternalServiceError{
×
826
                        Message: fmt.Sprintf("ListConcreteExecutions failed. Error: %v", err),
×
827
                }
×
828
        }
×
829

830
        return &p.InternalListConcreteExecutionsResponse{
×
831
                Executions:    concreteExecutions,
×
832
                NextPageToken: token,
×
833
        }, nil
×
834
}
835

836
func (m *sqlExecutionStore) GetTransferTasks(
837
        ctx context.Context,
838
        request *p.GetTransferTasksRequest,
839
) (*p.GetTransferTasksResponse, error) {
1,544✔
840
        minReadLevel := request.ReadLevel
1,544✔
841
        if len(request.NextPageToken) > 0 {
1,544✔
842
                readLevel, err := deserializePageToken(request.NextPageToken)
×
843
                if err != nil {
×
844
                        return nil, convertCommonErrors(m.db, "GetTransferTasks", "failed to deserialize page token", err)
×
845
                }
×
846
                minReadLevel = readLevel
×
847
        }
848
        rows, err := m.db.SelectFromTransferTasks(ctx, &sqlplugin.TransferTasksFilter{
1,544✔
849
                ShardID:   m.shardID,
1,544✔
850
                MinTaskID: minReadLevel,
1,544✔
851
                MaxTaskID: request.MaxReadLevel,
1,544✔
852
                PageSize:  request.BatchSize,
1,544✔
853
        })
1,544✔
854
        if err != nil {
1,544✔
855
                if err != sql.ErrNoRows {
×
856
                        return nil, convertCommonErrors(m.db, "GetTransferTasks", "", err)
×
857
                }
×
858
        }
859
        resp := &p.GetTransferTasksResponse{Tasks: make([]*p.TransferTaskInfo, len(rows))}
1,544✔
860
        for i, row := range rows {
5,273✔
861
                info, err := m.parser.TransferTaskInfoFromBlob(row.Data, row.DataEncoding)
3,729✔
862
                if err != nil {
3,729✔
863
                        return nil, err
×
864
                }
×
865
                resp.Tasks[i] = &p.TransferTaskInfo{
3,729✔
866
                        TaskID:                  row.TaskID,
3,729✔
867
                        DomainID:                info.DomainID.String(),
3,729✔
868
                        WorkflowID:              info.GetWorkflowID(),
3,729✔
869
                        RunID:                   info.RunID.String(),
3,729✔
870
                        VisibilityTimestamp:     info.GetVisibilityTimestamp(),
3,729✔
871
                        TargetDomainID:          info.TargetDomainID.String(),
3,729✔
872
                        TargetDomainIDs:         info.GetTargetDomainIDs(),
3,729✔
873
                        TargetWorkflowID:        info.GetTargetWorkflowID(),
3,729✔
874
                        TargetRunID:             info.TargetRunID.String(),
3,729✔
875
                        TargetChildWorkflowOnly: info.GetTargetChildWorkflowOnly(),
3,729✔
876
                        TaskList:                info.GetTaskList(),
3,729✔
877
                        TaskType:                int(info.GetTaskType()),
3,729✔
878
                        ScheduleID:              info.GetScheduleID(),
3,729✔
879
                        Version:                 info.GetVersion(),
3,729✔
880
                }
3,729✔
881
        }
882
        if len(rows) > 0 {
3,024✔
883
                lastTaskID := rows[len(rows)-1].TaskID
1,480✔
884
                if lastTaskID < request.MaxReadLevel {
1,484✔
885
                        resp.NextPageToken = serializePageToken(lastTaskID)
4✔
886
                }
4✔
887
        }
888
        return resp, nil
1,544✔
889
}
890

891
func (m *sqlExecutionStore) CompleteTransferTask(
892
        ctx context.Context,
893
        request *p.CompleteTransferTaskRequest,
894
) error {
×
895

×
896
        if _, err := m.db.DeleteFromTransferTasks(ctx, &sqlplugin.TransferTasksFilter{
×
897
                ShardID: m.shardID,
×
898
                TaskID:  request.TaskID,
×
899
        }); err != nil {
×
900
                return convertCommonErrors(m.db, "CompleteTransferTask", "", err)
×
901
        }
×
902
        return nil
×
903
}
904

905
func (m *sqlExecutionStore) RangeCompleteTransferTask(
906
        ctx context.Context,
907
        request *p.RangeCompleteTransferTaskRequest,
908
) (*p.RangeCompleteTransferTaskResponse, error) {
74✔
909
        result, err := m.db.RangeDeleteFromTransferTasks(ctx, &sqlplugin.TransferTasksFilter{
74✔
910
                ShardID:   m.shardID,
74✔
911
                MinTaskID: request.ExclusiveBeginTaskID,
74✔
912
                MaxTaskID: request.InclusiveEndTaskID,
74✔
913
                PageSize:  request.PageSize,
74✔
914
        })
74✔
915
        if err != nil {
74✔
916
                return nil, convertCommonErrors(m.db, "RangeCompleteTransferTask", "", err)
×
917
        }
×
918
        rowsDeleted, err := result.RowsAffected()
74✔
919
        if err != nil {
74✔
920
                return nil, convertCommonErrors(m.db, "RangeCompleteTransferTask", "", err)
×
921
        }
×
922
        return &p.RangeCompleteTransferTaskResponse{TasksCompleted: int(rowsDeleted)}, nil
74✔
923
}
924

925
func (m *sqlExecutionStore) GetCrossClusterTasks(
926
        ctx context.Context,
927
        request *p.GetCrossClusterTasksRequest,
928
) (*p.GetCrossClusterTasksResponse, error) {
106✔
929
        minReadLevel := request.ReadLevel
106✔
930
        if len(request.NextPageToken) > 0 {
106✔
931
                readLevel, err := deserializePageToken(request.NextPageToken)
×
932
                if err != nil {
×
933
                        return nil, convertCommonErrors(m.db, "GetCrossClusterTasks", "failed to deserialize page token", err)
×
934
                }
×
935
                minReadLevel = readLevel
×
936
        }
937
        rows, err := m.db.SelectFromCrossClusterTasks(ctx, &sqlplugin.CrossClusterTasksFilter{
106✔
938
                TargetCluster: request.TargetCluster,
106✔
939
                ShardID:       m.shardID,
106✔
940
                MinTaskID:     minReadLevel,
106✔
941
                MaxTaskID:     request.MaxReadLevel,
106✔
942
                PageSize:      request.BatchSize,
106✔
943
        })
106✔
944
        if err != nil {
106✔
945
                if err != sql.ErrNoRows {
×
946
                        return nil, convertCommonErrors(m.db, "GetCrossClusterTasks", "", err)
×
947
                }
×
948
        }
949
        resp := &p.GetCrossClusterTasksResponse{Tasks: make([]*p.CrossClusterTaskInfo, len(rows))}
106✔
950
        for i, row := range rows {
106✔
951
                info, err := m.parser.CrossClusterTaskInfoFromBlob(row.Data, row.DataEncoding)
×
952
                if err != nil {
×
953
                        return nil, err
×
954
                }
×
955
                resp.Tasks[i] = &p.CrossClusterTaskInfo{
×
956
                        TaskID:                  row.TaskID,
×
957
                        DomainID:                info.DomainID.String(),
×
958
                        WorkflowID:              info.GetWorkflowID(),
×
959
                        RunID:                   info.RunID.String(),
×
960
                        VisibilityTimestamp:     info.GetVisibilityTimestamp(),
×
961
                        TargetDomainID:          info.TargetDomainID.String(),
×
962
                        TargetDomainIDs:         info.GetTargetDomainIDs(),
×
963
                        TargetWorkflowID:        info.GetTargetWorkflowID(),
×
964
                        TargetRunID:             info.TargetRunID.String(),
×
965
                        TargetChildWorkflowOnly: info.GetTargetChildWorkflowOnly(),
×
966
                        TaskList:                info.GetTaskList(),
×
967
                        TaskType:                int(info.GetTaskType()),
×
968
                        ScheduleID:              info.GetScheduleID(),
×
969
                        Version:                 info.GetVersion(),
×
970
                }
×
971
        }
972
        if len(rows) > 0 {
106✔
973
                lastTaskID := rows[len(rows)-1].TaskID
×
974
                if lastTaskID < request.MaxReadLevel {
×
975
                        resp.NextPageToken = serializePageToken(lastTaskID)
×
976
                }
×
977
        }
978
        return resp, nil
106✔
979

980
}
981

982
func (m *sqlExecutionStore) CompleteCrossClusterTask(
983
        ctx context.Context,
984
        request *p.CompleteCrossClusterTaskRequest,
985
) error {
×
986
        if _, err := m.db.DeleteFromCrossClusterTasks(ctx, &sqlplugin.CrossClusterTasksFilter{
×
987
                TargetCluster: request.TargetCluster,
×
988
                ShardID:       m.shardID,
×
989
                TaskID:        request.TaskID,
×
990
        }); err != nil {
×
991
                return convertCommonErrors(m.db, "CompleteCrossClusterTask", "", err)
×
992
        }
×
993
        return nil
×
994
}
995

996
func (m *sqlExecutionStore) RangeCompleteCrossClusterTask(
997
        ctx context.Context,
998
        request *p.RangeCompleteCrossClusterTaskRequest,
999
) (*p.RangeCompleteCrossClusterTaskResponse, error) {
90✔
1000
        result, err := m.db.RangeDeleteFromCrossClusterTasks(ctx, &sqlplugin.CrossClusterTasksFilter{
90✔
1001
                TargetCluster: request.TargetCluster,
90✔
1002
                ShardID:       m.shardID,
90✔
1003
                MinTaskID:     request.ExclusiveBeginTaskID,
90✔
1004
                MaxTaskID:     request.InclusiveEndTaskID,
90✔
1005
                PageSize:      request.PageSize,
90✔
1006
        })
90✔
1007
        if err != nil {
90✔
1008
                return nil, convertCommonErrors(m.db, "RangeCompleteCrossClusterTask", "", err)
×
1009
        }
×
1010
        rowsDeleted, err := result.RowsAffected()
90✔
1011
        if err != nil {
90✔
1012
                return nil, convertCommonErrors(m.db, "RangeCompleteCrossClusterTask", "", err)
×
1013
        }
×
1014
        return &p.RangeCompleteCrossClusterTaskResponse{TasksCompleted: int(rowsDeleted)}, nil
90✔
1015
}
1016

1017
func (m *sqlExecutionStore) GetReplicationTasks(
1018
        ctx context.Context,
1019
        request *p.GetReplicationTasksRequest,
1020
) (*p.InternalGetReplicationTasksResponse, error) {
82✔
1021

82✔
1022
        readLevel, maxReadLevelInclusive, err := getReadLevels(request)
82✔
1023
        if err != nil {
82✔
1024
                return nil, err
×
1025
        }
×
1026

1027
        rows, err := m.db.SelectFromReplicationTasks(
82✔
1028
                ctx,
82✔
1029
                &sqlplugin.ReplicationTasksFilter{
82✔
1030
                        ShardID:   m.shardID,
82✔
1031
                        MinTaskID: readLevel,
82✔
1032
                        MaxTaskID: maxReadLevelInclusive,
82✔
1033
                        PageSize:  request.BatchSize,
82✔
1034
                })
82✔
1035

82✔
1036
        switch err {
82✔
1037
        case nil:
82✔
1038
                return m.populateGetReplicationTasksResponse(rows, request.MaxReadLevel)
82✔
1039
        case sql.ErrNoRows:
×
1040
                return &p.InternalGetReplicationTasksResponse{}, nil
×
1041
        default:
×
1042
                return nil, convertCommonErrors(m.db, "GetReplicationTasks", "", err)
×
1043
        }
1044
}
1045

1046
func getReadLevels(request *p.GetReplicationTasksRequest) (readLevel int64, maxReadLevelInclusive int64, err error) {
82✔
1047
        readLevel = request.ReadLevel
82✔
1048
        if len(request.NextPageToken) > 0 {
84✔
1049
                readLevel, err = deserializePageToken(request.NextPageToken)
2✔
1050
                if err != nil {
2✔
1051
                        return 0, 0, err
×
1052
                }
×
1053
        }
1054

1055
        maxReadLevelInclusive = collection.MaxInt64(readLevel+int64(request.BatchSize), request.MaxReadLevel)
82✔
1056
        return readLevel, maxReadLevelInclusive, nil
82✔
1057
}
1058

1059
func (m *sqlExecutionStore) populateGetReplicationTasksResponse(
1060
        rows []sqlplugin.ReplicationTasksRow,
1061
        requestMaxReadLevel int64,
1062
) (*p.InternalGetReplicationTasksResponse, error) {
82✔
1063
        if len(rows) == 0 {
164✔
1064
                return &p.InternalGetReplicationTasksResponse{}, nil
82✔
1065
        }
82✔
1066

1067
        var tasks = make([]*p.InternalReplicationTaskInfo, len(rows))
2✔
1068
        for i, row := range rows {
4✔
1069
                info, err := m.parser.ReplicationTaskInfoFromBlob(row.Data, row.DataEncoding)
2✔
1070
                if err != nil {
2✔
1071
                        return nil, err
×
1072
                }
×
1073

1074
                tasks[i] = &p.InternalReplicationTaskInfo{
2✔
1075
                        TaskID:            row.TaskID,
2✔
1076
                        DomainID:          info.DomainID.String(),
2✔
1077
                        WorkflowID:        info.GetWorkflowID(),
2✔
1078
                        RunID:             info.RunID.String(),
2✔
1079
                        TaskType:          int(info.GetTaskType()),
2✔
1080
                        FirstEventID:      info.GetFirstEventID(),
2✔
1081
                        NextEventID:       info.GetNextEventID(),
2✔
1082
                        Version:           info.GetVersion(),
2✔
1083
                        ScheduledID:       info.GetScheduledID(),
2✔
1084
                        BranchToken:       info.GetBranchToken(),
2✔
1085
                        NewRunBranchToken: info.GetNewRunBranchToken(),
2✔
1086
                        CreationTime:      info.GetCreationTimestamp(),
2✔
1087
                }
2✔
1088
        }
1089
        var nextPageToken []byte
2✔
1090
        lastTaskID := rows[len(rows)-1].TaskID
2✔
1091
        if lastTaskID < requestMaxReadLevel {
4✔
1092
                nextPageToken = serializePageToken(lastTaskID)
2✔
1093
        }
2✔
1094
        return &p.InternalGetReplicationTasksResponse{
2✔
1095
                Tasks:         tasks,
2✔
1096
                NextPageToken: nextPageToken,
2✔
1097
        }, nil
2✔
1098
}
1099

1100
func (m *sqlExecutionStore) CompleteReplicationTask(
1101
        ctx context.Context,
1102
        request *p.CompleteReplicationTaskRequest,
1103
) error {
×
1104

×
1105
        if _, err := m.db.DeleteFromReplicationTasks(ctx, &sqlplugin.ReplicationTasksFilter{
×
1106
                ShardID: m.shardID,
×
1107
                TaskID:  request.TaskID,
×
1108
        }); err != nil {
×
1109
                return convertCommonErrors(m.db, "CompleteReplicationTask", "", err)
×
1110
        }
×
1111
        return nil
×
1112
}
1113

1114
func (m *sqlExecutionStore) RangeCompleteReplicationTask(
1115
        ctx context.Context,
1116
        request *p.RangeCompleteReplicationTaskRequest,
1117
) (*p.RangeCompleteReplicationTaskResponse, error) {
85✔
1118
        result, err := m.db.RangeDeleteFromReplicationTasks(ctx, &sqlplugin.ReplicationTasksFilter{
85✔
1119
                ShardID:            m.shardID,
85✔
1120
                InclusiveEndTaskID: request.InclusiveEndTaskID,
85✔
1121
                PageSize:           request.PageSize,
85✔
1122
        })
85✔
1123
        if err != nil {
85✔
1124
                return nil, convertCommonErrors(m.db, "RangeCompleteReplicationTask", "", err)
×
1125
        }
×
1126
        rowsDeleted, err := result.RowsAffected()
85✔
1127
        if err != nil {
85✔
1128
                return nil, convertCommonErrors(m.db, "RangeCompleteReplicationTask", "", err)
×
1129
        }
×
1130
        return &p.RangeCompleteReplicationTaskResponse{TasksCompleted: int(rowsDeleted)}, nil
85✔
1131
}
1132

1133
func (m *sqlExecutionStore) GetReplicationTasksFromDLQ(
1134
        ctx context.Context,
1135
        request *p.GetReplicationTasksFromDLQRequest,
1136
) (*p.InternalGetReplicationTasksFromDLQResponse, error) {
2✔
1137

2✔
1138
        readLevel, maxReadLevelInclusive, err := getReadLevels(&request.GetReplicationTasksRequest)
2✔
1139
        if err != nil {
2✔
1140
                return nil, err
×
1141
        }
×
1142

1143
        filter := sqlplugin.ReplicationTasksFilter{
2✔
1144
                ShardID:   m.shardID,
2✔
1145
                MinTaskID: readLevel,
2✔
1146
                MaxTaskID: maxReadLevelInclusive,
2✔
1147
                PageSize:  request.BatchSize,
2✔
1148
        }
2✔
1149
        rows, err := m.db.SelectFromReplicationTasksDLQ(ctx, &sqlplugin.ReplicationTasksDLQFilter{
2✔
1150
                ReplicationTasksFilter: filter,
2✔
1151
                SourceClusterName:      request.SourceClusterName,
2✔
1152
        })
2✔
1153

2✔
1154
        switch err {
2✔
1155
        case nil:
2✔
1156
                return m.populateGetReplicationTasksResponse(rows, request.MaxReadLevel)
2✔
1157
        case sql.ErrNoRows:
×
1158
                return &p.InternalGetReplicationTasksResponse{}, nil
×
1159
        default:
×
1160
                return nil, convertCommonErrors(m.db, "GetReplicationTasksFromDLQ", "", err)
×
1161
        }
1162
}
1163

1164
func (m *sqlExecutionStore) GetReplicationDLQSize(
1165
        ctx context.Context,
1166
        request *p.GetReplicationDLQSizeRequest,
1167
) (*p.GetReplicationDLQSizeResponse, error) {
8✔
1168

8✔
1169
        size, err := m.db.SelectFromReplicationDLQ(ctx, &sqlplugin.ReplicationTaskDLQFilter{
8✔
1170
                SourceClusterName: request.SourceClusterName,
8✔
1171
                ShardID:           m.shardID,
8✔
1172
        })
8✔
1173

8✔
1174
        switch err {
8✔
1175
        case nil:
8✔
1176
                return &p.GetReplicationDLQSizeResponse{
8✔
1177
                        Size: size,
8✔
1178
                }, nil
8✔
1179
        case sql.ErrNoRows:
×
1180
                return &p.GetReplicationDLQSizeResponse{
×
1181
                        Size: 0,
×
1182
                }, nil
×
1183
        default:
×
1184
                return nil, convertCommonErrors(m.db, "GetReplicationDLQSize", "", err)
×
1185
        }
1186
}
1187

1188
func (m *sqlExecutionStore) DeleteReplicationTaskFromDLQ(
1189
        ctx context.Context,
1190
        request *p.DeleteReplicationTaskFromDLQRequest,
1191
) error {
×
1192

×
1193
        filter := sqlplugin.ReplicationTasksFilter{
×
1194
                ShardID: m.shardID,
×
1195
                TaskID:  request.TaskID,
×
1196
        }
×
1197

×
1198
        if _, err := m.db.DeleteMessageFromReplicationTasksDLQ(ctx, &sqlplugin.ReplicationTasksDLQFilter{
×
1199
                ReplicationTasksFilter: filter,
×
1200
                SourceClusterName:      request.SourceClusterName,
×
1201
        }); err != nil {
×
1202
                return convertCommonErrors(m.db, "DeleteReplicationTaskFromDLQ", "", err)
×
1203
        }
×
1204
        return nil
×
1205
}
1206

1207
func (m *sqlExecutionStore) RangeDeleteReplicationTaskFromDLQ(
1208
        ctx context.Context,
1209
        request *p.RangeDeleteReplicationTaskFromDLQRequest,
1210
) (*p.RangeDeleteReplicationTaskFromDLQResponse, error) {
×
1211
        filter := sqlplugin.ReplicationTasksFilter{
×
1212
                ShardID:            m.shardID,
×
1213
                TaskID:             request.ExclusiveBeginTaskID,
×
1214
                InclusiveEndTaskID: request.InclusiveEndTaskID,
×
1215
                PageSize:           request.PageSize,
×
1216
        }
×
1217
        result, err := m.db.RangeDeleteMessageFromReplicationTasksDLQ(ctx, &sqlplugin.ReplicationTasksDLQFilter{
×
1218
                ReplicationTasksFilter: filter,
×
1219
                SourceClusterName:      request.SourceClusterName,
×
1220
        })
×
1221
        if err != nil {
×
1222
                return nil, convertCommonErrors(m.db, "RangeDeleteReplicationTaskFromDLQ", "", err)
×
1223
        }
×
1224
        rowsDeleted, err := result.RowsAffected()
×
1225
        if err != nil {
×
1226
                return nil, convertCommonErrors(m.db, "RangeDeleteReplicationTaskFromDLQ", "", err)
×
1227
        }
×
1228
        return &p.RangeDeleteReplicationTaskFromDLQResponse{TasksCompleted: int(rowsDeleted)}, nil
×
1229
}
1230

1231
func (m *sqlExecutionStore) CreateFailoverMarkerTasks(
1232
        ctx context.Context,
1233
        request *p.CreateFailoverMarkersRequest,
1234
) error {
×
1235
        dbShardID := sqlplugin.GetDBShardIDFromHistoryShardID(m.shardID, m.db.GetTotalNumDBShards())
×
1236
        return m.txExecuteShardLocked(ctx, dbShardID, "CreateFailoverMarkerTasks", request.RangeID, func(tx sqlplugin.Tx) error {
×
1237
                for _, task := range request.Markers {
×
1238
                        t := []p.Task{task}
×
1239
                        if err := createReplicationTasks(
×
1240
                                ctx,
×
1241
                                tx,
×
1242
                                t,
×
1243
                                m.shardID,
×
1244
                                serialization.MustParseUUID(task.DomainID),
×
1245
                                emptyWorkflowID,
×
1246
                                serialization.MustParseUUID(emptyReplicationRunID),
×
1247
                                m.parser,
×
1248
                        ); err != nil {
×
1249
                                rollBackErr := tx.Rollback()
×
1250
                                if rollBackErr != nil {
×
1251
                                        m.logger.Error("transaction rollback error", tag.Error(rollBackErr))
×
1252
                                }
×
1253
                                return err
×
1254
                        }
1255
                }
1256
                return nil
×
1257
        })
1258
}
1259

1260
// timerTaskPageToken is the paging cursor for timer task reads. It is
// exchanged with callers as JSON.
type timerTaskPageToken struct {
	// TaskID is the task ID component of the cursor.
	TaskID int64
	// Timestamp is the timestamp component of the cursor.
	Timestamp time.Time
}

// serialize returns the JSON encoding of the token.
func (t *timerTaskPageToken) serialize() ([]byte, error) {
	encoded, err := json.Marshal(t)
	return encoded, err
}

// deserialize decodes a JSON payload into the receiver and returns the
// decoder's error, if any.
func (t *timerTaskPageToken) deserialize(payload []byte) error {
	if err := json.Unmarshal(payload, t); err != nil {
		return err
	}
	return nil
}
×
1272

1273
func (m *sqlExecutionStore) GetTimerIndexTasks(
1274
        ctx context.Context,
1275
        request *p.GetTimerIndexTasksRequest,
1276
) (*p.GetTimerIndexTasksResponse, error) {
2,169✔
1277

2,169✔
1278
        pageToken := &timerTaskPageToken{TaskID: math.MinInt64, Timestamp: request.MinTimestamp}
2,169✔
1279
        if len(request.NextPageToken) > 0 {
2,169✔
1280
                if err := pageToken.deserialize(request.NextPageToken); err != nil {
×
1281
                        return nil, &types.InternalServiceError{
×
1282
                                Message: fmt.Sprintf("error deserializing timerTaskPageToken: %v", err),
×
1283
                        }
×
1284
                }
×
1285
        }
1286

1287
        rows, err := m.db.SelectFromTimerTasks(ctx, &sqlplugin.TimerTasksFilter{
2,169✔
1288
                ShardID:                m.shardID,
2,169✔
1289
                MinVisibilityTimestamp: pageToken.Timestamp,
2,169✔
1290
                TaskID:                 pageToken.TaskID,
2,169✔
1291
                MaxVisibilityTimestamp: request.MaxTimestamp,
2,169✔
1292
                PageSize:               request.BatchSize + 1,
2,169✔
1293
        })
2,169✔
1294

2,169✔
1295
        if err != nil && err != sql.ErrNoRows {
2,169✔
1296
                return nil, convertCommonErrors(m.db, "GetTimerIndexTasks", "", err)
×
1297
        }
×
1298

1299
        resp := &p.GetTimerIndexTasksResponse{Timers: make([]*p.TimerTaskInfo, len(rows))}
2,169✔
1300
        for i, row := range rows {
8,009✔
1301
                info, err := m.parser.TimerTaskInfoFromBlob(row.Data, row.DataEncoding)
5,840✔
1302
                if err != nil {
5,840✔
1303
                        return nil, err
×
1304
                }
×
1305
                resp.Timers[i] = &p.TimerTaskInfo{
5,840✔
1306
                        VisibilityTimestamp: row.VisibilityTimestamp,
5,840✔
1307
                        TaskID:              row.TaskID,
5,840✔
1308
                        DomainID:            info.DomainID.String(),
5,840✔
1309
                        WorkflowID:          info.GetWorkflowID(),
5,840✔
1310
                        RunID:               info.RunID.String(),
5,840✔
1311
                        TaskType:            int(info.GetTaskType()),
5,840✔
1312
                        TimeoutType:         int(info.GetTimeoutType()),
5,840✔
1313
                        EventID:             info.GetEventID(),
5,840✔
1314
                        ScheduleAttempt:     info.GetScheduleAttempt(),
5,840✔
1315
                        Version:             info.GetVersion(),
5,840✔
1316
                }
5,840✔
1317
        }
1318

1319
        if len(resp.Timers) > request.BatchSize {
2,706✔
1320
                pageToken = &timerTaskPageToken{
537✔
1321
                        TaskID:    resp.Timers[request.BatchSize].TaskID,
537✔
1322
                        Timestamp: resp.Timers[request.BatchSize].VisibilityTimestamp,
537✔
1323
                }
537✔
1324
                resp.Timers = resp.Timers[:request.BatchSize]
537✔
1325
                nextToken, err := pageToken.serialize()
537✔
1326
                if err != nil {
537✔
1327
                        return nil, &types.InternalServiceError{
×
1328
                                Message: fmt.Sprintf("GetTimerTasks: error serializing page token: %v", err),
×
1329
                        }
×
1330
                }
×
1331
                resp.NextPageToken = nextToken
537✔
1332
        }
1333

1334
        return resp, nil
2,169✔
1335
}
1336

1337
func (m *sqlExecutionStore) CompleteTimerTask(
1338
        ctx context.Context,
1339
        request *p.CompleteTimerTaskRequest,
1340
) error {
×
1341

×
1342
        if _, err := m.db.DeleteFromTimerTasks(ctx, &sqlplugin.TimerTasksFilter{
×
1343
                ShardID:             m.shardID,
×
1344
                VisibilityTimestamp: request.VisibilityTimestamp,
×
1345
                TaskID:              request.TaskID,
×
1346
        }); err != nil {
×
1347
                return convertCommonErrors(m.db, "CompleteTimerTask", "", err)
×
1348
        }
×
1349
        return nil
×
1350
}
1351

1352
func (m *sqlExecutionStore) RangeCompleteTimerTask(
1353
        ctx context.Context,
1354
        request *p.RangeCompleteTimerTaskRequest,
1355
) (*p.RangeCompleteTimerTaskResponse, error) {
26✔
1356
        result, err := m.db.RangeDeleteFromTimerTasks(ctx, &sqlplugin.TimerTasksFilter{
26✔
1357
                ShardID:                m.shardID,
26✔
1358
                MinVisibilityTimestamp: request.InclusiveBeginTimestamp,
26✔
1359
                MaxVisibilityTimestamp: request.ExclusiveEndTimestamp,
26✔
1360
                PageSize:               request.PageSize,
26✔
1361
        })
26✔
1362
        if err != nil {
26✔
1363
                return nil, convertCommonErrors(m.db, "RangeCompleteTimerTask", "", err)
×
1364
        }
×
1365
        rowsDeleted, err := result.RowsAffected()
26✔
1366
        if err != nil {
26✔
1367
                return nil, convertCommonErrors(m.db, "RangeCompleteTimerTask", "", err)
×
1368
        }
×
1369
        return &p.RangeCompleteTimerTaskResponse{TasksCompleted: int(rowsDeleted)}, nil
26✔
1370
}
1371

1372
func (m *sqlExecutionStore) PutReplicationTaskToDLQ(
1373
        ctx context.Context,
1374
        request *p.InternalPutReplicationTaskToDLQRequest,
1375
) error {
2✔
1376
        replicationTask := request.TaskInfo
2✔
1377
        blob, err := m.parser.ReplicationTaskInfoToBlob(&serialization.ReplicationTaskInfo{
2✔
1378
                DomainID:                serialization.MustParseUUID(replicationTask.DomainID),
2✔
1379
                WorkflowID:              replicationTask.WorkflowID,
2✔
1380
                RunID:                   serialization.MustParseUUID(replicationTask.RunID),
2✔
1381
                TaskType:                int16(replicationTask.TaskType),
2✔
1382
                FirstEventID:            replicationTask.FirstEventID,
2✔
1383
                NextEventID:             replicationTask.NextEventID,
2✔
1384
                Version:                 replicationTask.Version,
2✔
1385
                ScheduledID:             replicationTask.ScheduledID,
2✔
1386
                EventStoreVersion:       p.EventStoreVersion,
2✔
1387
                NewRunEventStoreVersion: p.EventStoreVersion,
2✔
1388
                BranchToken:             replicationTask.BranchToken,
2✔
1389
                NewRunBranchToken:       replicationTask.NewRunBranchToken,
2✔
1390
                CreationTimestamp:       replicationTask.CreationTime,
2✔
1391
        })
2✔
1392
        if err != nil {
2✔
1393
                return err
×
1394
        }
×
1395

1396
        row := &sqlplugin.ReplicationTaskDLQRow{
2✔
1397
                SourceClusterName: request.SourceClusterName,
2✔
1398
                ShardID:           m.shardID,
2✔
1399
                TaskID:            replicationTask.TaskID,
2✔
1400
                Data:              blob.Data,
2✔
1401
                DataEncoding:      string(blob.Encoding),
2✔
1402
        }
2✔
1403

2✔
1404
        _, err = m.db.InsertIntoReplicationTasksDLQ(ctx, row)
2✔
1405

2✔
1406
        // Tasks are immutable. So it's fine if we already persisted it before.
2✔
1407
        // This can happen when tasks are retried (ack and cleanup can have lag on source side).
2✔
1408
        if err != nil && !m.db.IsDupEntryError(err) {
2✔
1409
                return convertCommonErrors(m.db, "PutReplicationTaskToDLQ", "", err)
×
1410
        }
×
1411

1412
        return nil
2✔
1413
}
1414

1415
func (m *sqlExecutionStore) populateWorkflowMutableState(
1416
        execution sqlplugin.ExecutionsRow,
1417
) (*p.InternalWorkflowMutableState, error) {
494✔
1418

494✔
1419
        info, err := m.parser.WorkflowExecutionInfoFromBlob(execution.Data, execution.DataEncoding)
494✔
1420
        if err != nil {
494✔
1421
                return nil, err
×
1422
        }
×
1423

1424
        state := &p.InternalWorkflowMutableState{}
494✔
1425
        state.ExecutionInfo = serialization.ToInternalWorkflowExecutionInfo(info)
494✔
1426
        state.ExecutionInfo.DomainID = execution.DomainID.String()
494✔
1427
        state.ExecutionInfo.WorkflowID = execution.WorkflowID
494✔
1428
        state.ExecutionInfo.RunID = execution.RunID.String()
494✔
1429
        state.ExecutionInfo.NextEventID = execution.NextEventID
494✔
1430
        // TODO: remove this after all 2DC workflows complete
494✔
1431
        if info.LastWriteEventID != nil {
494✔
1432
                state.ReplicationState = &p.ReplicationState{}
×
1433
                state.ReplicationState.StartVersion = info.GetStartVersion()
×
1434
                state.ReplicationState.LastWriteVersion = execution.LastWriteVersion
×
1435
                state.ReplicationState.LastWriteEventID = info.GetLastWriteEventID()
×
1436
        }
×
1437

1438
        if info.GetVersionHistories() != nil {
988✔
1439
                state.VersionHistories = p.NewDataBlob(
494✔
1440
                        info.GetVersionHistories(),
494✔
1441
                        common.EncodingType(info.GetVersionHistoriesEncoding()),
494✔
1442
                )
494✔
1443
        }
494✔
1444

1445
        return state, nil
494✔
1446
}
1447

1448
func (m *sqlExecutionStore) populateInternalListConcreteExecutions(
1449
        executions []sqlplugin.ExecutionsRow,
1450
) ([]*p.InternalListConcreteExecutionsEntity, error) {
×
1451

×
1452
        concreteExecutions := make([]*p.InternalListConcreteExecutionsEntity, 0, len(executions))
×
1453
        for _, execution := range executions {
×
1454
                mutableState, err := m.populateWorkflowMutableState(execution)
×
1455
                if err != nil {
×
1456
                        return nil, err
×
1457
                }
×
1458

1459
                concreteExecution := &p.InternalListConcreteExecutionsEntity{
×
1460
                        ExecutionInfo:    mutableState.ExecutionInfo,
×
1461
                        VersionHistories: mutableState.VersionHistories,
×
1462
                }
×
1463
                concreteExecutions = append(concreteExecutions, concreteExecution)
×
1464
        }
1465
        return concreteExecutions, nil
×
1466
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc