
uber / cadence — build 01909e15-00cc-4a94-bf01-3784d6403d00 (push, via buildkite, committed by web-flow)

27 Jun 2024 01:09AM UTC — coverage: 71.431% (-0.1%) from 71.557%
Refactor/removing cross cluster feature (#6121)

## What changed?

This mostly* removes the cross-cluster feature.

## Background
The cross-cluster feature let a workflow start and interact with child workflows in another domain: it could launch them and signal them, and the child workflow would be started in the target domain even when that domain was active in another region.

## Problems
Very few customers appear to have needed the feature: hardly any were interested in launching child workflows in another cluster, and none were unable to simply use an activity to make an RPC call to the other domain, as one would from any normal workflow.
The feature was also quite resource-intensive. It was pull-based, spinning up a polling stack that polled the other cluster for work, similar to the replication stack. This polling behaviour made the latency characteristics fairly unpredictable and used considerable DB resources, to the point that we simply turned it off. The Uber/Cadence team concluded that, were there sufficient demand for the feature in the future, a push-based mechanism would probably be significantly preferable.
The feature also added a nontrivial amount of complexity to the codebase, in areas such as task processing and domain error handling, and introduced difficult-to-understand bugs such as the dropped child-workflow error (#5919).

## Decision to deprecate and alternatives
As of the June 2024 releases, the feature is removed. The Cadence team is not aware of any users of the feature outside Uber (it was broken until mid-2021 anyway), but as an FYI, it will cease to be available.

If this behaviour is needed, an easy workaround is the one mentioned above: use an activity to launch or signal the workflows in the other domain and block as needed, as in the sketch below.
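For illustration only (this code is not part of the PR), here is a minimal sketch of that workaround, assuming a `go.uber.org/cadence` `client.Client` has already been constructed against the target domain; the names `CrossDomainStarter`, `RemoteWorkflowType`, the task list and the signal name are all hypothetical.

```go
package sample

import (
	"context"
	"time"

	"go.uber.org/cadence/client"
)

// CrossDomainStarter holds a Cadence client bound to the *other* domain
// (e.g. built with client.NewClient against that domain's frontend).
type CrossDomainStarter struct {
	remoteClient client.Client
}

// StartAndSignalRemoteWorkflow is meant to be registered as an activity.
// A parent workflow schedules this activity instead of relying on the
// removed cross-cluster child-workflow support: the activity starts a
// workflow in the remote domain, signals it, and returns its run ID so
// the parent can block on or poll for the result as needed.
func (s *CrossDomainStarter) StartAndSignalRemoteWorkflow(
	ctx context.Context,
	workflowID string,
	input string,
) (string, error) {
	execution, err := s.remoteClient.StartWorkflow(ctx, client.StartWorkflowOptions{
		ID:                           workflowID,
		TaskList:                     "remote-task-list", // hypothetical task list in the other domain
		ExecutionStartToCloseTimeout: time.Hour,
	}, "RemoteWorkflowType", input)
	if err != nil {
		return "", err
	}
	// Signal the freshly started run, which is what the cross-cluster
	// feature used to do on the caller's behalf.
	if err := s.remoteClient.SignalWorkflow(ctx, workflowID, execution.RunID, "bootstrap-signal", input); err != nil {
		return "", err
	}
	return execution.RunID, nil
}
```

The same client also exposes `SignalWithStartWorkflow`, which combines the two calls; either way the cross-domain interaction becomes an ordinary RPC made from an activity.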

## PR details
This is a fairly high-risk refactor so it'll take some time to ... (continued)

118 of 134 new or added lines in 9 files covered. (88.06%)

330 existing lines in 30 files now uncovered.

104674 of 146539 relevant lines covered (71.43%)

2619.56 hits per line

Source File

/common/persistence/sql/sql_execution_store.go — 79.7% covered
// Copyright (c) 2018 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package sql

import (
        "bytes"
        "context"
        "database/sql"
        "encoding/json"
        "fmt"
        "math"
        "runtime/debug"
        "time"

        "golang.org/x/sync/errgroup"

        "github.com/uber/cadence/common"
        "github.com/uber/cadence/common/collection"
        "github.com/uber/cadence/common/log"
        p "github.com/uber/cadence/common/persistence"
        "github.com/uber/cadence/common/persistence/serialization"
        "github.com/uber/cadence/common/persistence/sql/sqlplugin"
        "github.com/uber/cadence/common/types"
)

const (
        emptyWorkflowID       string = ""
        emptyReplicationRunID string = "30000000-5000-f000-f000-000000000000"
)

type sqlExecutionStore struct {
        sqlStore
        shardID                                int
        txExecuteShardLockedFn                 func(context.Context, int, string, int64, func(sqlplugin.Tx) error) error
        lockCurrentExecutionIfExistsFn         func(context.Context, sqlplugin.Tx, int, serialization.UUID, string) (*sqlplugin.CurrentExecutionsRow, error)
        createOrUpdateCurrentExecutionFn       func(context.Context, sqlplugin.Tx, p.CreateWorkflowMode, int, serialization.UUID, string, serialization.UUID, int, int, string, int64, int64) error
        assertNotCurrentExecutionFn            func(context.Context, sqlplugin.Tx, int, serialization.UUID, string, serialization.UUID) error
        assertRunIDAndUpdateCurrentExecutionFn func(context.Context, sqlplugin.Tx, int, serialization.UUID, string, serialization.UUID, serialization.UUID, string, int, int, int64, int64) error
        applyWorkflowSnapshotTxAsNewFn         func(context.Context, sqlplugin.Tx, int, *p.InternalWorkflowSnapshot, serialization.Parser) error
        applyWorkflowMutationTxFn              func(context.Context, sqlplugin.Tx, int, *p.InternalWorkflowMutation, serialization.Parser) error
        applyWorkflowSnapshotTxAsResetFn       func(context.Context, sqlplugin.Tx, int, *p.InternalWorkflowSnapshot, serialization.Parser) error
}

var _ p.ExecutionStore = (*sqlExecutionStore)(nil)

// NewSQLExecutionStore creates an instance of ExecutionStore
func NewSQLExecutionStore(
        db sqlplugin.DB,
        logger log.Logger,
        shardID int,
        parser serialization.Parser,
        dc *p.DynamicConfiguration,
) (p.ExecutionStore, error) {

        store := &sqlExecutionStore{
                shardID:                                shardID,
                lockCurrentExecutionIfExistsFn:         lockCurrentExecutionIfExists,
                createOrUpdateCurrentExecutionFn:       createOrUpdateCurrentExecution,
                assertNotCurrentExecutionFn:            assertNotCurrentExecution,
                assertRunIDAndUpdateCurrentExecutionFn: assertRunIDAndUpdateCurrentExecution,
                applyWorkflowSnapshotTxAsNewFn:         applyWorkflowSnapshotTxAsNew,
                applyWorkflowMutationTxFn:              applyWorkflowMutationTx,
                applyWorkflowSnapshotTxAsResetFn:       applyWorkflowSnapshotTxAsReset,
                sqlStore: sqlStore{
                        db:     db,
                        logger: logger,
                        parser: parser,
                        dc:     dc,
                },
        }
        store.txExecuteShardLockedFn = store.txExecuteShardLocked
        return store, nil
}

// txExecuteShardLocked executes f under transaction and with read lock on shard row
func (m *sqlExecutionStore) txExecuteShardLocked(
        ctx context.Context,
        dbShardID int,
        operation string,
        rangeID int64,
        fn func(tx sqlplugin.Tx) error,
) error {

        return m.txExecute(ctx, dbShardID, operation, func(tx sqlplugin.Tx) error {
                if err := readLockShard(ctx, tx, m.shardID, rangeID); err != nil {
                        return err
                }
                err := fn(tx)
                if err != nil {
                        return err
                }
                return nil
        })
}

func (m *sqlExecutionStore) GetShardID() int {
        return m.shardID
}

func (m *sqlExecutionStore) CreateWorkflowExecution(
        ctx context.Context,
        request *p.InternalCreateWorkflowExecutionRequest,
) (response *p.CreateWorkflowExecutionResponse, err error) {
        dbShardID := sqlplugin.GetDBShardIDFromHistoryShardID(m.shardID, m.db.GetTotalNumDBShards())

        err = m.txExecuteShardLockedFn(ctx, dbShardID, "CreateWorkflowExecution", request.RangeID, func(tx sqlplugin.Tx) error {
                response, err = m.createWorkflowExecutionTx(ctx, tx, request)
                return err
        })
        return
}

func (m *sqlExecutionStore) createWorkflowExecutionTx(
        ctx context.Context,
        tx sqlplugin.Tx,
        request *p.InternalCreateWorkflowExecutionRequest,
) (*p.CreateWorkflowExecutionResponse, error) {

        newWorkflow := request.NewWorkflowSnapshot
        executionInfo := newWorkflow.ExecutionInfo
        startVersion := newWorkflow.StartVersion
        lastWriteVersion := newWorkflow.LastWriteVersion
        shardID := m.shardID
        domainID := serialization.MustParseUUID(executionInfo.DomainID)
        workflowID := executionInfo.WorkflowID
        runID := serialization.MustParseUUID(executionInfo.RunID)

        if err := p.ValidateCreateWorkflowModeState(
                request.Mode,
                newWorkflow,
        ); err != nil {
                return nil, err
        }

        var err error
        var row *sqlplugin.CurrentExecutionsRow
        if row, err = m.lockCurrentExecutionIfExistsFn(ctx, tx, m.shardID, domainID, workflowID); err != nil {
                return nil, err
        }

        // current workflow record check
        if row != nil {
                // current run ID, last write version, current workflow state check
                switch request.Mode {
                case p.CreateWorkflowModeBrandNew:
                        return nil, &p.WorkflowExecutionAlreadyStartedError{
                                Msg:              fmt.Sprintf("Workflow execution already running. WorkflowId: %v", row.WorkflowID),
                                StartRequestID:   row.CreateRequestID,
                                RunID:            row.RunID.String(),
                                State:            int(row.State),
                                CloseStatus:      int(row.CloseStatus),
                                LastWriteVersion: row.LastWriteVersion,
                        }

                case p.CreateWorkflowModeWorkflowIDReuse:
                        if request.PreviousLastWriteVersion != row.LastWriteVersion {
                                return nil, &p.CurrentWorkflowConditionFailedError{
                                        Msg: fmt.Sprintf("Workflow execution creation condition failed. WorkflowId: %v, "+
                                                "LastWriteVersion: %v, PreviousLastWriteVersion: %v",
                                                workflowID, row.LastWriteVersion, request.PreviousLastWriteVersion),
                                }
                        }
                        if row.State != p.WorkflowStateCompleted {
                                return nil, &p.CurrentWorkflowConditionFailedError{
                                        Msg: fmt.Sprintf("Workflow execution creation condition failed. WorkflowId: %v, "+
                                                "State: %v, Expected: %v",
                                                workflowID, row.State, p.WorkflowStateCompleted),
                                }
                        }
                        runIDStr := row.RunID.String()
                        if runIDStr != request.PreviousRunID {
                                return nil, &p.CurrentWorkflowConditionFailedError{
                                        Msg: fmt.Sprintf("Workflow execution creation condition failed. WorkflowId: %v, "+
                                                "RunID: %v, PreviousRunID: %v",
                                                workflowID, runIDStr, request.PreviousRunID),
                                }
                        }

                case p.CreateWorkflowModeZombie:
                        // zombie workflow creation with existence of current record, this is a noop
                        if err := assertRunIDMismatch(serialization.MustParseUUID(executionInfo.RunID), row.RunID); err != nil {
                                return nil, err
                        }

                case p.CreateWorkflowModeContinueAsNew:
                        runIDStr := row.RunID.String()
                        if runIDStr != request.PreviousRunID {
                                return nil, &p.CurrentWorkflowConditionFailedError{
                                        Msg: fmt.Sprintf("Workflow execution creation condition failed. WorkflowId: %v, "+
                                                "RunID: %v, PreviousRunID: %v",
                                                workflowID, runIDStr, request.PreviousRunID),
                                }
                        }

                default:
                        return nil, &types.InternalServiceError{
                                Message: fmt.Sprintf(
                                        "CreteWorkflowExecution: unknown mode: %v",
                                        request.Mode,
                                ),
                        }
                }
        }

        if err := m.createOrUpdateCurrentExecutionFn(
                ctx,
                tx,
                request.Mode,
                m.shardID,
                domainID,
                workflowID,
                runID,
                executionInfo.State,
                executionInfo.CloseStatus,
                executionInfo.CreateRequestID,
                startVersion,
                lastWriteVersion); err != nil {
                return nil, err
        }

        if err := m.applyWorkflowSnapshotTxAsNewFn(ctx, tx, shardID, &request.NewWorkflowSnapshot, m.parser); err != nil {
                return nil, err
        }

        return &p.CreateWorkflowExecutionResponse{}, nil
}

func (m *sqlExecutionStore) getExecutions(
        ctx context.Context,
        request *p.InternalGetWorkflowExecutionRequest,
        domainID serialization.UUID,
        wfID string,
        runID serialization.UUID,
) ([]sqlplugin.ExecutionsRow, error) {
        executions, err := m.db.SelectFromExecutions(ctx, &sqlplugin.ExecutionsFilter{
                ShardID: m.shardID, DomainID: domainID, WorkflowID: wfID, RunID: runID})

        if err != nil {
                if err == sql.ErrNoRows {
                        return nil, &types.EntityNotExistsError{
                                Message: fmt.Sprintf(
                                        "Workflow execution not found.  WorkflowId: %v, RunId: %v",
                                        request.Execution.GetWorkflowID(),
                                        request.Execution.GetRunID(),
                                ),
                        }
                }
                return nil, convertCommonErrors(m.db, "GetWorkflowExecution", "", err)
        }

        if len(executions) == 0 {
                return nil, &types.EntityNotExistsError{
                        Message: fmt.Sprintf(
                                "Workflow execution not found.  WorkflowId: %v, RunId: %v",
                                request.Execution.GetWorkflowID(),
                                request.Execution.GetRunID(),
                        ),
                }
        }

        if len(executions) != 1 {
                return nil, &types.InternalServiceError{
                        Message: "GetWorkflowExecution return more than one results.",
                }
        }
        return executions, nil
}

func (m *sqlExecutionStore) GetWorkflowExecution(
        ctx context.Context,
        request *p.InternalGetWorkflowExecutionRequest,
) (resp *p.InternalGetWorkflowExecutionResponse, e error) {
        recoverPanic := func(recovered interface{}, err *error) {
                if recovered != nil {
                        *err = fmt.Errorf("DB operation panicked: %v %s", recovered, debug.Stack())
                }
        }

        domainID := serialization.MustParseUUID(request.DomainID)
        runID := serialization.MustParseUUID(request.Execution.RunID)
        wfID := request.Execution.WorkflowID

        var executions []sqlplugin.ExecutionsRow
        var activityInfos map[int64]*p.InternalActivityInfo
        var timerInfos map[string]*p.TimerInfo
        var childExecutionInfos map[int64]*p.InternalChildExecutionInfo
        var requestCancelInfos map[int64]*p.RequestCancelInfo
        var signalInfos map[int64]*p.SignalInfo
        var bufferedEvents []*p.DataBlob
        var signalsRequested map[string]struct{}

        g, childCtx := errgroup.WithContext(ctx)

        g.Go(func() (e error) {
                defer func() { recoverPanic(recover(), &e) }()
                activityInfos, e = getActivityInfoMap(
                        childCtx, m.db, m.shardID, domainID, wfID, runID, m.parser)
                return e
        })

        g.Go(func() (e error) {
                defer func() { recoverPanic(recover(), &e) }()
                timerInfos, e = getTimerInfoMap(
                        childCtx, m.db, m.shardID, domainID, wfID, runID, m.parser)
                return e
        })

        g.Go(func() (e error) {
                defer func() { recoverPanic(recover(), &e) }()
                childExecutionInfos, e = getChildExecutionInfoMap(
                        childCtx, m.db, m.shardID, domainID, wfID, runID, m.parser)
                return e
        })

        g.Go(func() (e error) {
                defer func() { recoverPanic(recover(), &e) }()
                requestCancelInfos, e = getRequestCancelInfoMap(
                        childCtx, m.db, m.shardID, domainID, wfID, runID, m.parser)
                return e
        })

        g.Go(func() (e error) {
                defer func() { recoverPanic(recover(), &e) }()
                signalInfos, e = getSignalInfoMap(
                        childCtx, m.db, m.shardID, domainID, wfID, runID, m.parser)
                return e
        })

        g.Go(func() (e error) {
                defer func() { recoverPanic(recover(), &e) }()
                bufferedEvents, e = getBufferedEvents(
                        childCtx, m.db, m.shardID, domainID, wfID, runID)
                return e
        })

        g.Go(func() (e error) {
                defer func() { recoverPanic(recover(), &e) }()
                signalsRequested, e = getSignalsRequested(
                        childCtx, m.db, m.shardID, domainID, wfID, runID)
                return e
        })

        err := g.Wait()
        if err != nil {
                return nil, err
        }

        // there is a race condition with delete workflow. What could happen is that
        // a delete workflow transaction can be committed between 2 concurrent read operations
        // and in that case we can get checksum error because data is partially read.
        // Since checksum is stored in the executions table, we make it the last step of reading,
        // in this case, either we read full data with checksum or we don't get checksum and return error.
        executions, err = m.getExecutions(ctx, request, domainID, wfID, runID)
        if err != nil {
                return nil, err
        }

        state, err := m.populateWorkflowMutableState(executions[0])
        if err != nil {
                return nil, &types.InternalServiceError{
                        Message: fmt.Sprintf("GetWorkflowExecution: failed. Error: %v", err),
                }
        }
        // if we have checksum, we need to make sure the rangeID did not change
        // if the rangeID changed, it means the shard ownership might have changed
        // and the workflow might have been updated when we read the data, so the data
        // we read might not be from a consistent view, the checksum validation might fail
        // in that case, we clear the checksum data so that we will not perform the validation
        if state.ChecksumData != nil {
                row, err := m.db.SelectFromShards(ctx, &sqlplugin.ShardsFilter{ShardID: int64(m.shardID)})
                if err != nil {
                        return nil, convertCommonErrors(m.db, "GetWorkflowExecution", "", err)
                }
                if row.RangeID != request.RangeID {
                        // The GetWorkflowExecution operation will not be impacted by this. ChecksumData is purely for validation purposes.
                        m.logger.Warn("GetWorkflowExecution's checksum is discarded. The shard might have changed owner.")
                        state.ChecksumData = nil
                }
        }

        state.ActivityInfos = activityInfos
        state.TimerInfos = timerInfos
        state.ChildExecutionInfos = childExecutionInfos
        state.RequestCancelInfos = requestCancelInfos
        state.SignalInfos = signalInfos
        state.BufferedEvents = bufferedEvents
        state.SignalRequestedIDs = signalsRequested

        return &p.InternalGetWorkflowExecutionResponse{State: state}, nil
}

func (m *sqlExecutionStore) UpdateWorkflowExecution(
        ctx context.Context,
        request *p.InternalUpdateWorkflowExecutionRequest,
) error {
        dbShardID := sqlplugin.GetDBShardIDFromHistoryShardID(m.shardID, m.db.GetTotalNumDBShards())
        return m.txExecuteShardLockedFn(ctx, dbShardID, "UpdateWorkflowExecution", request.RangeID, func(tx sqlplugin.Tx) error {
                return m.updateWorkflowExecutionTx(ctx, tx, request)
        })
}

func (m *sqlExecutionStore) updateWorkflowExecutionTx(
        ctx context.Context,
        tx sqlplugin.Tx,
        request *p.InternalUpdateWorkflowExecutionRequest,
) error {

        updateWorkflow := request.UpdateWorkflowMutation
        newWorkflow := request.NewWorkflowSnapshot

        executionInfo := updateWorkflow.ExecutionInfo
        domainID := serialization.MustParseUUID(executionInfo.DomainID)
        workflowID := executionInfo.WorkflowID
        runID := serialization.MustParseUUID(executionInfo.RunID)
        shardID := m.shardID

        if err := p.ValidateUpdateWorkflowModeState(
                request.Mode,
                updateWorkflow,
                newWorkflow,
        ); err != nil {
                return err
        }

        switch request.Mode {
        case p.UpdateWorkflowModeIgnoreCurrent:
                // no-op
        case p.UpdateWorkflowModeBypassCurrent:
                if err := m.assertNotCurrentExecutionFn(
                        ctx,
                        tx,
                        shardID,
                        domainID,
                        workflowID,
                        runID); err != nil {
                        return err
                }

        case p.UpdateWorkflowModeUpdateCurrent:
                if newWorkflow != nil {
                        newExecutionInfo := newWorkflow.ExecutionInfo
                        startVersion := newWorkflow.StartVersion
                        lastWriteVersion := newWorkflow.LastWriteVersion
                        newDomainID := serialization.MustParseUUID(newExecutionInfo.DomainID)
                        newRunID := serialization.MustParseUUID(newExecutionInfo.RunID)

                        if !bytes.Equal(domainID, newDomainID) {
                                return &types.InternalServiceError{
                                        Message: "UpdateWorkflowExecution: cannot continue as new to another domain",
                                }
                        }

                        if err := m.assertRunIDAndUpdateCurrentExecutionFn(
                                ctx,
                                tx,
                                shardID,
                                domainID,
                                workflowID,
                                newRunID,
                                runID,
                                newWorkflow.ExecutionInfo.CreateRequestID,
                                newWorkflow.ExecutionInfo.State,
                                newWorkflow.ExecutionInfo.CloseStatus,
                                startVersion,
                                lastWriteVersion); err != nil {
                                return err
                        }
                } else {
                        startVersion := updateWorkflow.StartVersion
                        lastWriteVersion := updateWorkflow.LastWriteVersion
                        // this is only to update the current record
                        if err := m.assertRunIDAndUpdateCurrentExecutionFn(
                                ctx,
                                tx,
                                shardID,
                                domainID,
                                workflowID,
                                runID,
                                runID,
                                executionInfo.CreateRequestID,
                                executionInfo.State,
                                executionInfo.CloseStatus,
                                startVersion,
                                lastWriteVersion); err != nil {
                                return err
                        }
                }

        default:
                return &types.InternalServiceError{
                        Message: fmt.Sprintf("UpdateWorkflowExecution: unknown mode: %v", request.Mode),
                }
        }

        if err := m.applyWorkflowMutationTxFn(ctx, tx, shardID, &updateWorkflow, m.parser); err != nil {
                return err
        }
        if newWorkflow != nil {
                if err := m.applyWorkflowSnapshotTxAsNewFn(ctx, tx, shardID, newWorkflow, m.parser); err != nil {
                        return err
                }
        }
        return nil
}

func (m *sqlExecutionStore) ConflictResolveWorkflowExecution(
        ctx context.Context,
        request *p.InternalConflictResolveWorkflowExecutionRequest,
) error {
        dbShardID := sqlplugin.GetDBShardIDFromHistoryShardID(m.shardID, m.db.GetTotalNumDBShards())
        return m.txExecuteShardLockedFn(ctx, dbShardID, "ConflictResolveWorkflowExecution", request.RangeID, func(tx sqlplugin.Tx) error {
                return m.conflictResolveWorkflowExecutionTx(ctx, tx, request)
        })
}

func (m *sqlExecutionStore) conflictResolveWorkflowExecutionTx(
        ctx context.Context,
        tx sqlplugin.Tx,
        request *p.InternalConflictResolveWorkflowExecutionRequest,
) error {

        currentWorkflow := request.CurrentWorkflowMutation
        resetWorkflow := request.ResetWorkflowSnapshot
        newWorkflow := request.NewWorkflowSnapshot

        shardID := m.shardID

        domainID := serialization.MustParseUUID(resetWorkflow.ExecutionInfo.DomainID)
        workflowID := resetWorkflow.ExecutionInfo.WorkflowID

        if err := p.ValidateConflictResolveWorkflowModeState(
                request.Mode,
                resetWorkflow,
                newWorkflow,
                currentWorkflow,
        ); err != nil {
                return err
        }

        switch request.Mode {
        case p.ConflictResolveWorkflowModeBypassCurrent:
                if err := m.assertNotCurrentExecutionFn(
                        ctx,
                        tx,
                        shardID,
                        domainID,
                        workflowID,
                        serialization.MustParseUUID(resetWorkflow.ExecutionInfo.RunID)); err != nil {
                        return err
                }

        case p.ConflictResolveWorkflowModeUpdateCurrent:
                executionInfo := resetWorkflow.ExecutionInfo
                startVersion := resetWorkflow.StartVersion
                lastWriteVersion := resetWorkflow.LastWriteVersion
                if newWorkflow != nil {
                        executionInfo = newWorkflow.ExecutionInfo
                        startVersion = newWorkflow.StartVersion
                        lastWriteVersion = newWorkflow.LastWriteVersion
                }
                runID := serialization.MustParseUUID(executionInfo.RunID)
                createRequestID := executionInfo.CreateRequestID
                state := executionInfo.State
                closeStatus := executionInfo.CloseStatus

                if currentWorkflow != nil {
                        prevRunID := serialization.MustParseUUID(currentWorkflow.ExecutionInfo.RunID)

                        if err := m.assertRunIDAndUpdateCurrentExecutionFn(
                                ctx,
                                tx,
                                m.shardID,
                                domainID,
                                workflowID,
                                runID,
                                prevRunID,
                                createRequestID,
                                state,
                                closeStatus,
                                startVersion,
                                lastWriteVersion); err != nil {
                                return err
                        }
                } else {
                        // reset workflow is current
                        prevRunID := serialization.MustParseUUID(resetWorkflow.ExecutionInfo.RunID)

                        if err := m.assertRunIDAndUpdateCurrentExecutionFn(
                                ctx,
                                tx,
                                m.shardID,
                                domainID,
                                workflowID,
                                runID,
                                prevRunID,
                                createRequestID,
                                state,
                                closeStatus,
                                startVersion,
                                lastWriteVersion); err != nil {
                                return err
                        }
                }

        default:
                return &types.InternalServiceError{
                        Message: fmt.Sprintf("ConflictResolveWorkflowExecution: unknown mode: %v", request.Mode),
                }
        }

        if err := m.applyWorkflowSnapshotTxAsResetFn(ctx, tx, shardID, &resetWorkflow, m.parser); err != nil {
                return err
        }
        if currentWorkflow != nil {
                if err := m.applyWorkflowMutationTxFn(ctx, tx, shardID, currentWorkflow, m.parser); err != nil {
                        return err
                }
        }
        if newWorkflow != nil {
                if err := m.applyWorkflowSnapshotTxAsNewFn(ctx, tx, shardID, newWorkflow, m.parser); err != nil {
                        return err
                }
        }
        return nil
}

func (m *sqlExecutionStore) DeleteWorkflowExecution(
        ctx context.Context,
        request *p.DeleteWorkflowExecutionRequest,
) error {
        dbShardID := sqlplugin.GetDBShardIDFromHistoryShardID(m.shardID, m.db.GetTotalNumDBShards())
        domainID := serialization.MustParseUUID(request.DomainID)
        runID := serialization.MustParseUUID(request.RunID)
        wfID := request.WorkflowID
        return m.txExecute(ctx, dbShardID, "DeleteWorkflowExecution", func(tx sqlplugin.Tx) error {
                if _, err := tx.DeleteFromExecutions(ctx, &sqlplugin.ExecutionsFilter{
                        ShardID:    m.shardID,
                        DomainID:   domainID,
                        WorkflowID: wfID,
                        RunID:      runID,
                }); err != nil {
                        return convertCommonErrors(tx, "DeleteWorkflowExecution", "", err)
                }
                if _, err := tx.DeleteFromActivityInfoMaps(ctx, &sqlplugin.ActivityInfoMapsFilter{
                        ShardID:    int64(m.shardID),
                        DomainID:   domainID,
                        WorkflowID: wfID,
                        RunID:      runID,
                }); err != nil {
                        return convertCommonErrors(tx, "DeleteFromActivityInfoMaps", "", err)
                }
                if _, err := tx.DeleteFromTimerInfoMaps(ctx, &sqlplugin.TimerInfoMapsFilter{
                        ShardID:    int64(m.shardID),
                        DomainID:   domainID,
                        WorkflowID: wfID,
                        RunID:      runID,
                }); err != nil {
                        return convertCommonErrors(tx, "DeleteFromTimerInfoMaps", "", err)
                }
                if _, err := tx.DeleteFromChildExecutionInfoMaps(ctx, &sqlplugin.ChildExecutionInfoMapsFilter{
                        ShardID:    int64(m.shardID),
                        DomainID:   domainID,
                        WorkflowID: wfID,
                        RunID:      runID,
                }); err != nil {
                        return convertCommonErrors(tx, "DeleteFromChildExecutionInfoMaps", "", err)
                }
                if _, err := tx.DeleteFromRequestCancelInfoMaps(ctx, &sqlplugin.RequestCancelInfoMapsFilter{
                        ShardID:    int64(m.shardID),
                        DomainID:   domainID,
                        WorkflowID: wfID,
                        RunID:      runID,
                }); err != nil {
                        return convertCommonErrors(tx, "DeleteFromRequestCancelInfoMaps", "", err)
                }
                if _, err := tx.DeleteFromSignalInfoMaps(ctx, &sqlplugin.SignalInfoMapsFilter{
                        ShardID:    int64(m.shardID),
                        DomainID:   domainID,
                        WorkflowID: wfID,
                        RunID:      runID,
                }); err != nil {
                        return convertCommonErrors(tx, "DeleteFromSignalInfoMaps", "", err)
                }
                if _, err := tx.DeleteFromBufferedEvents(ctx, &sqlplugin.BufferedEventsFilter{
                        ShardID:    m.shardID,
                        DomainID:   domainID,
                        WorkflowID: wfID,
                        RunID:      runID,
                }); err != nil {
                        return convertCommonErrors(tx, "DeleteFromBufferedEvents", "", err)
                }
                if _, err := tx.DeleteFromSignalsRequestedSets(ctx, &sqlplugin.SignalsRequestedSetsFilter{
                        ShardID:    int64(m.shardID),
                        DomainID:   domainID,
                        WorkflowID: wfID,
                        RunID:      runID,
                }); err != nil {
                        return convertCommonErrors(tx, "DeleteFromSignalsRequestedSets", "", err)
                }
                return nil
        })
}

// its possible for a new run of the same workflow to have started after the run we are deleting
// here was finished. In that case, current_executions table will have the same workflowID but different
// runID. The following code will delete the row from current_executions if and only if the runID is
// same as the one we are trying to delete here
func (m *sqlExecutionStore) DeleteCurrentWorkflowExecution(
        ctx context.Context,
        request *p.DeleteCurrentWorkflowExecutionRequest,
) error {

        domainID := serialization.MustParseUUID(request.DomainID)
        runID := serialization.MustParseUUID(request.RunID)
        _, err := m.db.DeleteFromCurrentExecutions(ctx, &sqlplugin.CurrentExecutionsFilter{
                ShardID:    int64(m.shardID),
                DomainID:   domainID,
                WorkflowID: request.WorkflowID,
                RunID:      runID,
        })
        if err != nil {
                return convertCommonErrors(m.db, "DeleteCurrentWorkflowExecution", "", err)
        }
        return nil
}

func (m *sqlExecutionStore) GetCurrentExecution(
        ctx context.Context,
        request *p.GetCurrentExecutionRequest,
) (*p.GetCurrentExecutionResponse, error) {

        row, err := m.db.SelectFromCurrentExecutions(ctx, &sqlplugin.CurrentExecutionsFilter{
                ShardID:    int64(m.shardID),
                DomainID:   serialization.MustParseUUID(request.DomainID),
                WorkflowID: request.WorkflowID,
        })
        if err != nil {
                return nil, convertCommonErrors(m.db, "GetCurrentExecution", "", err)
        }
        return &p.GetCurrentExecutionResponse{
                StartRequestID:   row.CreateRequestID,
                RunID:            row.RunID.String(),
                State:            int(row.State),
                CloseStatus:      int(row.CloseStatus),
                LastWriteVersion: row.LastWriteVersion,
        }, nil
}

func (m *sqlExecutionStore) ListCurrentExecutions(
        _ context.Context,
        _ *p.ListCurrentExecutionsRequest,
) (*p.ListCurrentExecutionsResponse, error) {
        return nil, &types.InternalServiceError{Message: "Not yet implemented"}
}

func (m *sqlExecutionStore) IsWorkflowExecutionExists(
        _ context.Context,
        _ *p.IsWorkflowExecutionExistsRequest,
) (*p.IsWorkflowExecutionExistsResponse, error) {
        return nil, &types.InternalServiceError{Message: "Not yet implemented"}
}

func (m *sqlExecutionStore) ListConcreteExecutions(
        ctx context.Context,
        request *p.ListConcreteExecutionsRequest,
) (*p.InternalListConcreteExecutionsResponse, error) {

        filter := &sqlplugin.ExecutionsFilter{}
        if len(request.PageToken) > 0 {
                err := gobDeserialize(request.PageToken, &filter)
                if err != nil {
                        return nil, &types.InternalServiceError{
                                Message: fmt.Sprintf("ListConcreteExecutions failed. Error: %v", err),
                        }
                }
        } else {
                filter = &sqlplugin.ExecutionsFilter{
                        ShardID:    m.shardID,
                        WorkflowID: "",
                }
        }
        filter.Size = request.PageSize

        executions, err := m.db.SelectFromExecutions(ctx, filter)
        if err != nil {
                if err == sql.ErrNoRows {
                        return &p.InternalListConcreteExecutionsResponse{}, nil
                }
                return nil, convertCommonErrors(m.db, "ListConcreteExecutions", "", err)
        }

        if len(executions) == 0 {
                return &p.InternalListConcreteExecutionsResponse{}, nil
        }
        lastExecution := executions[len(executions)-1]
        nextFilter := &sqlplugin.ExecutionsFilter{
                ShardID:    m.shardID,
                WorkflowID: lastExecution.WorkflowID,
        }
        token, err := gobSerialize(nextFilter)
        if err != nil {
                return nil, &types.InternalServiceError{
                        Message: fmt.Sprintf("ListConcreteExecutions failed. Error: %v", err),
                }
        }
        concreteExecutions, err := m.populateInternalListConcreteExecutions(executions)
        if err != nil {
                return nil, &types.InternalServiceError{
                        Message: fmt.Sprintf("ListConcreteExecutions failed. Error: %v", err),
                }
        }

        return &p.InternalListConcreteExecutionsResponse{
                Executions:    concreteExecutions,
                NextPageToken: token,
        }, nil
}

func (m *sqlExecutionStore) GetTransferTasks(
        ctx context.Context,
        request *p.GetTransferTasksRequest,
) (*p.GetTransferTasksResponse, error) {
        minReadLevel := request.ReadLevel
        if len(request.NextPageToken) > 0 {
                readLevel, err := deserializePageToken(request.NextPageToken)
                if err != nil {
                        return nil, convertCommonErrors(m.db, "GetTransferTasks", "failed to deserialize page token", err)
                }
                minReadLevel = readLevel
        }
        rows, err := m.db.SelectFromTransferTasks(ctx, &sqlplugin.TransferTasksFilter{
                ShardID:   m.shardID,
                MinTaskID: minReadLevel,
                MaxTaskID: request.MaxReadLevel,
                PageSize:  request.BatchSize,
        })
        if err != nil {
                if err != sql.ErrNoRows {
                        return nil, convertCommonErrors(m.db, "GetTransferTasks", "", err)
                }
        }
        resp := &p.GetTransferTasksResponse{Tasks: make([]*p.TransferTaskInfo, len(rows))}
        for i, row := range rows {
                info, err := m.parser.TransferTaskInfoFromBlob(row.Data, row.DataEncoding)
                if err != nil {
                        return nil, err
                }
                resp.Tasks[i] = &p.TransferTaskInfo{
                        TaskID:                  row.TaskID,
                        DomainID:                info.DomainID.String(),
                        WorkflowID:              info.GetWorkflowID(),
                        RunID:                   info.RunID.String(),
                        VisibilityTimestamp:     info.GetVisibilityTimestamp(),
                        TargetDomainID:          info.TargetDomainID.String(),
                        TargetDomainIDs:         info.GetTargetDomainIDs(),
                        TargetWorkflowID:        info.GetTargetWorkflowID(),
                        TargetRunID:             info.TargetRunID.String(),
                        TargetChildWorkflowOnly: info.GetTargetChildWorkflowOnly(),
                        TaskList:                info.GetTaskList(),
                        TaskType:                int(info.GetTaskType()),
                        ScheduleID:              info.GetScheduleID(),
                        Version:                 info.GetVersion(),
                }
        }
        if len(rows) > 0 {
                lastTaskID := rows[len(rows)-1].TaskID
                if lastTaskID < request.MaxReadLevel {
                        resp.NextPageToken = serializePageToken(lastTaskID)
                }
        }
        return resp, nil
}

func (m *sqlExecutionStore) CompleteTransferTask(
        ctx context.Context,
        request *p.CompleteTransferTaskRequest,
) error {

        if _, err := m.db.DeleteFromTransferTasks(ctx, &sqlplugin.TransferTasksFilter{
                ShardID: m.shardID,
                TaskID:  request.TaskID,
        }); err != nil {
                return convertCommonErrors(m.db, "CompleteTransferTask", "", err)
        }
        return nil
}

func (m *sqlExecutionStore) RangeCompleteTransferTask(
        ctx context.Context,
        request *p.RangeCompleteTransferTaskRequest,
) (*p.RangeCompleteTransferTaskResponse, error) {
        result, err := m.db.RangeDeleteFromTransferTasks(ctx, &sqlplugin.TransferTasksFilter{
                ShardID:   m.shardID,
                MinTaskID: request.ExclusiveBeginTaskID,
                MaxTaskID: request.InclusiveEndTaskID,
                PageSize:  request.PageSize,
        })
        if err != nil {
                return nil, convertCommonErrors(m.db, "RangeCompleteTransferTask", "", err)
        }
        rowsDeleted, err := result.RowsAffected()
        if err != nil {
                return nil, convertCommonErrors(m.db, "RangeCompleteTransferTask", "", err)
        }
        return &p.RangeCompleteTransferTaskResponse{TasksCompleted: int(rowsDeleted)}, nil
}

func (m *sqlExecutionStore) GetCrossClusterTasks(
        ctx context.Context,
        request *p.GetCrossClusterTasksRequest,
) (*p.GetCrossClusterTasksResponse, error) {
        minReadLevel := request.ReadLevel
        if len(request.NextPageToken) > 0 {
                readLevel, err := deserializePageToken(request.NextPageToken)
                if err != nil {
                        return nil, convertCommonErrors(m.db, "GetCrossClusterTasks", "failed to deserialize page token", err)
                }
                minReadLevel = readLevel
        }
        rows, err := m.db.SelectFromCrossClusterTasks(ctx, &sqlplugin.CrossClusterTasksFilter{
                TargetCluster: request.TargetCluster,
                ShardID:       m.shardID,
                MinTaskID:     minReadLevel,
                MaxTaskID:     request.MaxReadLevel,
                PageSize:      request.BatchSize,
        })
        if err != nil {
                if err != sql.ErrNoRows {
                        return nil, convertCommonErrors(m.db, "GetCrossClusterTasks", "", err)
                }
        }
        resp := &p.GetCrossClusterTasksResponse{Tasks: make([]*p.CrossClusterTaskInfo, len(rows))}
        for i, row := range rows {
                info, err := m.parser.CrossClusterTaskInfoFromBlob(row.Data, row.DataEncoding)
                if err != nil {
                        return nil, err
                }
                resp.Tasks[i] = &p.CrossClusterTaskInfo{
                        TaskID:                  row.TaskID,
                        DomainID:                info.DomainID.String(),
                        WorkflowID:              info.GetWorkflowID(),
                        RunID:                   info.RunID.String(),
                        VisibilityTimestamp:     info.GetVisibilityTimestamp(),
                        TargetDomainID:          info.TargetDomainID.String(),
                        TargetDomainIDs:         info.GetTargetDomainIDs(),
                        TargetWorkflowID:        info.GetTargetWorkflowID(),
                        TargetRunID:             info.TargetRunID.String(),
                        TargetChildWorkflowOnly: info.GetTargetChildWorkflowOnly(),
                        TaskList:                info.GetTaskList(),
                        TaskType:                int(info.GetTaskType()),
                        ScheduleID:              info.GetScheduleID(),
                        Version:                 info.GetVersion(),
                }
        }
        if len(rows) > 0 {
                lastTaskID := rows[len(rows)-1].TaskID
                if lastTaskID < request.MaxReadLevel {
                        resp.NextPageToken = serializePageToken(lastTaskID)
                }
        }
        return resp, nil

}

func (m *sqlExecutionStore) CompleteCrossClusterTask(
        ctx context.Context,
        request *p.CompleteCrossClusterTaskRequest,
) error {
        if _, err := m.db.DeleteFromCrossClusterTasks(ctx, &sqlplugin.CrossClusterTasksFilter{
                TargetCluster: request.TargetCluster,
                ShardID:       m.shardID,
                TaskID:        request.TaskID,
        }); err != nil {
                return convertCommonErrors(m.db, "CompleteCrossClusterTask", "", err)
        }
        return nil
}

func (m *sqlExecutionStore) RangeCompleteCrossClusterTask(
        ctx context.Context,
        request *p.RangeCompleteCrossClusterTaskRequest,
) (*p.RangeCompleteCrossClusterTaskResponse, error) {
        result, err := m.db.RangeDeleteFromCrossClusterTasks(ctx, &sqlplugin.CrossClusterTasksFilter{
                TargetCluster: request.TargetCluster,
                ShardID:       m.shardID,
                MinTaskID:     request.ExclusiveBeginTaskID,
                MaxTaskID:     request.InclusiveEndTaskID,
                PageSize:      request.PageSize,
        })
        if err != nil {
                return nil, convertCommonErrors(m.db, "RangeCompleteCrossClusterTask", "", err)
        }
        rowsDeleted, err := result.RowsAffected()
        if err != nil {
                return nil, convertCommonErrors(m.db, "RangeCompleteCrossClusterTask", "", err)
        }
        return &p.RangeCompleteCrossClusterTaskResponse{TasksCompleted: int(rowsDeleted)}, nil
}
1017

1018
func (m *sqlExecutionStore) GetReplicationTasks(
1019
        ctx context.Context,
1020
        request *p.GetReplicationTasksRequest,
1021
) (*p.InternalGetReplicationTasksResponse, error) {
73✔
1022

73✔
1023
        readLevel, maxReadLevelInclusive, err := getReadLevels(request)
73✔
1024
        if err != nil {
73✔
1025
                return nil, err
×
1026
        }
×
1027

1028
        rows, err := m.db.SelectFromReplicationTasks(
73✔
1029
                ctx,
73✔
1030
                &sqlplugin.ReplicationTasksFilter{
73✔
1031
                        ShardID:   m.shardID,
73✔
1032
                        MinTaskID: readLevel,
73✔
1033
                        MaxTaskID: maxReadLevelInclusive,
73✔
1034
                        PageSize:  request.BatchSize,
73✔
1035
                })
73✔
1036

73✔
1037
        switch err {
73✔
1038
        case nil:
72✔
1039
                return m.populateGetReplicationTasksResponse(rows, request.MaxReadLevel)
72✔
1040
        case sql.ErrNoRows:
×
1041
                return &p.InternalGetReplicationTasksResponse{}, nil
×
1042
        default:
1✔
1043
                return nil, convertCommonErrors(m.db, "GetReplicationTasks", "", err)
1✔
1044
        }
1045
}
1046

1047
func getReadLevels(request *p.GetReplicationTasksRequest) (readLevel int64, maxReadLevelInclusive int64, err error) {
        readLevel = request.ReadLevel
        if len(request.NextPageToken) > 0 {
                readLevel, err = deserializePageToken(request.NextPageToken)
                if err != nil {
                        return 0, 0, err
                }
        }

        maxReadLevelInclusive = collection.MaxInt64(readLevel+int64(request.BatchSize), request.MaxReadLevel)
        return readLevel, maxReadLevelInclusive, nil
}

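// populateGetReplicationTasksResponse decodes the fetched rows into internal
// replication task infos and attaches a next-page token when the last task ID
// is still below the requested max read level.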
func (m *sqlExecutionStore) populateGetReplicationTasksResponse(
        rows []sqlplugin.ReplicationTasksRow,
        requestMaxReadLevel int64,
) (*p.InternalGetReplicationTasksResponse, error) {
        if len(rows) == 0 {
                return &p.InternalGetReplicationTasksResponse{}, nil
        }

        var tasks = make([]*p.InternalReplicationTaskInfo, len(rows))
        for i, row := range rows {
                info, err := m.parser.ReplicationTaskInfoFromBlob(row.Data, row.DataEncoding)
                if err != nil {
                        return nil, err
                }

                tasks[i] = &p.InternalReplicationTaskInfo{
                        TaskID:            row.TaskID,
                        DomainID:          info.DomainID.String(),
                        WorkflowID:        info.GetWorkflowID(),
                        RunID:             info.RunID.String(),
                        TaskType:          int(info.GetTaskType()),
                        FirstEventID:      info.GetFirstEventID(),
                        NextEventID:       info.GetNextEventID(),
                        Version:           info.GetVersion(),
                        ScheduledID:       info.GetScheduledID(),
                        BranchToken:       info.GetBranchToken(),
                        NewRunBranchToken: info.GetNewRunBranchToken(),
                        CreationTime:      info.GetCreationTimestamp(),
                }
        }
        var nextPageToken []byte
        lastTaskID := rows[len(rows)-1].TaskID
        if lastTaskID < requestMaxReadLevel {
                nextPageToken = serializePageToken(lastTaskID)
        }
        return &p.InternalGetReplicationTasksResponse{
                Tasks:         tasks,
                NextPageToken: nextPageToken,
        }, nil
}

func (m *sqlExecutionStore) CompleteReplicationTask(
        ctx context.Context,
        request *p.CompleteReplicationTaskRequest,
) error {

        if _, err := m.db.DeleteFromReplicationTasks(ctx, &sqlplugin.ReplicationTasksFilter{
                ShardID: m.shardID,
                TaskID:  request.TaskID,
        }); err != nil {
                return convertCommonErrors(m.db, "CompleteReplicationTask", "", err)
        }
        return nil
}

func (m *sqlExecutionStore) RangeCompleteReplicationTask(
        ctx context.Context,
        request *p.RangeCompleteReplicationTaskRequest,
) (*p.RangeCompleteReplicationTaskResponse, error) {
        result, err := m.db.RangeDeleteFromReplicationTasks(ctx, &sqlplugin.ReplicationTasksFilter{
                ShardID:            m.shardID,
                InclusiveEndTaskID: request.InclusiveEndTaskID,
                PageSize:           request.PageSize,
        })
        if err != nil {
                return nil, convertCommonErrors(m.db, "RangeCompleteReplicationTask", "", err)
        }
        rowsDeleted, err := result.RowsAffected()
        if err != nil {
                return nil, convertCommonErrors(m.db, "RangeCompleteReplicationTask", "", err)
        }
        return &p.RangeCompleteReplicationTaskResponse{TasksCompleted: int(rowsDeleted)}, nil
}

func (m *sqlExecutionStore) GetReplicationTasksFromDLQ(
        ctx context.Context,
        request *p.GetReplicationTasksFromDLQRequest,
) (*p.InternalGetReplicationTasksFromDLQResponse, error) {

        readLevel, maxReadLevelInclusive, err := getReadLevels(&request.GetReplicationTasksRequest)
        if err != nil {
                return nil, err
        }

        filter := sqlplugin.ReplicationTasksFilter{
                ShardID:   m.shardID,
                MinTaskID: readLevel,
                MaxTaskID: maxReadLevelInclusive,
                PageSize:  request.BatchSize,
        }
        rows, err := m.db.SelectFromReplicationTasksDLQ(ctx, &sqlplugin.ReplicationTasksDLQFilter{
                ReplicationTasksFilter: filter,
                SourceClusterName:      request.SourceClusterName,
        })

        switch err {
        case nil:
                return m.populateGetReplicationTasksResponse(rows, request.MaxReadLevel)
        case sql.ErrNoRows:
                return &p.InternalGetReplicationTasksResponse{}, nil
        default:
                return nil, convertCommonErrors(m.db, "GetReplicationTasksFromDLQ", "", err)
        }
}

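// GetReplicationDLQSize returns the size of this shard's replication DLQ for
// the given source cluster; sql.ErrNoRows is treated as a size of zero.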
func (m *sqlExecutionStore) GetReplicationDLQSize(
        ctx context.Context,
        request *p.GetReplicationDLQSizeRequest,
) (*p.GetReplicationDLQSizeResponse, error) {

        size, err := m.db.SelectFromReplicationDLQ(ctx, &sqlplugin.ReplicationTaskDLQFilter{
                SourceClusterName: request.SourceClusterName,
                ShardID:           m.shardID,
        })

        switch err {
        case nil:
                return &p.GetReplicationDLQSizeResponse{
                        Size: size,
                }, nil
        case sql.ErrNoRows:
                return &p.GetReplicationDLQSizeResponse{
                        Size: 0,
                }, nil
        default:
                return nil, convertCommonErrors(m.db, "GetReplicationDLQSize", "", err)
        }
}

func (m *sqlExecutionStore) DeleteReplicationTaskFromDLQ(
        ctx context.Context,
        request *p.DeleteReplicationTaskFromDLQRequest,
) error {

        filter := sqlplugin.ReplicationTasksFilter{
                ShardID: m.shardID,
                TaskID:  request.TaskID,
        }

        if _, err := m.db.DeleteMessageFromReplicationTasksDLQ(ctx, &sqlplugin.ReplicationTasksDLQFilter{
                ReplicationTasksFilter: filter,
                SourceClusterName:      request.SourceClusterName,
        }); err != nil {
                return convertCommonErrors(m.db, "DeleteReplicationTaskFromDLQ", "", err)
        }
        return nil
}

func (m *sqlExecutionStore) RangeDeleteReplicationTaskFromDLQ(
        ctx context.Context,
        request *p.RangeDeleteReplicationTaskFromDLQRequest,
) (*p.RangeDeleteReplicationTaskFromDLQResponse, error) {
        filter := sqlplugin.ReplicationTasksFilter{
                ShardID:            m.shardID,
                TaskID:             request.ExclusiveBeginTaskID,
                InclusiveEndTaskID: request.InclusiveEndTaskID,
                PageSize:           request.PageSize,
        }
        result, err := m.db.RangeDeleteMessageFromReplicationTasksDLQ(ctx, &sqlplugin.ReplicationTasksDLQFilter{
                ReplicationTasksFilter: filter,
                SourceClusterName:      request.SourceClusterName,
        })
        if err != nil {
                return nil, convertCommonErrors(m.db, "RangeDeleteReplicationTaskFromDLQ", "", err)
        }
        rowsDeleted, err := result.RowsAffected()
        if err != nil {
                return nil, convertCommonErrors(m.db, "RangeDeleteReplicationTaskFromDLQ", "", err)
        }
        return &p.RangeDeleteReplicationTaskFromDLQResponse{TasksCompleted: int(rowsDeleted)}, nil
}

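// CreateFailoverMarkerTasks writes one replication task row per failover
// marker inside a shard-locked transaction and verifies that the number of
// inserted rows matches the number of markers.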
func (m *sqlExecutionStore) CreateFailoverMarkerTasks(
        ctx context.Context,
        request *p.CreateFailoverMarkersRequest,
) error {
        dbShardID := sqlplugin.GetDBShardIDFromHistoryShardID(m.shardID, m.db.GetTotalNumDBShards())
        return m.txExecuteShardLockedFn(ctx, dbShardID, "CreateFailoverMarkerTasks", request.RangeID, func(tx sqlplugin.Tx) error {
                replicationTasksRows := make([]sqlplugin.ReplicationTasksRow, len(request.Markers))
                for i, task := range request.Markers {
                        blob, err := m.parser.ReplicationTaskInfoToBlob(&serialization.ReplicationTaskInfo{
                                DomainID:                serialization.MustParseUUID(task.DomainID),
                                WorkflowID:              emptyWorkflowID,
                                RunID:                   serialization.MustParseUUID(emptyReplicationRunID),
                                TaskType:                int16(task.GetType()),
                                FirstEventID:            common.EmptyEventID,
                                NextEventID:             common.EmptyEventID,
                                Version:                 task.GetVersion(),
                                ScheduledID:             common.EmptyEventID,
                                EventStoreVersion:       p.EventStoreVersion,
                                NewRunEventStoreVersion: p.EventStoreVersion,
                                BranchToken:             nil,
                                NewRunBranchToken:       nil,
                                CreationTimestamp:       task.GetVisibilityTimestamp(),
                        })
                        if err != nil {
                                return err
                        }
                        replicationTasksRows[i].ShardID = m.shardID
                        replicationTasksRows[i].TaskID = task.GetTaskID()
                        replicationTasksRows[i].Data = blob.Data
                        replicationTasksRows[i].DataEncoding = string(blob.Encoding)
                }
                result, err := tx.InsertIntoReplicationTasks(ctx, replicationTasksRows)
                if err != nil {
                        return convertCommonErrors(tx, "CreateFailoverMarkerTasks", "", err)
                }
                rowsAffected, err := result.RowsAffected()
                if err != nil {
                        return &types.InternalServiceError{Message: fmt.Sprintf("CreateFailoverMarkerTasks failed. Could not verify number of rows inserted. Error: %v", err)}
                }
                if int(rowsAffected) != len(replicationTasksRows) {
                        return &types.InternalServiceError{Message: fmt.Sprintf("CreateFailoverMarkerTasks failed. Inserted %v instead of %v rows into replication_tasks.", rowsAffected, len(replicationTasksRows))}
                }
                return nil
        })
}

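// timerTaskPageToken is the JSON-serialized paging cursor for timer task
// scans; it records where the previous page stopped (visibility timestamp
// plus task ID).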
type timerTaskPageToken struct {
        TaskID    int64
        Timestamp time.Time
}

func (t *timerTaskPageToken) serialize() ([]byte, error) {
        return json.Marshal(t)
}

func (t *timerTaskPageToken) deserialize(payload []byte) error {
        return json.Unmarshal(payload, t)
}

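// GetTimerIndexTasks pages through timer tasks in (timestamp, task ID) order.
// It queries BatchSize+1 rows so that an extra row signals another page: when
// present, that row seeds the next page token and the results are trimmed back
// to BatchSize.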
func (m *sqlExecutionStore) GetTimerIndexTasks(
        ctx context.Context,
        request *p.GetTimerIndexTasksRequest,
) (*p.GetTimerIndexTasksResponse, error) {

        pageToken := &timerTaskPageToken{TaskID: math.MinInt64, Timestamp: request.MinTimestamp}
        if len(request.NextPageToken) > 0 {
                if err := pageToken.deserialize(request.NextPageToken); err != nil {
                        return nil, &types.InternalServiceError{
                                Message: fmt.Sprintf("error deserializing timerTaskPageToken: %v", err),
                        }
                }
        }

        rows, err := m.db.SelectFromTimerTasks(ctx, &sqlplugin.TimerTasksFilter{
                ShardID:                m.shardID,
                MinVisibilityTimestamp: pageToken.Timestamp,
                TaskID:                 pageToken.TaskID,
                MaxVisibilityTimestamp: request.MaxTimestamp,
                PageSize:               request.BatchSize + 1,
        })

        if err != nil && err != sql.ErrNoRows {
                return nil, convertCommonErrors(m.db, "GetTimerIndexTasks", "", err)
        }

        resp := &p.GetTimerIndexTasksResponse{Timers: make([]*p.TimerTaskInfo, len(rows))}
        for i, row := range rows {
                info, err := m.parser.TimerTaskInfoFromBlob(row.Data, row.DataEncoding)
                if err != nil {
                        return nil, err
                }
                resp.Timers[i] = &p.TimerTaskInfo{
                        VisibilityTimestamp: row.VisibilityTimestamp,
                        TaskID:              row.TaskID,
                        DomainID:            info.DomainID.String(),
                        WorkflowID:          info.GetWorkflowID(),
                        RunID:               info.RunID.String(),
                        TaskType:            int(info.GetTaskType()),
                        TimeoutType:         int(info.GetTimeoutType()),
                        EventID:             info.GetEventID(),
                        ScheduleAttempt:     info.GetScheduleAttempt(),
                        Version:             info.GetVersion(),
                }
        }

        if len(resp.Timers) > request.BatchSize {
                pageToken = &timerTaskPageToken{
                        TaskID:    resp.Timers[request.BatchSize].TaskID,
                        Timestamp: resp.Timers[request.BatchSize].VisibilityTimestamp,
                }
                resp.Timers = resp.Timers[:request.BatchSize]
                nextToken, err := pageToken.serialize()
                if err != nil {
                        return nil, &types.InternalServiceError{
                                Message: fmt.Sprintf("GetTimerTasks: error serializing page token: %v", err),
                        }
                }
                resp.NextPageToken = nextToken
        }

        return resp, nil
}

func (m *sqlExecutionStore) CompleteTimerTask(
        ctx context.Context,
        request *p.CompleteTimerTaskRequest,
) error {

        if _, err := m.db.DeleteFromTimerTasks(ctx, &sqlplugin.TimerTasksFilter{
                ShardID:             m.shardID,
                VisibilityTimestamp: request.VisibilityTimestamp,
                TaskID:              request.TaskID,
        }); err != nil {
                return convertCommonErrors(m.db, "CompleteTimerTask", "", err)
        }
        return nil
}

func (m *sqlExecutionStore) RangeCompleteTimerTask(
        ctx context.Context,
        request *p.RangeCompleteTimerTaskRequest,
) (*p.RangeCompleteTimerTaskResponse, error) {
        result, err := m.db.RangeDeleteFromTimerTasks(ctx, &sqlplugin.TimerTasksFilter{
                ShardID:                m.shardID,
                MinVisibilityTimestamp: request.InclusiveBeginTimestamp,
                MaxVisibilityTimestamp: request.ExclusiveEndTimestamp,
                PageSize:               request.PageSize,
        })
        if err != nil {
                return nil, convertCommonErrors(m.db, "RangeCompleteTimerTask", "", err)
        }
        rowsDeleted, err := result.RowsAffected()
        if err != nil {
                return nil, convertCommonErrors(m.db, "RangeCompleteTimerTask", "", err)
        }
        return &p.RangeCompleteTimerTaskResponse{TasksCompleted: int(rowsDeleted)}, nil
}

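// PutReplicationTaskToDLQ re-serializes the given replication task and inserts
// it into the DLQ table for the source cluster; duplicate-key errors are
// ignored because tasks are immutable and retries may re-insert the same row.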
func (m *sqlExecutionStore) PutReplicationTaskToDLQ(
        ctx context.Context,
        request *p.InternalPutReplicationTaskToDLQRequest,
) error {
        replicationTask := request.TaskInfo
        blob, err := m.parser.ReplicationTaskInfoToBlob(&serialization.ReplicationTaskInfo{
                DomainID:                serialization.MustParseUUID(replicationTask.DomainID),
                WorkflowID:              replicationTask.WorkflowID,
                RunID:                   serialization.MustParseUUID(replicationTask.RunID),
                TaskType:                int16(replicationTask.TaskType),
                FirstEventID:            replicationTask.FirstEventID,
                NextEventID:             replicationTask.NextEventID,
                Version:                 replicationTask.Version,
                ScheduledID:             replicationTask.ScheduledID,
                EventStoreVersion:       p.EventStoreVersion,
                NewRunEventStoreVersion: p.EventStoreVersion,
                BranchToken:             replicationTask.BranchToken,
                NewRunBranchToken:       replicationTask.NewRunBranchToken,
                CreationTimestamp:       replicationTask.CreationTime,
        })
        if err != nil {
                return err
        }

        row := &sqlplugin.ReplicationTaskDLQRow{
                SourceClusterName: request.SourceClusterName,
                ShardID:           m.shardID,
                TaskID:            replicationTask.TaskID,
                Data:              blob.Data,
                DataEncoding:      string(blob.Encoding),
        }

        _, err = m.db.InsertIntoReplicationTasksDLQ(ctx, row)

        // Tasks are immutable. So it's fine if we already persisted it before.
        // This can happen when tasks are retried (ack and cleanup can have lag on source side).
        if err != nil && !m.db.IsDupEntryError(err) {
                return convertCommonErrors(m.db, "PutReplicationTaskToDLQ", "", err)
        }

        return nil
}

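// populateWorkflowMutableState decodes an executions row into internal mutable
// state, copying the identifying fields from the row and attaching replication
// state (legacy 2DC), version histories and checksum data when present.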
func (m *sqlExecutionStore) populateWorkflowMutableState(
        execution sqlplugin.ExecutionsRow,
) (*p.InternalWorkflowMutableState, error) {

        info, err := m.parser.WorkflowExecutionInfoFromBlob(execution.Data, execution.DataEncoding)
        if err != nil {
                return nil, err
        }

        state := &p.InternalWorkflowMutableState{}
        state.ExecutionInfo = serialization.ToInternalWorkflowExecutionInfo(info)
        state.ExecutionInfo.DomainID = execution.DomainID.String()
        state.ExecutionInfo.WorkflowID = execution.WorkflowID
        state.ExecutionInfo.RunID = execution.RunID.String()
        state.ExecutionInfo.NextEventID = execution.NextEventID
        // TODO: remove this after all 2DC workflows complete
        if info.LastWriteEventID != nil {
                state.ReplicationState = &p.ReplicationState{}
                state.ReplicationState.StartVersion = info.GetStartVersion()
                state.ReplicationState.LastWriteVersion = execution.LastWriteVersion
                state.ReplicationState.LastWriteEventID = info.GetLastWriteEventID()
        }

        if info.GetVersionHistories() != nil {
                state.VersionHistories = p.NewDataBlob(
                        info.GetVersionHistories(),
                        common.EncodingType(info.GetVersionHistoriesEncoding()),
                )
        }

        if info.GetChecksum() != nil {
                state.ChecksumData = p.NewDataBlob(
                        info.GetChecksum(),
                        common.EncodingType(info.GetChecksumEncoding()),
                )
        }

        return state, nil
}

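// populateInternalListConcreteExecutions converts executions rows into list
// entities, keeping only the execution info and version histories from the
// decoded mutable state.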
func (m *sqlExecutionStore) populateInternalListConcreteExecutions(
        executions []sqlplugin.ExecutionsRow,
) ([]*p.InternalListConcreteExecutionsEntity, error) {

        concreteExecutions := make([]*p.InternalListConcreteExecutionsEntity, 0, len(executions))
        for _, execution := range executions {
                mutableState, err := m.populateWorkflowMutableState(execution)
                if err != nil {
                        return nil, err
                }

                concreteExecution := &p.InternalListConcreteExecutionsEntity{
                        ExecutionInfo:    mutableState.ExecutionInfo,
                        VersionHistories: mutableState.VersionHistories,
                }
                concreteExecutions = append(concreteExecutions, concreteExecution)
        }
        return concreteExecutions, nil
}