• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

uber / cadence / 018965f0-fdaa-4535-a39b-924d1b4fe28b

17 Jul 2023 10:20PM UTC coverage: 57.058% (-0.09%) from 57.146%
018965f0-fdaa-4535-a39b-924d1b4fe28b

push

buildkite

web-flow
[dynamic config] add Filters method to dynamic config Key (#5346)

What changed?

Add Filters method to Key interface
Add implementations on most keys by parsing the comments on keys (assuming they are correct)
Why?

This is needed to know which dynamic config keys are domain-specific. It could also possibly simplify the collection struct by consolidating all GetPropertyFilterBy** methods.

How did you test it?

Potential risks

no risk since the method will be read only in non-critical path

Release notes

Documentation Changes

21 of 21 new or added lines in 1 file covered. (100.0%)

87154 of 152745 relevant lines covered (57.06%)

2500.48 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

63.86
/common/persistence/sql/sqlExecutionStore.go
1
// Copyright (c) 2018 Uber Technologies, Inc.
2
//
3
// Permission is hereby granted, free of charge, to any person obtaining a copy
4
// of this software and associated documentation files (the "Software"), to deal
5
// in the Software without restriction, including without limitation the rights
6
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
7
// copies of the Software, and to permit persons to whom the Software is
8
// furnished to do so, subject to the following conditions:
9
//
10
// The above copyright notice and this permission notice shall be included in
11
// all copies or substantial portions of the Software.
12
//
13
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
14
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
18
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
19
// THE SOFTWARE.
20

21
package sql
22

23
import (
24
        "bytes"
25
        "context"
26
        "database/sql"
27
        "encoding/json"
28
        "fmt"
29
        "math"
30
        "runtime/debug"
31
        "time"
32

33
        "golang.org/x/sync/errgroup"
34

35
        "github.com/uber/cadence/common"
36
        "github.com/uber/cadence/common/collection"
37
        "github.com/uber/cadence/common/log"
38
        "github.com/uber/cadence/common/log/tag"
39
        p "github.com/uber/cadence/common/persistence"
40
        "github.com/uber/cadence/common/persistence/serialization"
41
        "github.com/uber/cadence/common/persistence/sql/sqlplugin"
42
        "github.com/uber/cadence/common/types"
43
)
44

45
// emptyWorkflowID is the empty-string workflow ID.
// NOTE(review): use sites are outside this view — confirm intent before changing.
const emptyWorkflowID string = ""

// emptyReplicationRunID is a fixed, UUID-formatted placeholder run ID.
// NOTE(review): use sites are outside this view — confirm intent before changing.
const emptyReplicationRunID string = "30000000-5000-f000-f000-000000000000"
49

50
type sqlExecutionStore struct {
51
        sqlStore
52
        shardID int
53
}
54

55
var _ p.ExecutionStore = (*sqlExecutionStore)(nil)
56

57
// NewSQLExecutionStore creates an instance of ExecutionStore
58
func NewSQLExecutionStore(
59
        db sqlplugin.DB,
60
        logger log.Logger,
61
        shardID int,
62
        parser serialization.Parser,
63
        dc *p.DynamicConfiguration,
64
) (p.ExecutionStore, error) {
42✔
65

42✔
66
        return &sqlExecutionStore{
42✔
67
                shardID: shardID,
42✔
68
                sqlStore: sqlStore{
42✔
69
                        db:     db,
42✔
70
                        logger: logger,
42✔
71
                        parser: parser,
42✔
72
                        dc:     dc,
42✔
73
                },
42✔
74
        }, nil
42✔
75
}
42✔
76

77
// txExecuteShardLocked executes f under transaction and with read lock on shard row
78
func (m *sqlExecutionStore) txExecuteShardLocked(
79
        ctx context.Context,
80
        dbShardID int,
81
        operation string,
82
        rangeID int64,
83
        fn func(tx sqlplugin.Tx) error,
84
) error {
3,261✔
85

3,261✔
86
        return m.txExecute(ctx, dbShardID, operation, func(tx sqlplugin.Tx) error {
6,522✔
87
                if err := readLockShard(ctx, tx, m.shardID, rangeID); err != nil {
3,261✔
88
                        return err
×
89
                }
×
90
                err := fn(tx)
3,261✔
91
                if err != nil {
3,291✔
92
                        return err
30✔
93
                }
30✔
94
                return nil
3,233✔
95
        })
96
}
97

98
func (m *sqlExecutionStore) GetShardID() int {
7,694✔
99
        return m.shardID
7,694✔
100
}
7,694✔
101

102
func (m *sqlExecutionStore) CreateWorkflowExecution(
103
        ctx context.Context,
104
        request *p.InternalCreateWorkflowExecutionRequest,
105
) (response *p.CreateWorkflowExecutionResponse, err error) {
336✔
106
        dbShardID := sqlplugin.GetDBShardIDFromHistoryShardID(m.shardID, m.db.GetTotalNumDBShards())
336✔
107

336✔
108
        err = m.txExecuteShardLocked(ctx, dbShardID, "CreateWorkflowExecution", request.RangeID, func(tx sqlplugin.Tx) error {
672✔
109
                response, err = m.createWorkflowExecutionTx(ctx, tx, request)
336✔
110
                return err
336✔
111
        })
336✔
112
        return
336✔
113
}
114

115
func (m *sqlExecutionStore) createWorkflowExecutionTx(
116
        ctx context.Context,
117
        tx sqlplugin.Tx,
118
        request *p.InternalCreateWorkflowExecutionRequest,
119
) (*p.CreateWorkflowExecutionResponse, error) {
336✔
120

336✔
121
        newWorkflow := request.NewWorkflowSnapshot
336✔
122
        executionInfo := newWorkflow.ExecutionInfo
336✔
123
        startVersion := newWorkflow.StartVersion
336✔
124
        lastWriteVersion := newWorkflow.LastWriteVersion
336✔
125
        shardID := m.shardID
336✔
126
        domainID := serialization.MustParseUUID(executionInfo.DomainID)
336✔
127
        workflowID := executionInfo.WorkflowID
336✔
128
        runID := serialization.MustParseUUID(executionInfo.RunID)
336✔
129

336✔
130
        if err := p.ValidateCreateWorkflowModeState(
336✔
131
                request.Mode,
336✔
132
                newWorkflow,
336✔
133
        ); err != nil {
336✔
134
                return nil, err
×
135
        }
×
136

137
        var err error
336✔
138
        var row *sqlplugin.CurrentExecutionsRow
336✔
139
        if row, err = lockCurrentExecutionIfExists(ctx, tx, m.shardID, domainID, workflowID); err != nil {
336✔
140
                return nil, err
×
141
        }
×
142

143
        // current workflow record check
144
        if row != nil {
386✔
145
                // current run ID, last write version, current workflow state check
50✔
146
                switch request.Mode {
50✔
147
                case p.CreateWorkflowModeBrandNew:
28✔
148
                        return nil, &p.WorkflowExecutionAlreadyStartedError{
28✔
149
                                Msg:              fmt.Sprintf("Workflow execution already running. WorkflowId: %v", row.WorkflowID),
28✔
150
                                StartRequestID:   row.CreateRequestID,
28✔
151
                                RunID:            row.RunID.String(),
28✔
152
                                State:            int(row.State),
28✔
153
                                CloseStatus:      int(row.CloseStatus),
28✔
154
                                LastWriteVersion: row.LastWriteVersion,
28✔
155
                        }
28✔
156

157
                case p.CreateWorkflowModeWorkflowIDReuse:
20✔
158
                        if request.PreviousLastWriteVersion != row.LastWriteVersion {
20✔
159
                                return nil, &p.CurrentWorkflowConditionFailedError{
×
160
                                        Msg: fmt.Sprintf("Workflow execution creation condition failed. WorkflowId: %v, "+
×
161
                                                "LastWriteVersion: %v, PreviousLastWriteVersion: %v",
×
162
                                                workflowID, row.LastWriteVersion, request.PreviousLastWriteVersion),
×
163
                                }
×
164
                        }
×
165
                        if row.State != p.WorkflowStateCompleted {
20✔
166
                                return nil, &p.CurrentWorkflowConditionFailedError{
×
167
                                        Msg: fmt.Sprintf("Workflow execution creation condition failed. WorkflowId: %v, "+
×
168
                                                "State: %v, Expected: %v",
×
169
                                                workflowID, row.State, p.WorkflowStateCompleted),
×
170
                                }
×
171
                        }
×
172
                        runIDStr := row.RunID.String()
20✔
173
                        if runIDStr != request.PreviousRunID {
20✔
174
                                return nil, &p.CurrentWorkflowConditionFailedError{
×
175
                                        Msg: fmt.Sprintf("Workflow execution creation condition failed. WorkflowId: %v, "+
×
176
                                                "RunID: %v, PreviousRunID: %v",
×
177
                                                workflowID, runIDStr, request.PreviousRunID),
×
178
                                }
×
179
                        }
×
180

181
                case p.CreateWorkflowModeZombie:
2✔
182
                        // zombie workflow creation with existence of current record, this is a noop
2✔
183
                        if err := assertRunIDMismatch(serialization.MustParseUUID(executionInfo.RunID), row.RunID); err != nil {
2✔
184
                                return nil, err
×
185
                        }
×
186

187
                case p.CreateWorkflowModeContinueAsNew:
2✔
188
                        // continueAsNew mode expects a current run exists
2✔
189
                        if err := assertRunIDMismatch(serialization.MustParseUUID(executionInfo.RunID), row.RunID); err != nil {
2✔
190
                                return nil, err
×
191
                        }
×
192

193
                default:
×
194
                        return nil, &types.InternalServiceError{
×
195
                                Message: fmt.Sprintf(
×
196
                                        "CreteWorkflowExecution: unknown mode: %v",
×
197
                                        request.Mode,
×
198
                                ),
×
199
                        }
×
200
                }
201
        }
202

203
        if err := createOrUpdateCurrentExecution(
308✔
204
                ctx,
308✔
205
                tx,
308✔
206
                request.Mode,
308✔
207
                m.shardID,
308✔
208
                domainID,
308✔
209
                workflowID,
308✔
210
                runID,
308✔
211
                executionInfo.State,
308✔
212
                executionInfo.CloseStatus,
308✔
213
                executionInfo.CreateRequestID,
308✔
214
                startVersion,
308✔
215
                lastWriteVersion); err != nil {
308✔
216
                return nil, err
×
217
        }
×
218

219
        if err := m.applyWorkflowSnapshotTxAsNew(ctx, tx, shardID, &request.NewWorkflowSnapshot, m.parser); err != nil {
310✔
220
                return nil, err
2✔
221
        }
2✔
222

223
        return &p.CreateWorkflowExecutionResponse{}, nil
308✔
224
}
225

226
func (m *sqlExecutionStore) getExecutions(
227
        ctx context.Context,
228
        request *p.InternalGetWorkflowExecutionRequest,
229
        domainID serialization.UUID,
230
        wfID string,
231
        runID serialization.UUID,
232
) ([]sqlplugin.ExecutionsRow, error) {
760✔
233
        executions, err := m.db.SelectFromExecutions(ctx, &sqlplugin.ExecutionsFilter{
760✔
234
                ShardID: m.shardID, DomainID: domainID, WorkflowID: wfID, RunID: runID})
760✔
235

760✔
236
        if err != nil {
1,028✔
237
                if err == sql.ErrNoRows {
536✔
238
                        return nil, &types.EntityNotExistsError{
268✔
239
                                Message: fmt.Sprintf(
268✔
240
                                        "Workflow execution not found.  WorkflowId: %v, RunId: %v",
268✔
241
                                        request.Execution.GetWorkflowID(),
268✔
242
                                        request.Execution.GetRunID(),
268✔
243
                                ),
268✔
244
                        }
268✔
245
                }
268✔
246
                return nil, convertCommonErrors(m.db, "GetWorkflowExecution", "", err)
×
247
        }
248

249
        if len(executions) == 0 {
494✔
250
                return nil, &types.EntityNotExistsError{
×
251
                        Message: fmt.Sprintf(
×
252
                                "Workflow execution not found.  WorkflowId: %v, RunId: %v",
×
253
                                request.Execution.GetWorkflowID(),
×
254
                                request.Execution.GetRunID(),
×
255
                        ),
×
256
                }
×
257
        }
×
258

259
        if len(executions) != 1 {
494✔
260
                return nil, &types.InternalServiceError{
×
261
                        Message: "GetWorkflowExecution return more than one results.",
×
262
                }
×
263
        }
×
264
        return executions, nil
494✔
265
}
266

267
func (m *sqlExecutionStore) GetWorkflowExecution(
268
        ctx context.Context,
269
        request *p.InternalGetWorkflowExecutionRequest,
270
) (resp *p.InternalGetWorkflowExecutionResponse, e error) {
760✔
271
        recoverPanic := func(recovered interface{}, err *error) {
6,826✔
272
                if recovered != nil {
6,066✔
273
                        *err = fmt.Errorf("DB operation panicked: %v %s", recovered, debug.Stack())
×
274
                }
×
275
        }
276

277
        domainID := serialization.MustParseUUID(request.DomainID)
760✔
278
        runID := serialization.MustParseUUID(request.Execution.RunID)
760✔
279
        wfID := request.Execution.WorkflowID
760✔
280

760✔
281
        var executions []sqlplugin.ExecutionsRow
760✔
282
        var activityInfos map[int64]*p.InternalActivityInfo
760✔
283
        var timerInfos map[string]*p.TimerInfo
760✔
284
        var childExecutionInfos map[int64]*p.InternalChildExecutionInfo
760✔
285
        var requestCancelInfos map[int64]*p.RequestCancelInfo
760✔
286
        var signalInfos map[int64]*p.SignalInfo
760✔
287
        var bufferedEvents []*p.DataBlob
760✔
288
        var signalsRequested map[string]struct{}
760✔
289

760✔
290
        g, ctx := errgroup.WithContext(ctx)
760✔
291

760✔
292
        g.Go(func() (e error) {
1,520✔
293
                defer func() { recoverPanic(recover(), &e) }()
1,520✔
294
                executions, e = m.getExecutions(ctx, request, domainID, wfID, runID)
760✔
295
                return e
760✔
296
        })
297

298
        g.Go(func() (e error) {
1,520✔
299
                defer func() { recoverPanic(recover(), &e) }()
1,520✔
300
                activityInfos, e = getActivityInfoMap(
760✔
301
                        ctx, m.db, m.shardID, domainID, wfID, runID, m.parser)
760✔
302
                return e
760✔
303
        })
304

305
        g.Go(func() (e error) {
1,520✔
306
                defer func() { recoverPanic(recover(), &e) }()
1,520✔
307
                timerInfos, e = getTimerInfoMap(
760✔
308
                        ctx, m.db, m.shardID, domainID, wfID, runID, m.parser)
760✔
309
                return e
760✔
310
        })
311

312
        g.Go(func() (e error) {
1,520✔
313
                defer func() { recoverPanic(recover(), &e) }()
1,520✔
314
                childExecutionInfos, e = getChildExecutionInfoMap(
760✔
315
                        ctx, m.db, m.shardID, domainID, wfID, runID, m.parser)
760✔
316
                return e
760✔
317
        })
318

319
        g.Go(func() (e error) {
1,520✔
320
                defer func() { recoverPanic(recover(), &e) }()
1,520✔
321
                requestCancelInfos, e = getRequestCancelInfoMap(
760✔
322
                        ctx, m.db, m.shardID, domainID, wfID, runID, m.parser)
760✔
323
                return e
760✔
324
        })
325

326
        g.Go(func() (e error) {
1,520✔
327
                defer func() { recoverPanic(recover(), &e) }()
1,520✔
328
                signalInfos, e = getSignalInfoMap(
760✔
329
                        ctx, m.db, m.shardID, domainID, wfID, runID, m.parser)
760✔
330
                return e
760✔
331
        })
332

333
        g.Go(func() (e error) {
1,520✔
334
                defer func() { recoverPanic(recover(), &e) }()
1,520✔
335
                bufferedEvents, e = getBufferedEvents(
760✔
336
                        ctx, m.db, m.shardID, domainID, wfID, runID)
760✔
337
                return e
760✔
338
        })
339

340
        g.Go(func() (e error) {
1,520✔
341
                defer func() { recoverPanic(recover(), &e) }()
1,520✔
342
                signalsRequested, e = getSignalsRequested(
760✔
343
                        ctx, m.db, m.shardID, domainID, wfID, runID)
760✔
344
                return e
760✔
345
        })
346

347
        err := g.Wait()
760✔
348
        if err != nil {
1,028✔
349
                return nil, err
268✔
350
        }
268✔
351

352
        state, err := m.populateWorkflowMutableState(executions[0])
494✔
353
        if err != nil {
494✔
354
                return nil, &types.InternalServiceError{
×
355
                        Message: fmt.Sprintf("GetWorkflowExecution: failed. Error: %v", err),
×
356
                }
×
357
        }
×
358
        state.ActivityInfos = activityInfos
494✔
359
        state.TimerInfos = timerInfos
494✔
360
        state.ChildExecutionInfos = childExecutionInfos
494✔
361
        state.RequestCancelInfos = requestCancelInfos
494✔
362
        state.SignalInfos = signalInfos
494✔
363
        state.BufferedEvents = bufferedEvents
494✔
364
        state.SignalRequestedIDs = signalsRequested
494✔
365

494✔
366
        return &p.InternalGetWorkflowExecutionResponse{State: state}, nil
494✔
367
}
368

369
func (m *sqlExecutionStore) UpdateWorkflowExecution(
370
        ctx context.Context,
371
        request *p.InternalUpdateWorkflowExecutionRequest,
372
) error {
2,927✔
373
        dbShardID := sqlplugin.GetDBShardIDFromHistoryShardID(m.shardID, m.db.GetTotalNumDBShards())
2,927✔
374
        return m.txExecuteShardLocked(ctx, dbShardID, "UpdateWorkflowExecution", request.RangeID, func(tx sqlplugin.Tx) error {
5,854✔
375
                return m.updateWorkflowExecutionTx(ctx, tx, request)
2,927✔
376
        })
2,927✔
377
}
378

379
func (m *sqlExecutionStore) updateWorkflowExecutionTx(
380
        ctx context.Context,
381
        tx sqlplugin.Tx,
382
        request *p.InternalUpdateWorkflowExecutionRequest,
383
) error {
2,927✔
384

2,927✔
385
        updateWorkflow := request.UpdateWorkflowMutation
2,927✔
386
        newWorkflow := request.NewWorkflowSnapshot
2,927✔
387

2,927✔
388
        executionInfo := updateWorkflow.ExecutionInfo
2,927✔
389
        domainID := serialization.MustParseUUID(executionInfo.DomainID)
2,927✔
390
        workflowID := executionInfo.WorkflowID
2,927✔
391
        runID := serialization.MustParseUUID(executionInfo.RunID)
2,927✔
392
        shardID := m.shardID
2,927✔
393

2,927✔
394
        if err := p.ValidateUpdateWorkflowModeState(
2,927✔
395
                request.Mode,
2,927✔
396
                updateWorkflow,
2,927✔
397
                newWorkflow,
2,927✔
398
        ); err != nil {
2,927✔
399
                return err
×
400
        }
×
401

402
        switch request.Mode {
2,927✔
403
        case p.UpdateWorkflowModeIgnoreCurrent:
×
404
                // no-op
405
        case p.UpdateWorkflowModeBypassCurrent:
2✔
406
                if err := assertNotCurrentExecution(
2✔
407
                        ctx,
2✔
408
                        tx,
2✔
409
                        shardID,
2✔
410
                        domainID,
2✔
411
                        workflowID,
2✔
412
                        runID); err != nil {
2✔
413
                        return err
×
414
                }
×
415

416
        case p.UpdateWorkflowModeUpdateCurrent:
2,927✔
417
                if newWorkflow != nil {
3,045✔
418
                        newExecutionInfo := newWorkflow.ExecutionInfo
118✔
419
                        startVersion := newWorkflow.StartVersion
118✔
420
                        lastWriteVersion := newWorkflow.LastWriteVersion
118✔
421
                        newDomainID := serialization.MustParseUUID(newExecutionInfo.DomainID)
118✔
422
                        newRunID := serialization.MustParseUUID(newExecutionInfo.RunID)
118✔
423

118✔
424
                        if !bytes.Equal(domainID, newDomainID) {
118✔
425
                                return &types.InternalServiceError{
×
426
                                        Message: "UpdateWorkflowExecution: cannot continue as new to another domain",
×
427
                                }
×
428
                        }
×
429

430
                        if err := assertRunIDAndUpdateCurrentExecution(
118✔
431
                                ctx,
118✔
432
                                tx,
118✔
433
                                shardID,
118✔
434
                                domainID,
118✔
435
                                workflowID,
118✔
436
                                newRunID,
118✔
437
                                runID,
118✔
438
                                newWorkflow.ExecutionInfo.CreateRequestID,
118✔
439
                                newWorkflow.ExecutionInfo.State,
118✔
440
                                newWorkflow.ExecutionInfo.CloseStatus,
118✔
441
                                startVersion,
118✔
442
                                lastWriteVersion); err != nil {
118✔
443
                                return err
×
444
                        }
×
445
                } else {
2,811✔
446
                        startVersion := updateWorkflow.StartVersion
2,811✔
447
                        lastWriteVersion := updateWorkflow.LastWriteVersion
2,811✔
448
                        // this is only to update the current record
2,811✔
449
                        if err := assertRunIDAndUpdateCurrentExecution(
2,811✔
450
                                ctx,
2,811✔
451
                                tx,
2,811✔
452
                                shardID,
2,811✔
453
                                domainID,
2,811✔
454
                                workflowID,
2,811✔
455
                                runID,
2,811✔
456
                                runID,
2,811✔
457
                                executionInfo.CreateRequestID,
2,811✔
458
                                executionInfo.State,
2,811✔
459
                                executionInfo.CloseStatus,
2,811✔
460
                                startVersion,
2,811✔
461
                                lastWriteVersion); err != nil {
2,811✔
462
                                return err
×
463
                        }
×
464
                }
465

466
        default:
×
467
                return &types.InternalServiceError{
×
468
                        Message: fmt.Sprintf("UpdateWorkflowExecution: unknown mode: %v", request.Mode),
×
469
                }
×
470
        }
471

472
        if m.useAsyncTransaction() { // async transaction is enabled
2,927✔
473
                // TODO: it's possible to merge some operations in the following 2 functions in a batch, should refactor the code later
×
474
                if err := applyWorkflowMutationAsyncTx(ctx, tx, shardID, &updateWorkflow, m.parser); err != nil {
×
475
                        return err
×
476
                }
×
477
                if newWorkflow != nil {
×
478
                        if err := m.applyWorkflowSnapshotAsyncTxAsNew(ctx, tx, shardID, newWorkflow, m.parser); err != nil {
×
479
                                return err
×
480
                        }
×
481
                }
482
                return nil
×
483
        }
484

485
        if err := applyWorkflowMutationTx(ctx, tx, shardID, &updateWorkflow, m.parser); err != nil {
2,927✔
486
                return err
×
487
        }
×
488
        if newWorkflow != nil {
3,045✔
489
                if err := m.applyWorkflowSnapshotTxAsNew(ctx, tx, shardID, newWorkflow, m.parser); err != nil {
118✔
490
                        return err
×
491
                }
×
492
        }
493
        return nil
2,927✔
494
}
495

496
func (m *sqlExecutionStore) ConflictResolveWorkflowExecution(
497
        ctx context.Context,
498
        request *p.InternalConflictResolveWorkflowExecutionRequest,
499
) error {
2✔
500
        dbShardID := sqlplugin.GetDBShardIDFromHistoryShardID(m.shardID, m.db.GetTotalNumDBShards())
2✔
501
        return m.txExecuteShardLocked(ctx, dbShardID, "ConflictResolveWorkflowExecution", request.RangeID, func(tx sqlplugin.Tx) error {
4✔
502
                return m.conflictResolveWorkflowExecutionTx(ctx, tx, request)
2✔
503
        })
2✔
504
}
505

506
func (m *sqlExecutionStore) conflictResolveWorkflowExecutionTx(
507
        ctx context.Context,
508
        tx sqlplugin.Tx,
509
        request *p.InternalConflictResolveWorkflowExecutionRequest,
510
) error {
2✔
511

2✔
512
        currentWorkflow := request.CurrentWorkflowMutation
2✔
513
        resetWorkflow := request.ResetWorkflowSnapshot
2✔
514
        newWorkflow := request.NewWorkflowSnapshot
2✔
515

2✔
516
        shardID := m.shardID
2✔
517

2✔
518
        domainID := serialization.MustParseUUID(resetWorkflow.ExecutionInfo.DomainID)
2✔
519
        workflowID := resetWorkflow.ExecutionInfo.WorkflowID
2✔
520

2✔
521
        if err := p.ValidateConflictResolveWorkflowModeState(
2✔
522
                request.Mode,
2✔
523
                resetWorkflow,
2✔
524
                newWorkflow,
2✔
525
                currentWorkflow,
2✔
526
        ); err != nil {
2✔
527
                return err
×
528
        }
×
529

530
        switch request.Mode {
2✔
531
        case p.ConflictResolveWorkflowModeBypassCurrent:
2✔
532
                if err := assertNotCurrentExecution(
2✔
533
                        ctx,
2✔
534
                        tx,
2✔
535
                        shardID,
2✔
536
                        domainID,
2✔
537
                        workflowID,
2✔
538
                        serialization.MustParseUUID(resetWorkflow.ExecutionInfo.RunID)); err != nil {
2✔
539
                        return err
×
540
                }
×
541

542
        case p.ConflictResolveWorkflowModeUpdateCurrent:
2✔
543
                executionInfo := resetWorkflow.ExecutionInfo
2✔
544
                startVersion := resetWorkflow.StartVersion
2✔
545
                lastWriteVersion := resetWorkflow.LastWriteVersion
2✔
546
                if newWorkflow != nil {
2✔
547
                        executionInfo = newWorkflow.ExecutionInfo
×
548
                        startVersion = newWorkflow.StartVersion
×
549
                        lastWriteVersion = newWorkflow.LastWriteVersion
×
550
                }
×
551
                runID := serialization.MustParseUUID(executionInfo.RunID)
2✔
552
                createRequestID := executionInfo.CreateRequestID
2✔
553
                state := executionInfo.State
2✔
554
                closeStatus := executionInfo.CloseStatus
2✔
555

2✔
556
                if currentWorkflow != nil {
2✔
557
                        prevRunID := serialization.MustParseUUID(currentWorkflow.ExecutionInfo.RunID)
×
558

×
559
                        if err := assertRunIDAndUpdateCurrentExecution(
×
560
                                ctx,
×
561
                                tx,
×
562
                                m.shardID,
×
563
                                domainID,
×
564
                                workflowID,
×
565
                                runID,
×
566
                                prevRunID,
×
567
                                createRequestID,
×
568
                                state,
×
569
                                closeStatus,
×
570
                                startVersion,
×
571
                                lastWriteVersion); err != nil {
×
572
                                return err
×
573
                        }
×
574
                } else {
2✔
575
                        // reset workflow is current
2✔
576
                        prevRunID := serialization.MustParseUUID(resetWorkflow.ExecutionInfo.RunID)
2✔
577

2✔
578
                        if err := assertRunIDAndUpdateCurrentExecution(
2✔
579
                                ctx,
2✔
580
                                tx,
2✔
581
                                m.shardID,
2✔
582
                                domainID,
2✔
583
                                workflowID,
2✔
584
                                runID,
2✔
585
                                prevRunID,
2✔
586
                                createRequestID,
2✔
587
                                state,
2✔
588
                                closeStatus,
2✔
589
                                startVersion,
2✔
590
                                lastWriteVersion); err != nil {
2✔
591
                                return err
×
592
                        }
×
593
                }
594

595
        default:
×
596
                return &types.InternalServiceError{
×
597
                        Message: fmt.Sprintf("ConflictResolveWorkflowExecution: unknown mode: %v", request.Mode),
×
598
                }
×
599
        }
600

601
        if err := applyWorkflowSnapshotTxAsReset(ctx, tx, shardID, &resetWorkflow, m.parser); err != nil {
2✔
602
                return err
×
603
        }
×
604
        if currentWorkflow != nil {
2✔
605
                if err := applyWorkflowMutationTx(ctx, tx, shardID, currentWorkflow, m.parser); err != nil {
×
606
                        return err
×
607
                }
×
608
        }
609
        if newWorkflow != nil {
2✔
610
                if err := m.applyWorkflowSnapshotTxAsNew(ctx, tx, shardID, newWorkflow, m.parser); err != nil {
×
611
                        return err
×
612
                }
×
613
        }
614
        return nil
2✔
615
}
616

617
func (m *sqlExecutionStore) DeleteWorkflowExecution(
618
        ctx context.Context,
619
        request *p.DeleteWorkflowExecutionRequest,
620
) error {
36✔
621
        recoverPanic := func(recovered interface{}, err *error) {
310✔
622
                if recovered != nil {
274✔
623
                        *err = fmt.Errorf("DB operation panicked: %v %s", recovered, debug.Stack())
×
624
                }
×
625
        }
626
        domainID := serialization.MustParseUUID(request.DomainID)
36✔
627
        runID := serialization.MustParseUUID(request.RunID)
36✔
628
        wfID := request.WorkflowID
36✔
629
        g, ctx := errgroup.WithContext(ctx)
36✔
630

36✔
631
        g.Go(func() (e error) {
72✔
632
                defer func() { recoverPanic(recover(), &e) }()
72✔
633
                _, e = m.db.DeleteFromExecutions(ctx, &sqlplugin.ExecutionsFilter{
36✔
634
                        ShardID:    m.shardID,
36✔
635
                        DomainID:   domainID,
36✔
636
                        WorkflowID: wfID,
36✔
637
                        RunID:      runID,
36✔
638
                })
36✔
639
                return e
36✔
640
        })
641

642
        g.Go(func() (e error) {
72✔
643
                defer func() { recoverPanic(recover(), &e) }()
72✔
644
                _, e = m.db.DeleteFromActivityInfoMaps(ctx, &sqlplugin.ActivityInfoMapsFilter{
36✔
645
                        ShardID:    int64(m.shardID),
36✔
646
                        DomainID:   domainID,
36✔
647
                        WorkflowID: wfID,
36✔
648
                        RunID:      runID,
36✔
649
                })
36✔
650
                return e
36✔
651
        })
652

653
        g.Go(func() (e error) {
72✔
654
                defer func() { recoverPanic(recover(), &e) }()
72✔
655
                _, e = m.db.DeleteFromTimerInfoMaps(ctx, &sqlplugin.TimerInfoMapsFilter{
36✔
656
                        ShardID:    int64(m.shardID),
36✔
657
                        DomainID:   domainID,
36✔
658
                        WorkflowID: wfID,
36✔
659
                        RunID:      runID,
36✔
660
                })
36✔
661
                return e
36✔
662
        })
663

664
        g.Go(func() (e error) {
72✔
665
                defer func() { recoverPanic(recover(), &e) }()
72✔
666
                _, e = m.db.DeleteFromChildExecutionInfoMaps(ctx, &sqlplugin.ChildExecutionInfoMapsFilter{
36✔
667
                        ShardID:    int64(m.shardID),
36✔
668
                        DomainID:   domainID,
36✔
669
                        WorkflowID: wfID,
36✔
670
                        RunID:      runID,
36✔
671
                })
36✔
672
                return e
36✔
673
        })
674

675
        g.Go(func() (e error) {
72✔
676
                defer func() { recoverPanic(recover(), &e) }()
72✔
677
                _, e = m.db.DeleteFromRequestCancelInfoMaps(ctx, &sqlplugin.RequestCancelInfoMapsFilter{
36✔
678
                        ShardID:    int64(m.shardID),
36✔
679
                        DomainID:   domainID,
36✔
680
                        WorkflowID: wfID,
36✔
681
                        RunID:      runID,
36✔
682
                })
36✔
683
                return e
36✔
684
        })
685

686
        g.Go(func() (e error) {
72✔
687
                defer func() { recoverPanic(recover(), &e) }()
72✔
688
                _, e = m.db.DeleteFromSignalInfoMaps(ctx, &sqlplugin.SignalInfoMapsFilter{
36✔
689
                        ShardID:    int64(m.shardID),
36✔
690
                        DomainID:   domainID,
36✔
691
                        WorkflowID: wfID,
36✔
692
                        RunID:      runID,
36✔
693
                })
36✔
694
                return e
36✔
695
        })
696

697
        g.Go(func() (e error) {
72✔
698
                defer func() { recoverPanic(recover(), &e) }()
72✔
699
                _, e = m.db.DeleteFromBufferedEvents(ctx, &sqlplugin.BufferedEventsFilter{
36✔
700
                        ShardID:    m.shardID,
36✔
701
                        DomainID:   domainID,
36✔
702
                        WorkflowID: wfID,
36✔
703
                        RunID:      runID,
36✔
704
                })
36✔
705
                return e
36✔
706
        })
707

708
        g.Go(func() (e error) {
72✔
709
                defer func() { recoverPanic(recover(), &e) }()
72✔
710
                _, e = m.db.DeleteFromSignalsRequestedSets(ctx, &sqlplugin.SignalsRequestedSetsFilter{
36✔
711
                        ShardID:    int64(m.shardID),
36✔
712
                        DomainID:   domainID,
36✔
713
                        WorkflowID: wfID,
36✔
714
                        RunID:      runID,
36✔
715
                })
36✔
716
                return e
36✔
717
        })
718
        return g.Wait()
36✔
719
}
720

721
// its possible for a new run of the same workflow to have started after the run we are deleting
722
// here was finished. In that case, current_executions table will have the same workflowID but different
723
// runID. The following code will delete the row from current_executions if and only if the runID is
724
// same as the one we are trying to delete here
725
func (m *sqlExecutionStore) DeleteCurrentWorkflowExecution(
726
        ctx context.Context,
727
        request *p.DeleteCurrentWorkflowExecutionRequest,
728
) error {
36✔
729

36✔
730
        domainID := serialization.MustParseUUID(request.DomainID)
36✔
731
        runID := serialization.MustParseUUID(request.RunID)
36✔
732
        _, err := m.db.DeleteFromCurrentExecutions(ctx, &sqlplugin.CurrentExecutionsFilter{
36✔
733
                ShardID:    int64(m.shardID),
36✔
734
                DomainID:   domainID,
36✔
735
                WorkflowID: request.WorkflowID,
36✔
736
                RunID:      runID,
36✔
737
        })
36✔
738
        if err != nil {
36✔
739
                return convertCommonErrors(m.db, "DeleteCurrentWorkflowExecution", "", err)
×
740
        }
×
741
        return nil
36✔
742
}
743

744
func (m *sqlExecutionStore) GetCurrentExecution(
745
        ctx context.Context,
746
        request *p.GetCurrentExecutionRequest,
747
) (*p.GetCurrentExecutionResponse, error) {
122✔
748

122✔
749
        row, err := m.db.SelectFromCurrentExecutions(ctx, &sqlplugin.CurrentExecutionsFilter{
122✔
750
                ShardID:    int64(m.shardID),
122✔
751
                DomainID:   serialization.MustParseUUID(request.DomainID),
122✔
752
                WorkflowID: request.WorkflowID,
122✔
753
        })
122✔
754
        if err != nil {
130✔
755
                return nil, convertCommonErrors(m.db, "GetCurrentExecution", "", err)
8✔
756
        }
8✔
757
        return &p.GetCurrentExecutionResponse{
116✔
758
                StartRequestID:   row.CreateRequestID,
116✔
759
                RunID:            row.RunID.String(),
116✔
760
                State:            int(row.State),
116✔
761
                CloseStatus:      int(row.CloseStatus),
116✔
762
                LastWriteVersion: row.LastWriteVersion,
116✔
763
        }, nil
116✔
764
}
765

766
func (m *sqlExecutionStore) ListCurrentExecutions(
767
        _ context.Context,
768
        _ *p.ListCurrentExecutionsRequest,
769
) (*p.ListCurrentExecutionsResponse, error) {
×
770
        return nil, &types.InternalServiceError{Message: "Not yet implemented"}
×
771
}
×
772

773
func (m *sqlExecutionStore) IsWorkflowExecutionExists(
774
        _ context.Context,
775
        _ *p.IsWorkflowExecutionExistsRequest,
776
) (*p.IsWorkflowExecutionExistsResponse, error) {
×
777
        return nil, &types.InternalServiceError{Message: "Not yet implemented"}
×
778
}
×
779

780
func (m *sqlExecutionStore) ListConcreteExecutions(
781
        ctx context.Context,
782
        request *p.ListConcreteExecutionsRequest,
783
) (*p.InternalListConcreteExecutionsResponse, error) {
×
784

×
785
        filter := &sqlplugin.ExecutionsFilter{}
×
786
        if len(request.PageToken) > 0 {
×
787
                err := gobDeserialize(request.PageToken, &filter)
×
788
                if err != nil {
×
789
                        return nil, &types.InternalServiceError{
×
790
                                Message: fmt.Sprintf("ListConcreteExecutions failed. Error: %v", err),
×
791
                        }
×
792
                }
×
793
        } else {
×
794
                filter = &sqlplugin.ExecutionsFilter{
×
795
                        ShardID:    m.shardID,
×
796
                        WorkflowID: "",
×
797
                }
×
798
        }
×
799
        filter.Size = request.PageSize
×
800

×
801
        executions, err := m.db.SelectFromExecutions(ctx, filter)
×
802
        if err != nil {
×
803
                if err == sql.ErrNoRows {
×
804
                        return &p.InternalListConcreteExecutionsResponse{}, nil
×
805
                }
×
806
                return nil, convertCommonErrors(m.db, "ListConcreteExecutions", "", err)
×
807
        }
808

809
        if len(executions) == 0 {
×
810
                return &p.InternalListConcreteExecutionsResponse{}, nil
×
811
        }
×
812
        lastExecution := executions[len(executions)-1]
×
813
        nextFilter := &sqlplugin.ExecutionsFilter{
×
814
                ShardID:    m.shardID,
×
815
                WorkflowID: lastExecution.WorkflowID,
×
816
        }
×
817
        token, err := gobSerialize(nextFilter)
×
818
        if err != nil {
×
819
                return nil, &types.InternalServiceError{
×
820
                        Message: fmt.Sprintf("ListConcreteExecutions failed. Error: %v", err),
×
821
                }
×
822
        }
×
823
        concreteExecutions, err := m.populateInternalListConcreteExecutions(executions)
×
824
        if err != nil {
×
825
                return nil, &types.InternalServiceError{
×
826
                        Message: fmt.Sprintf("ListConcreteExecutions failed. Error: %v", err),
×
827
                }
×
828
        }
×
829

830
        return &p.InternalListConcreteExecutionsResponse{
×
831
                Executions:    concreteExecutions,
×
832
                NextPageToken: token,
×
833
        }, nil
×
834
}
835

836
func (m *sqlExecutionStore) GetTransferTasks(
837
        ctx context.Context,
838
        request *p.GetTransferTasksRequest,
839
) (*p.GetTransferTasksResponse, error) {
1,545✔
840
        minReadLevel := request.ReadLevel
1,545✔
841
        if len(request.NextPageToken) > 0 {
1,545✔
842
                readLevel, err := deserializePageToken(request.NextPageToken)
×
843
                if err != nil {
×
844
                        return nil, convertCommonErrors(m.db, "GetTransferTasks", "failed to deserialize page token", err)
×
845
                }
×
846
                minReadLevel = readLevel
×
847
        }
848
        rows, err := m.db.SelectFromTransferTasks(ctx, &sqlplugin.TransferTasksFilter{
1,545✔
849
                ShardID:   m.shardID,
1,545✔
850
                MinTaskID: minReadLevel,
1,545✔
851
                MaxTaskID: request.MaxReadLevel,
1,545✔
852
                PageSize:  request.BatchSize,
1,545✔
853
        })
1,545✔
854
        if err != nil {
1,545✔
855
                if err != sql.ErrNoRows {
×
856
                        return nil, convertCommonErrors(m.db, "GetTransferTasks", "", err)
×
857
                }
×
858
        }
859
        resp := &p.GetTransferTasksResponse{Tasks: make([]*p.TransferTaskInfo, len(rows))}
1,545✔
860
        for i, row := range rows {
5,276✔
861
                info, err := m.parser.TransferTaskInfoFromBlob(row.Data, row.DataEncoding)
3,731✔
862
                if err != nil {
3,731✔
863
                        return nil, err
×
864
                }
×
865
                resp.Tasks[i] = &p.TransferTaskInfo{
3,731✔
866
                        TaskID:                  row.TaskID,
3,731✔
867
                        DomainID:                info.DomainID.String(),
3,731✔
868
                        WorkflowID:              info.GetWorkflowID(),
3,731✔
869
                        RunID:                   info.RunID.String(),
3,731✔
870
                        VisibilityTimestamp:     info.GetVisibilityTimestamp(),
3,731✔
871
                        TargetDomainID:          info.TargetDomainID.String(),
3,731✔
872
                        TargetDomainIDs:         info.GetTargetDomainIDs(),
3,731✔
873
                        TargetWorkflowID:        info.GetTargetWorkflowID(),
3,731✔
874
                        TargetRunID:             info.TargetRunID.String(),
3,731✔
875
                        TargetChildWorkflowOnly: info.GetTargetChildWorkflowOnly(),
3,731✔
876
                        TaskList:                info.GetTaskList(),
3,731✔
877
                        TaskType:                int(info.GetTaskType()),
3,731✔
878
                        ScheduleID:              info.GetScheduleID(),
3,731✔
879
                        Version:                 info.GetVersion(),
3,731✔
880
                }
3,731✔
881
        }
882
        if len(rows) > 0 {
3,026✔
883
                lastTaskID := rows[len(rows)-1].TaskID
1,481✔
884
                if lastTaskID < request.MaxReadLevel {
1,484✔
885
                        resp.NextPageToken = serializePageToken(lastTaskID)
3✔
886
                }
3✔
887
        }
888
        return resp, nil
1,545✔
889
}
890

891
func (m *sqlExecutionStore) CompleteTransferTask(
892
        ctx context.Context,
893
        request *p.CompleteTransferTaskRequest,
894
) error {
×
895

×
896
        if _, err := m.db.DeleteFromTransferTasks(ctx, &sqlplugin.TransferTasksFilter{
×
897
                ShardID: m.shardID,
×
898
                TaskID:  request.TaskID,
×
899
        }); err != nil {
×
900
                return convertCommonErrors(m.db, "CompleteTransferTask", "", err)
×
901
        }
×
902
        return nil
×
903
}
904

905
func (m *sqlExecutionStore) RangeCompleteTransferTask(
906
        ctx context.Context,
907
        request *p.RangeCompleteTransferTaskRequest,
908
) (*p.RangeCompleteTransferTaskResponse, error) {
75✔
909
        result, err := m.db.RangeDeleteFromTransferTasks(ctx, &sqlplugin.TransferTasksFilter{
75✔
910
                ShardID:   m.shardID,
75✔
911
                MinTaskID: request.ExclusiveBeginTaskID,
75✔
912
                MaxTaskID: request.InclusiveEndTaskID,
75✔
913
                PageSize:  request.PageSize,
75✔
914
        })
75✔
915
        if err != nil {
75✔
916
                return nil, convertCommonErrors(m.db, "RangeCompleteTransferTask", "", err)
×
917
        }
×
918
        rowsDeleted, err := result.RowsAffected()
75✔
919
        if err != nil {
75✔
920
                return nil, convertCommonErrors(m.db, "RangeCompleteTransferTask", "", err)
×
921
        }
×
922
        return &p.RangeCompleteTransferTaskResponse{TasksCompleted: int(rowsDeleted)}, nil
75✔
923
}
924

925
func (m *sqlExecutionStore) GetCrossClusterTasks(
926
        ctx context.Context,
927
        request *p.GetCrossClusterTasksRequest,
928
) (*p.GetCrossClusterTasksResponse, error) {
105✔
929
        minReadLevel := request.ReadLevel
105✔
930
        if len(request.NextPageToken) > 0 {
105✔
931
                readLevel, err := deserializePageToken(request.NextPageToken)
×
932
                if err != nil {
×
933
                        return nil, convertCommonErrors(m.db, "GetCrossClusterTasks", "failed to deserialize page token", err)
×
934
                }
×
935
                minReadLevel = readLevel
×
936
        }
937
        rows, err := m.db.SelectFromCrossClusterTasks(ctx, &sqlplugin.CrossClusterTasksFilter{
105✔
938
                TargetCluster: request.TargetCluster,
105✔
939
                ShardID:       m.shardID,
105✔
940
                MinTaskID:     minReadLevel,
105✔
941
                MaxTaskID:     request.MaxReadLevel,
105✔
942
                PageSize:      request.BatchSize,
105✔
943
        })
105✔
944
        if err != nil {
105✔
945
                if err != sql.ErrNoRows {
×
946
                        return nil, convertCommonErrors(m.db, "GetCrossClusterTasks", "", err)
×
947
                }
×
948
        }
949
        resp := &p.GetCrossClusterTasksResponse{Tasks: make([]*p.CrossClusterTaskInfo, len(rows))}
105✔
950
        for i, row := range rows {
105✔
951
                info, err := m.parser.CrossClusterTaskInfoFromBlob(row.Data, row.DataEncoding)
×
952
                if err != nil {
×
953
                        return nil, err
×
954
                }
×
955
                resp.Tasks[i] = &p.CrossClusterTaskInfo{
×
956
                        TaskID:                  row.TaskID,
×
957
                        DomainID:                info.DomainID.String(),
×
958
                        WorkflowID:              info.GetWorkflowID(),
×
959
                        RunID:                   info.RunID.String(),
×
960
                        VisibilityTimestamp:     info.GetVisibilityTimestamp(),
×
961
                        TargetDomainID:          info.TargetDomainID.String(),
×
962
                        TargetDomainIDs:         info.GetTargetDomainIDs(),
×
963
                        TargetWorkflowID:        info.GetTargetWorkflowID(),
×
964
                        TargetRunID:             info.TargetRunID.String(),
×
965
                        TargetChildWorkflowOnly: info.GetTargetChildWorkflowOnly(),
×
966
                        TaskList:                info.GetTaskList(),
×
967
                        TaskType:                int(info.GetTaskType()),
×
968
                        ScheduleID:              info.GetScheduleID(),
×
969
                        Version:                 info.GetVersion(),
×
970
                }
×
971
        }
972
        if len(rows) > 0 {
105✔
973
                lastTaskID := rows[len(rows)-1].TaskID
×
974
                if lastTaskID < request.MaxReadLevel {
×
975
                        resp.NextPageToken = serializePageToken(lastTaskID)
×
976
                }
×
977
        }
978
        return resp, nil
105✔
979

980
}
981

982
func (m *sqlExecutionStore) CompleteCrossClusterTask(
983
        ctx context.Context,
984
        request *p.CompleteCrossClusterTaskRequest,
985
) error {
×
986
        if _, err := m.db.DeleteFromCrossClusterTasks(ctx, &sqlplugin.CrossClusterTasksFilter{
×
987
                TargetCluster: request.TargetCluster,
×
988
                ShardID:       m.shardID,
×
989
                TaskID:        request.TaskID,
×
990
        }); err != nil {
×
991
                return convertCommonErrors(m.db, "CompleteCrossClusterTask", "", err)
×
992
        }
×
993
        return nil
×
994
}
995

996
func (m *sqlExecutionStore) RangeCompleteCrossClusterTask(
997
        ctx context.Context,
998
        request *p.RangeCompleteCrossClusterTaskRequest,
999
) (*p.RangeCompleteCrossClusterTaskResponse, error) {
90✔
1000
        result, err := m.db.RangeDeleteFromCrossClusterTasks(ctx, &sqlplugin.CrossClusterTasksFilter{
90✔
1001
                TargetCluster: request.TargetCluster,
90✔
1002
                ShardID:       m.shardID,
90✔
1003
                MinTaskID:     request.ExclusiveBeginTaskID,
90✔
1004
                MaxTaskID:     request.InclusiveEndTaskID,
90✔
1005
                PageSize:      request.PageSize,
90✔
1006
        })
90✔
1007
        if err != nil {
90✔
1008
                return nil, convertCommonErrors(m.db, "RangeCompleteCrossClusterTask", "", err)
×
1009
        }
×
1010
        rowsDeleted, err := result.RowsAffected()
90✔
1011
        if err != nil {
90✔
1012
                return nil, convertCommonErrors(m.db, "RangeCompleteCrossClusterTask", "", err)
×
1013
        }
×
1014
        return &p.RangeCompleteCrossClusterTaskResponse{TasksCompleted: int(rowsDeleted)}, nil
90✔
1015
}
1016

1017
func (m *sqlExecutionStore) GetReplicationTasks(
1018
        ctx context.Context,
1019
        request *p.GetReplicationTasksRequest,
1020
) (*p.InternalGetReplicationTasksResponse, error) {
80✔
1021

80✔
1022
        readLevel, maxReadLevelInclusive, err := getReadLevels(request)
80✔
1023
        if err != nil {
80✔
1024
                return nil, err
×
1025
        }
×
1026

1027
        rows, err := m.db.SelectFromReplicationTasks(
80✔
1028
                ctx,
80✔
1029
                &sqlplugin.ReplicationTasksFilter{
80✔
1030
                        ShardID:   m.shardID,
80✔
1031
                        MinTaskID: readLevel,
80✔
1032
                        MaxTaskID: maxReadLevelInclusive,
80✔
1033
                        PageSize:  request.BatchSize,
80✔
1034
                })
80✔
1035

80✔
1036
        switch err {
80✔
1037
        case nil:
80✔
1038
                return m.populateGetReplicationTasksResponse(rows, request.MaxReadLevel)
80✔
1039
        case sql.ErrNoRows:
×
1040
                return &p.InternalGetReplicationTasksResponse{}, nil
×
1041
        default:
×
1042
                return nil, convertCommonErrors(m.db, "GetReplicationTasks", "", err)
×
1043
        }
1044
}
1045

1046
func getReadLevels(request *p.GetReplicationTasksRequest) (readLevel int64, maxReadLevelInclusive int64, err error) {
82✔
1047
        readLevel = request.ReadLevel
82✔
1048
        if len(request.NextPageToken) > 0 {
84✔
1049
                readLevel, err = deserializePageToken(request.NextPageToken)
2✔
1050
                if err != nil {
2✔
1051
                        return 0, 0, err
×
1052
                }
×
1053
        }
1054

1055
        maxReadLevelInclusive = collection.MaxInt64(readLevel+int64(request.BatchSize), request.MaxReadLevel)
82✔
1056
        return readLevel, maxReadLevelInclusive, nil
82✔
1057
}
1058

1059
func (m *sqlExecutionStore) populateGetReplicationTasksResponse(
1060
        rows []sqlplugin.ReplicationTasksRow,
1061
        requestMaxReadLevel int64,
1062
) (*p.InternalGetReplicationTasksResponse, error) {
82✔
1063
        if len(rows) == 0 {
164✔
1064
                return &p.InternalGetReplicationTasksResponse{}, nil
82✔
1065
        }
82✔
1066

1067
        var tasks = make([]*p.InternalReplicationTaskInfo, len(rows))
2✔
1068
        for i, row := range rows {
4✔
1069
                info, err := m.parser.ReplicationTaskInfoFromBlob(row.Data, row.DataEncoding)
2✔
1070
                if err != nil {
2✔
1071
                        return nil, err
×
1072
                }
×
1073

1074
                tasks[i] = &p.InternalReplicationTaskInfo{
2✔
1075
                        TaskID:            row.TaskID,
2✔
1076
                        DomainID:          info.DomainID.String(),
2✔
1077
                        WorkflowID:        info.GetWorkflowID(),
2✔
1078
                        RunID:             info.RunID.String(),
2✔
1079
                        TaskType:          int(info.GetTaskType()),
2✔
1080
                        FirstEventID:      info.GetFirstEventID(),
2✔
1081
                        NextEventID:       info.GetNextEventID(),
2✔
1082
                        Version:           info.GetVersion(),
2✔
1083
                        ScheduledID:       info.GetScheduledID(),
2✔
1084
                        BranchToken:       info.GetBranchToken(),
2✔
1085
                        NewRunBranchToken: info.GetNewRunBranchToken(),
2✔
1086
                        CreationTime:      info.GetCreationTimestamp(),
2✔
1087
                }
2✔
1088
        }
1089
        var nextPageToken []byte
2✔
1090
        lastTaskID := rows[len(rows)-1].TaskID
2✔
1091
        if lastTaskID < requestMaxReadLevel {
4✔
1092
                nextPageToken = serializePageToken(lastTaskID)
2✔
1093
        }
2✔
1094
        return &p.InternalGetReplicationTasksResponse{
2✔
1095
                Tasks:         tasks,
2✔
1096
                NextPageToken: nextPageToken,
2✔
1097
        }, nil
2✔
1098
}
1099

1100
func (m *sqlExecutionStore) CompleteReplicationTask(
1101
        ctx context.Context,
1102
        request *p.CompleteReplicationTaskRequest,
1103
) error {
×
1104

×
1105
        if _, err := m.db.DeleteFromReplicationTasks(ctx, &sqlplugin.ReplicationTasksFilter{
×
1106
                ShardID: m.shardID,
×
1107
                TaskID:  request.TaskID,
×
1108
        }); err != nil {
×
1109
                return convertCommonErrors(m.db, "CompleteReplicationTask", "", err)
×
1110
        }
×
1111
        return nil
×
1112
}
1113

1114
func (m *sqlExecutionStore) RangeCompleteReplicationTask(
1115
        ctx context.Context,
1116
        request *p.RangeCompleteReplicationTaskRequest,
1117
) (*p.RangeCompleteReplicationTaskResponse, error) {
80✔
1118
        result, err := m.db.RangeDeleteFromReplicationTasks(ctx, &sqlplugin.ReplicationTasksFilter{
80✔
1119
                ShardID:            m.shardID,
80✔
1120
                InclusiveEndTaskID: request.InclusiveEndTaskID,
80✔
1121
                PageSize:           request.PageSize,
80✔
1122
        })
80✔
1123
        if err != nil {
80✔
1124
                return nil, convertCommonErrors(m.db, "RangeCompleteReplicationTask", "", err)
×
1125
        }
×
1126
        rowsDeleted, err := result.RowsAffected()
80✔
1127
        if err != nil {
80✔
1128
                return nil, convertCommonErrors(m.db, "RangeCompleteReplicationTask", "", err)
×
1129
        }
×
1130
        return &p.RangeCompleteReplicationTaskResponse{TasksCompleted: int(rowsDeleted)}, nil
80✔
1131
}
1132

1133
func (m *sqlExecutionStore) GetReplicationTasksFromDLQ(
1134
        ctx context.Context,
1135
        request *p.GetReplicationTasksFromDLQRequest,
1136
) (*p.InternalGetReplicationTasksFromDLQResponse, error) {
2✔
1137

2✔
1138
        readLevel, maxReadLevelInclusive, err := getReadLevels(&request.GetReplicationTasksRequest)
2✔
1139
        if err != nil {
2✔
1140
                return nil, err
×
1141
        }
×
1142

1143
        filter := sqlplugin.ReplicationTasksFilter{
2✔
1144
                ShardID:   m.shardID,
2✔
1145
                MinTaskID: readLevel,
2✔
1146
                MaxTaskID: maxReadLevelInclusive,
2✔
1147
                PageSize:  request.BatchSize,
2✔
1148
        }
2✔
1149
        rows, err := m.db.SelectFromReplicationTasksDLQ(ctx, &sqlplugin.ReplicationTasksDLQFilter{
2✔
1150
                ReplicationTasksFilter: filter,
2✔
1151
                SourceClusterName:      request.SourceClusterName,
2✔
1152
        })
2✔
1153

2✔
1154
        switch err {
2✔
1155
        case nil:
2✔
1156
                return m.populateGetReplicationTasksResponse(rows, request.MaxReadLevel)
2✔
1157
        case sql.ErrNoRows:
×
1158
                return &p.InternalGetReplicationTasksResponse{}, nil
×
1159
        default:
×
1160
                return nil, convertCommonErrors(m.db, "GetReplicationTasksFromDLQ", "", err)
×
1161
        }
1162
}
1163

1164
func (m *sqlExecutionStore) GetReplicationDLQSize(
1165
        ctx context.Context,
1166
        request *p.GetReplicationDLQSizeRequest,
1167
) (*p.GetReplicationDLQSizeResponse, error) {
8✔
1168

8✔
1169
        size, err := m.db.SelectFromReplicationDLQ(ctx, &sqlplugin.ReplicationTaskDLQFilter{
8✔
1170
                SourceClusterName: request.SourceClusterName,
8✔
1171
                ShardID:           m.shardID,
8✔
1172
        })
8✔
1173

8✔
1174
        switch err {
8✔
1175
        case nil:
8✔
1176
                return &p.GetReplicationDLQSizeResponse{
8✔
1177
                        Size: size,
8✔
1178
                }, nil
8✔
1179
        case sql.ErrNoRows:
×
1180
                return &p.GetReplicationDLQSizeResponse{
×
1181
                        Size: 0,
×
1182
                }, nil
×
1183
        default:
×
1184
                return nil, convertCommonErrors(m.db, "GetReplicationDLQSize", "", err)
×
1185
        }
1186
}
1187

1188
func (m *sqlExecutionStore) DeleteReplicationTaskFromDLQ(
1189
        ctx context.Context,
1190
        request *p.DeleteReplicationTaskFromDLQRequest,
1191
) error {
×
1192

×
1193
        filter := sqlplugin.ReplicationTasksFilter{
×
1194
                ShardID: m.shardID,
×
1195
                TaskID:  request.TaskID,
×
1196
        }
×
1197

×
1198
        if _, err := m.db.DeleteMessageFromReplicationTasksDLQ(ctx, &sqlplugin.ReplicationTasksDLQFilter{
×
1199
                ReplicationTasksFilter: filter,
×
1200
                SourceClusterName:      request.SourceClusterName,
×
1201
        }); err != nil {
×
1202
                return convertCommonErrors(m.db, "DeleteReplicationTaskFromDLQ", "", err)
×
1203
        }
×
1204
        return nil
×
1205
}
1206

1207
func (m *sqlExecutionStore) RangeDeleteReplicationTaskFromDLQ(
1208
        ctx context.Context,
1209
        request *p.RangeDeleteReplicationTaskFromDLQRequest,
1210
) (*p.RangeDeleteReplicationTaskFromDLQResponse, error) {
×
1211
        filter := sqlplugin.ReplicationTasksFilter{
×
1212
                ShardID:            m.shardID,
×
1213
                TaskID:             request.ExclusiveBeginTaskID,
×
1214
                InclusiveEndTaskID: request.InclusiveEndTaskID,
×
1215
                PageSize:           request.PageSize,
×
1216
        }
×
1217
        result, err := m.db.RangeDeleteMessageFromReplicationTasksDLQ(ctx, &sqlplugin.ReplicationTasksDLQFilter{
×
1218
                ReplicationTasksFilter: filter,
×
1219
                SourceClusterName:      request.SourceClusterName,
×
1220
        })
×
1221
        if err != nil {
×
1222
                return nil, convertCommonErrors(m.db, "RangeDeleteReplicationTaskFromDLQ", "", err)
×
1223
        }
×
1224
        rowsDeleted, err := result.RowsAffected()
×
1225
        if err != nil {
×
1226
                return nil, convertCommonErrors(m.db, "RangeDeleteReplicationTaskFromDLQ", "", err)
×
1227
        }
×
1228
        return &p.RangeDeleteReplicationTaskFromDLQResponse{TasksCompleted: int(rowsDeleted)}, nil
×
1229
}
1230

1231
func (m *sqlExecutionStore) CreateFailoverMarkerTasks(
1232
        ctx context.Context,
1233
        request *p.CreateFailoverMarkersRequest,
1234
) error {
×
1235
        dbShardID := sqlplugin.GetDBShardIDFromHistoryShardID(m.shardID, m.db.GetTotalNumDBShards())
×
1236
        return m.txExecuteShardLocked(ctx, dbShardID, "CreateFailoverMarkerTasks", request.RangeID, func(tx sqlplugin.Tx) error {
×
1237
                for _, task := range request.Markers {
×
1238
                        t := []p.Task{task}
×
1239
                        if err := createReplicationTasks(
×
1240
                                ctx,
×
1241
                                tx,
×
1242
                                t,
×
1243
                                m.shardID,
×
1244
                                serialization.MustParseUUID(task.DomainID),
×
1245
                                emptyWorkflowID,
×
1246
                                serialization.MustParseUUID(emptyReplicationRunID),
×
1247
                                m.parser,
×
1248
                        ); err != nil {
×
1249
                                rollBackErr := tx.Rollback()
×
1250
                                if rollBackErr != nil {
×
1251
                                        m.logger.Error("transaction rollback error", tag.Error(rollBackErr))
×
1252
                                }
×
1253
                                return err
×
1254
                        }
1255
                }
1256
                return nil
×
1257
        })
1258
}
1259

1260
// timerTaskPageToken is the pagination cursor for timer task reads. It is
// exchanged with callers as JSON, so the field names and their order are part
// of the wire format.
type timerTaskPageToken struct {
	TaskID    int64
	Timestamp time.Time
}

// serialize encodes the token as JSON.
func (t *timerTaskPageToken) serialize() ([]byte, error) {
	encoded, err := json.Marshal(t)
	if err != nil {
		return nil, err
	}
	return encoded, nil
}

// deserialize decodes a JSON payload into the receiver.
func (t *timerTaskPageToken) deserialize(payload []byte) error {
	if err := json.Unmarshal(payload, t); err != nil {
		return err
	}
	return nil
}
×
1272

1273
func (m *sqlExecutionStore) GetTimerIndexTasks(
1274
        ctx context.Context,
1275
        request *p.GetTimerIndexTasksRequest,
1276
) (*p.GetTimerIndexTasksResponse, error) {
2,155✔
1277

2,155✔
1278
        pageToken := &timerTaskPageToken{TaskID: math.MinInt64, Timestamp: request.MinTimestamp}
2,155✔
1279
        if len(request.NextPageToken) > 0 {
2,155✔
1280
                if err := pageToken.deserialize(request.NextPageToken); err != nil {
×
1281
                        return nil, &types.InternalServiceError{
×
1282
                                Message: fmt.Sprintf("error deserializing timerTaskPageToken: %v", err),
×
1283
                        }
×
1284
                }
×
1285
        }
1286

1287
        rows, err := m.db.SelectFromTimerTasks(ctx, &sqlplugin.TimerTasksFilter{
2,155✔
1288
                ShardID:                m.shardID,
2,155✔
1289
                MinVisibilityTimestamp: pageToken.Timestamp,
2,155✔
1290
                TaskID:                 pageToken.TaskID,
2,155✔
1291
                MaxVisibilityTimestamp: request.MaxTimestamp,
2,155✔
1292
                PageSize:               request.BatchSize + 1,
2,155✔
1293
        })
2,155✔
1294

2,155✔
1295
        if err != nil && err != sql.ErrNoRows {
2,155✔
1296
                return nil, convertCommonErrors(m.db, "GetTimerIndexTasks", "", err)
×
1297
        }
×
1298

1299
        resp := &p.GetTimerIndexTasksResponse{Timers: make([]*p.TimerTaskInfo, len(rows))}
2,155✔
1300
        for i, row := range rows {
7,913✔
1301
                info, err := m.parser.TimerTaskInfoFromBlob(row.Data, row.DataEncoding)
5,758✔
1302
                if err != nil {
5,758✔
1303
                        return nil, err
×
1304
                }
×
1305
                resp.Timers[i] = &p.TimerTaskInfo{
5,758✔
1306
                        VisibilityTimestamp: row.VisibilityTimestamp,
5,758✔
1307
                        TaskID:              row.TaskID,
5,758✔
1308
                        DomainID:            info.DomainID.String(),
5,758✔
1309
                        WorkflowID:          info.GetWorkflowID(),
5,758✔
1310
                        RunID:               info.RunID.String(),
5,758✔
1311
                        TaskType:            int(info.GetTaskType()),
5,758✔
1312
                        TimeoutType:         int(info.GetTimeoutType()),
5,758✔
1313
                        EventID:             info.GetEventID(),
5,758✔
1314
                        ScheduleAttempt:     info.GetScheduleAttempt(),
5,758✔
1315
                        Version:             info.GetVersion(),
5,758✔
1316
                }
5,758✔
1317
        }
1318

1319
        if len(resp.Timers) > request.BatchSize {
2,692✔
1320
                pageToken = &timerTaskPageToken{
537✔
1321
                        TaskID:    resp.Timers[request.BatchSize].TaskID,
537✔
1322
                        Timestamp: resp.Timers[request.BatchSize].VisibilityTimestamp,
537✔
1323
                }
537✔
1324
                resp.Timers = resp.Timers[:request.BatchSize]
537✔
1325
                nextToken, err := pageToken.serialize()
537✔
1326
                if err != nil {
537✔
1327
                        return nil, &types.InternalServiceError{
×
1328
                                Message: fmt.Sprintf("GetTimerTasks: error serializing page token: %v", err),
×
1329
                        }
×
1330
                }
×
1331
                resp.NextPageToken = nextToken
537✔
1332
        }
1333

1334
        return resp, nil
2,155✔
1335
}
1336

1337
func (m *sqlExecutionStore) CompleteTimerTask(
1338
        ctx context.Context,
1339
        request *p.CompleteTimerTaskRequest,
1340
) error {
×
1341

×
1342
        if _, err := m.db.DeleteFromTimerTasks(ctx, &sqlplugin.TimerTasksFilter{
×
1343
                ShardID:             m.shardID,
×
1344
                VisibilityTimestamp: request.VisibilityTimestamp,
×
1345
                TaskID:              request.TaskID,
×
1346
        }); err != nil {
×
1347
                return convertCommonErrors(m.db, "CompleteTimerTask", "", err)
×
1348
        }
×
1349
        return nil
×
1350
}
1351

1352
func (m *sqlExecutionStore) RangeCompleteTimerTask(
1353
        ctx context.Context,
1354
        request *p.RangeCompleteTimerTaskRequest,
1355
) (*p.RangeCompleteTimerTaskResponse, error) {
24✔
1356
        result, err := m.db.RangeDeleteFromTimerTasks(ctx, &sqlplugin.TimerTasksFilter{
24✔
1357
                ShardID:                m.shardID,
24✔
1358
                MinVisibilityTimestamp: request.InclusiveBeginTimestamp,
24✔
1359
                MaxVisibilityTimestamp: request.ExclusiveEndTimestamp,
24✔
1360
                PageSize:               request.PageSize,
24✔
1361
        })
24✔
1362
        if err != nil {
24✔
1363
                return nil, convertCommonErrors(m.db, "RangeCompleteTimerTask", "", err)
×
1364
        }
×
1365
        rowsDeleted, err := result.RowsAffected()
24✔
1366
        if err != nil {
24✔
1367
                return nil, convertCommonErrors(m.db, "RangeCompleteTimerTask", "", err)
×
1368
        }
×
1369
        return &p.RangeCompleteTimerTaskResponse{TasksCompleted: int(rowsDeleted)}, nil
24✔
1370
}
1371

1372
func (m *sqlExecutionStore) PutReplicationTaskToDLQ(
1373
        ctx context.Context,
1374
        request *p.InternalPutReplicationTaskToDLQRequest,
1375
) error {
2✔
1376
        replicationTask := request.TaskInfo
2✔
1377
        blob, err := m.parser.ReplicationTaskInfoToBlob(&serialization.ReplicationTaskInfo{
2✔
1378
                DomainID:                serialization.MustParseUUID(replicationTask.DomainID),
2✔
1379
                WorkflowID:              replicationTask.WorkflowID,
2✔
1380
                RunID:                   serialization.MustParseUUID(replicationTask.RunID),
2✔
1381
                TaskType:                int16(replicationTask.TaskType),
2✔
1382
                FirstEventID:            replicationTask.FirstEventID,
2✔
1383
                NextEventID:             replicationTask.NextEventID,
2✔
1384
                Version:                 replicationTask.Version,
2✔
1385
                ScheduledID:             replicationTask.ScheduledID,
2✔
1386
                EventStoreVersion:       p.EventStoreVersion,
2✔
1387
                NewRunEventStoreVersion: p.EventStoreVersion,
2✔
1388
                BranchToken:             replicationTask.BranchToken,
2✔
1389
                NewRunBranchToken:       replicationTask.NewRunBranchToken,
2✔
1390
                CreationTimestamp:       replicationTask.CreationTime,
2✔
1391
        })
2✔
1392
        if err != nil {
2✔
1393
                return err
×
1394
        }
×
1395

1396
        row := &sqlplugin.ReplicationTaskDLQRow{
2✔
1397
                SourceClusterName: request.SourceClusterName,
2✔
1398
                ShardID:           m.shardID,
2✔
1399
                TaskID:            replicationTask.TaskID,
2✔
1400
                Data:              blob.Data,
2✔
1401
                DataEncoding:      string(blob.Encoding),
2✔
1402
        }
2✔
1403

2✔
1404
        _, err = m.db.InsertIntoReplicationTasksDLQ(ctx, row)
2✔
1405

2✔
1406
        // Tasks are immutable. So it's fine if we already persisted it before.
2✔
1407
        // This can happen when tasks are retried (ack and cleanup can have lag on source side).
2✔
1408
        if err != nil && !m.db.IsDupEntryError(err) {
2✔
1409
                return convertCommonErrors(m.db, "PutReplicationTaskToDLQ", "", err)
×
1410
        }
×
1411

1412
        return nil
2✔
1413
}
1414

1415
func (m *sqlExecutionStore) populateWorkflowMutableState(
1416
        execution sqlplugin.ExecutionsRow,
1417
) (*p.InternalWorkflowMutableState, error) {
494✔
1418

494✔
1419
        info, err := m.parser.WorkflowExecutionInfoFromBlob(execution.Data, execution.DataEncoding)
494✔
1420
        if err != nil {
494✔
1421
                return nil, err
×
1422
        }
×
1423

1424
        state := &p.InternalWorkflowMutableState{}
494✔
1425
        state.ExecutionInfo = serialization.ToInternalWorkflowExecutionInfo(info)
494✔
1426
        state.ExecutionInfo.DomainID = execution.DomainID.String()
494✔
1427
        state.ExecutionInfo.WorkflowID = execution.WorkflowID
494✔
1428
        state.ExecutionInfo.RunID = execution.RunID.String()
494✔
1429
        state.ExecutionInfo.NextEventID = execution.NextEventID
494✔
1430
        // TODO: remove this after all 2DC workflows complete
494✔
1431
        if info.LastWriteEventID != nil {
494✔
1432
                state.ReplicationState = &p.ReplicationState{}
×
1433
                state.ReplicationState.StartVersion = info.GetStartVersion()
×
1434
                state.ReplicationState.LastWriteVersion = execution.LastWriteVersion
×
1435
                state.ReplicationState.LastWriteEventID = info.GetLastWriteEventID()
×
1436
        }
×
1437

1438
        if info.GetVersionHistories() != nil {
988✔
1439
                state.VersionHistories = p.NewDataBlob(
494✔
1440
                        info.GetVersionHistories(),
494✔
1441
                        common.EncodingType(info.GetVersionHistoriesEncoding()),
494✔
1442
                )
494✔
1443
        }
494✔
1444

1445
        return state, nil
494✔
1446
}
1447

1448
func (m *sqlExecutionStore) populateInternalListConcreteExecutions(
1449
        executions []sqlplugin.ExecutionsRow,
1450
) ([]*p.InternalListConcreteExecutionsEntity, error) {
×
1451

×
1452
        concreteExecutions := make([]*p.InternalListConcreteExecutionsEntity, 0, len(executions))
×
1453
        for _, execution := range executions {
×
1454
                mutableState, err := m.populateWorkflowMutableState(execution)
×
1455
                if err != nil {
×
1456
                        return nil, err
×
1457
                }
×
1458

1459
                concreteExecution := &p.InternalListConcreteExecutionsEntity{
×
1460
                        ExecutionInfo:    mutableState.ExecutionInfo,
×
1461
                        VersionHistories: mutableState.VersionHistories,
×
1462
                }
×
1463
                concreteExecutions = append(concreteExecutions, concreteExecution)
×
1464
        }
1465
        return concreteExecutions, nil
×
1466
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc