• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

uber / cadence / 018e5375-7faa-4355-92c2-53b7ff622a40

18 Mar 2024 09:26PM UTC coverage: 64.931% (-0.004%) from 64.935%
018e5375-7faa-4355-92c2-53b7ff622a40

push

buildkite

web-flow
Fix workflow deletion (#5793)

27 of 45 new or added lines in 2 files covered. (60.0%)

59 existing lines in 17 files now uncovered.

94771 of 145956 relevant lines covered (64.93%)

2384.09 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

85.65
/common/persistence/sql/sql_execution_store.go
1
// Copyright (c) 2018 Uber Technologies, Inc.
2
//
3
// Permission is hereby granted, free of charge, to any person obtaining a copy
4
// of this software and associated documentation files (the "Software"), to deal
5
// in the Software without restriction, including without limitation the rights
6
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
7
// copies of the Software, and to permit persons to whom the Software is
8
// furnished to do so, subject to the following conditions:
9
//
10
// The above copyright notice and this permission notice shall be included in
11
// all copies or substantial portions of the Software.
12
//
13
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
14
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
18
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
19
// THE SOFTWARE.
20

21
package sql
22

23
import (
24
        "bytes"
25
        "context"
26
        "database/sql"
27
        "encoding/json"
28
        "fmt"
29
        "math"
30
        "runtime/debug"
31
        "time"
32

33
        "golang.org/x/sync/errgroup"
34

35
        "github.com/uber/cadence/common"
36
        "github.com/uber/cadence/common/collection"
37
        "github.com/uber/cadence/common/log"
38
        p "github.com/uber/cadence/common/persistence"
39
        "github.com/uber/cadence/common/persistence/serialization"
40
        "github.com/uber/cadence/common/persistence/sql/sqlplugin"
41
        "github.com/uber/cadence/common/types"
42
)
43

44
const (
	// emptyWorkflowID is the empty-string workflow ID.
	emptyWorkflowID string = ""
	// emptyReplicationRunID is a fixed, UUID-formatted placeholder run ID.
	// NOTE(review): presumably used where a replication record carries no
	// real run ID — confirm against the callers elsewhere in this package.
	emptyReplicationRunID string = "30000000-5000-f000-f000-000000000000"
)
48

49
type sqlExecutionStore struct {
50
        sqlStore
51
        shardID                                int
52
        txExecuteShardLockedFn                 func(context.Context, int, string, int64, func(sqlplugin.Tx) error) error
53
        lockCurrentExecutionIfExistsFn         func(context.Context, sqlplugin.Tx, int, serialization.UUID, string) (*sqlplugin.CurrentExecutionsRow, error)
54
        createOrUpdateCurrentExecutionFn       func(context.Context, sqlplugin.Tx, p.CreateWorkflowMode, int, serialization.UUID, string, serialization.UUID, int, int, string, int64, int64) error
55
        assertNotCurrentExecutionFn            func(context.Context, sqlplugin.Tx, int, serialization.UUID, string, serialization.UUID) error
56
        assertRunIDAndUpdateCurrentExecutionFn func(context.Context, sqlplugin.Tx, int, serialization.UUID, string, serialization.UUID, serialization.UUID, string, int, int, int64, int64) error
57
        applyWorkflowSnapshotTxAsNewFn         func(context.Context, sqlplugin.Tx, int, *p.InternalWorkflowSnapshot, serialization.Parser) error
58
        applyWorkflowMutationTxFn              func(context.Context, sqlplugin.Tx, int, *p.InternalWorkflowMutation, serialization.Parser) error
59
        applyWorkflowSnapshotTxAsResetFn       func(context.Context, sqlplugin.Tx, int, *p.InternalWorkflowSnapshot, serialization.Parser) error
60
}
61

62
// Compile-time check that *sqlExecutionStore satisfies p.ExecutionStore.
var _ p.ExecutionStore = (*sqlExecutionStore)(nil)
63

64
// NewSQLExecutionStore creates an instance of ExecutionStore
65
func NewSQLExecutionStore(
66
        db sqlplugin.DB,
67
        logger log.Logger,
68
        shardID int,
69
        parser serialization.Parser,
70
        dc *p.DynamicConfiguration,
71
) (p.ExecutionStore, error) {
90✔
72

90✔
73
        store := &sqlExecutionStore{
90✔
74
                shardID:                                shardID,
90✔
75
                lockCurrentExecutionIfExistsFn:         lockCurrentExecutionIfExists,
90✔
76
                createOrUpdateCurrentExecutionFn:       createOrUpdateCurrentExecution,
90✔
77
                assertNotCurrentExecutionFn:            assertNotCurrentExecution,
90✔
78
                assertRunIDAndUpdateCurrentExecutionFn: assertRunIDAndUpdateCurrentExecution,
90✔
79
                applyWorkflowSnapshotTxAsNewFn:         applyWorkflowSnapshotTxAsNew,
90✔
80
                applyWorkflowMutationTxFn:              applyWorkflowMutationTx,
90✔
81
                applyWorkflowSnapshotTxAsResetFn:       applyWorkflowSnapshotTxAsReset,
90✔
82
                sqlStore: sqlStore{
90✔
83
                        db:     db,
90✔
84
                        logger: logger,
90✔
85
                        parser: parser,
90✔
86
                        dc:     dc,
90✔
87
                },
90✔
88
        }
90✔
89
        store.txExecuteShardLockedFn = store.txExecuteShardLocked
90✔
90
        return store, nil
90✔
91
}
90✔
92

93
// txExecuteShardLocked executes f under transaction and with read lock on shard row
94
func (m *sqlExecutionStore) txExecuteShardLocked(
95
        ctx context.Context,
96
        dbShardID int,
97
        operation string,
98
        rangeID int64,
99
        fn func(tx sqlplugin.Tx) error,
100
) error {
3,260✔
101

3,260✔
102
        return m.txExecute(ctx, dbShardID, operation, func(tx sqlplugin.Tx) error {
6,520✔
103
                if err := readLockShard(ctx, tx, m.shardID, rangeID); err != nil {
3,261✔
104
                        return err
1✔
105
                }
1✔
106
                err := fn(tx)
3,259✔
107
                if err != nil {
3,290✔
108
                        return err
31✔
109
                }
31✔
110
                return nil
3,230✔
111
        })
112
}
113

114
func (m *sqlExecutionStore) GetShardID() int {
4,279✔
115
        return m.shardID
4,279✔
116
}
4,279✔
117

118
func (m *sqlExecutionStore) CreateWorkflowExecution(
119
        ctx context.Context,
120
        request *p.InternalCreateWorkflowExecutionRequest,
121
) (response *p.CreateWorkflowExecutionResponse, err error) {
349✔
122
        dbShardID := sqlplugin.GetDBShardIDFromHistoryShardID(m.shardID, m.db.GetTotalNumDBShards())
349✔
123

349✔
124
        err = m.txExecuteShardLockedFn(ctx, dbShardID, "CreateWorkflowExecution", request.RangeID, func(tx sqlplugin.Tx) error {
698✔
125
                response, err = m.createWorkflowExecutionTx(ctx, tx, request)
349✔
126
                return err
349✔
127
        })
349✔
128
        return
349✔
129
}
130

131
func (m *sqlExecutionStore) createWorkflowExecutionTx(
132
        ctx context.Context,
133
        tx sqlplugin.Tx,
134
        request *p.InternalCreateWorkflowExecutionRequest,
135
) (*p.CreateWorkflowExecutionResponse, error) {
349✔
136

349✔
137
        newWorkflow := request.NewWorkflowSnapshot
349✔
138
        executionInfo := newWorkflow.ExecutionInfo
349✔
139
        startVersion := newWorkflow.StartVersion
349✔
140
        lastWriteVersion := newWorkflow.LastWriteVersion
349✔
141
        shardID := m.shardID
349✔
142
        domainID := serialization.MustParseUUID(executionInfo.DomainID)
349✔
143
        workflowID := executionInfo.WorkflowID
349✔
144
        runID := serialization.MustParseUUID(executionInfo.RunID)
349✔
145

349✔
146
        if err := p.ValidateCreateWorkflowModeState(
349✔
147
                request.Mode,
349✔
148
                newWorkflow,
349✔
149
        ); err != nil {
351✔
150
                return nil, err
2✔
151
        }
2✔
152

153
        var err error
347✔
154
        var row *sqlplugin.CurrentExecutionsRow
347✔
155
        if row, err = m.lockCurrentExecutionIfExistsFn(ctx, tx, m.shardID, domainID, workflowID); err != nil {
348✔
156
                return nil, err
1✔
157
        }
1✔
158

159
        // current workflow record check
160
        if row != nil {
403✔
161
                // current run ID, last write version, current workflow state check
57✔
162
                switch request.Mode {
57✔
163
                case p.CreateWorkflowModeBrandNew:
29✔
164
                        return nil, &p.WorkflowExecutionAlreadyStartedError{
29✔
165
                                Msg:              fmt.Sprintf("Workflow execution already running. WorkflowId: %v", row.WorkflowID),
29✔
166
                                StartRequestID:   row.CreateRequestID,
29✔
167
                                RunID:            row.RunID.String(),
29✔
168
                                State:            int(row.State),
29✔
169
                                CloseStatus:      int(row.CloseStatus),
29✔
170
                                LastWriteVersion: row.LastWriteVersion,
29✔
171
                        }
29✔
172

173
                case p.CreateWorkflowModeWorkflowIDReuse:
24✔
174
                        if request.PreviousLastWriteVersion != row.LastWriteVersion {
25✔
175
                                return nil, &p.CurrentWorkflowConditionFailedError{
1✔
176
                                        Msg: fmt.Sprintf("Workflow execution creation condition failed. WorkflowId: %v, "+
1✔
177
                                                "LastWriteVersion: %v, PreviousLastWriteVersion: %v",
1✔
178
                                                workflowID, row.LastWriteVersion, request.PreviousLastWriteVersion),
1✔
179
                                }
1✔
180
                        }
1✔
181
                        if row.State != p.WorkflowStateCompleted {
24✔
182
                                return nil, &p.CurrentWorkflowConditionFailedError{
1✔
183
                                        Msg: fmt.Sprintf("Workflow execution creation condition failed. WorkflowId: %v, "+
1✔
184
                                                "State: %v, Expected: %v",
1✔
185
                                                workflowID, row.State, p.WorkflowStateCompleted),
1✔
186
                                }
1✔
187
                        }
1✔
188
                        runIDStr := row.RunID.String()
22✔
189
                        if runIDStr != request.PreviousRunID {
23✔
190
                                return nil, &p.CurrentWorkflowConditionFailedError{
1✔
191
                                        Msg: fmt.Sprintf("Workflow execution creation condition failed. WorkflowId: %v, "+
1✔
192
                                                "RunID: %v, PreviousRunID: %v",
1✔
193
                                                workflowID, runIDStr, request.PreviousRunID),
1✔
194
                                }
1✔
195
                        }
1✔
196

197
                case p.CreateWorkflowModeZombie:
4✔
198
                        // zombie workflow creation with existence of current record, this is a noop
4✔
199
                        if err := assertRunIDMismatch(serialization.MustParseUUID(executionInfo.RunID), row.RunID); err != nil {
5✔
200
                                return nil, err
1✔
201
                        }
1✔
202

203
                case p.CreateWorkflowModeContinueAsNew:
2✔
204
                        runIDStr := row.RunID.String()
2✔
205
                        if runIDStr != request.PreviousRunID {
2✔
206
                                return nil, &p.CurrentWorkflowConditionFailedError{
×
207
                                        Msg: fmt.Sprintf("Workflow execution creation condition failed. WorkflowId: %v, "+
×
208
                                                "RunID: %v, PreviousRunID: %v",
×
209
                                                workflowID, runIDStr, request.PreviousRunID),
×
210
                                }
×
211
                        }
×
212

213
                default:
×
214
                        return nil, &types.InternalServiceError{
×
215
                                Message: fmt.Sprintf(
×
216
                                        "CreteWorkflowExecution: unknown mode: %v",
×
217
                                        request.Mode,
×
218
                                ),
×
219
                        }
×
220
                }
221
        }
222

223
        if err := m.createOrUpdateCurrentExecutionFn(
313✔
224
                ctx,
313✔
225
                tx,
313✔
226
                request.Mode,
313✔
227
                m.shardID,
313✔
228
                domainID,
313✔
229
                workflowID,
313✔
230
                runID,
313✔
231
                executionInfo.State,
313✔
232
                executionInfo.CloseStatus,
313✔
233
                executionInfo.CreateRequestID,
313✔
234
                startVersion,
313✔
235
                lastWriteVersion); err != nil {
314✔
236
                return nil, err
1✔
237
        }
1✔
238

239
        if err := m.applyWorkflowSnapshotTxAsNewFn(ctx, tx, shardID, &request.NewWorkflowSnapshot, m.parser); err != nil {
315✔
240
                return nil, err
3✔
241
        }
3✔
242

243
        return &p.CreateWorkflowExecutionResponse{}, nil
311✔
244
}
245

246
func (m *sqlExecutionStore) getExecutions(
247
        ctx context.Context,
248
        request *p.InternalGetWorkflowExecutionRequest,
249
        domainID serialization.UUID,
250
        wfID string,
251
        runID serialization.UUID,
252
) ([]sqlplugin.ExecutionsRow, error) {
805✔
253
        executions, err := m.db.SelectFromExecutions(ctx, &sqlplugin.ExecutionsFilter{
805✔
254
                ShardID: m.shardID, DomainID: domainID, WorkflowID: wfID, RunID: runID})
805✔
255

805✔
256
        if err != nil {
1,125✔
257
                if err == sql.ErrNoRows {
639✔
258
                        return nil, &types.EntityNotExistsError{
319✔
259
                                Message: fmt.Sprintf(
319✔
260
                                        "Workflow execution not found.  WorkflowId: %v, RunId: %v",
319✔
261
                                        request.Execution.GetWorkflowID(),
319✔
262
                                        request.Execution.GetRunID(),
319✔
263
                                ),
319✔
264
                        }
319✔
265
                }
319✔
266
                return nil, convertCommonErrors(m.db, "GetWorkflowExecution", "", err)
1✔
267
        }
268

269
        if len(executions) == 0 {
487✔
270
                return nil, &types.EntityNotExistsError{
×
271
                        Message: fmt.Sprintf(
×
272
                                "Workflow execution not found.  WorkflowId: %v, RunId: %v",
×
273
                                request.Execution.GetWorkflowID(),
×
274
                                request.Execution.GetRunID(),
×
275
                        ),
×
276
                }
×
277
        }
×
278

279
        if len(executions) != 1 {
487✔
280
                return nil, &types.InternalServiceError{
×
281
                        Message: "GetWorkflowExecution return more than one results.",
×
282
                }
×
283
        }
×
284
        return executions, nil
487✔
285
}
286

287
func (m *sqlExecutionStore) GetWorkflowExecution(
288
        ctx context.Context,
289
        request *p.InternalGetWorkflowExecutionRequest,
290
) (resp *p.InternalGetWorkflowExecutionResponse, e error) {
805✔
291
        recoverPanic := func(recovered interface{}, err *error) {
7,231✔
292
                if recovered != nil {
6,426✔
293
                        *err = fmt.Errorf("DB operation panicked: %v %s", recovered, debug.Stack())
×
294
                }
×
295
        }
296

297
        domainID := serialization.MustParseUUID(request.DomainID)
805✔
298
        runID := serialization.MustParseUUID(request.Execution.RunID)
805✔
299
        wfID := request.Execution.WorkflowID
805✔
300

805✔
301
        var executions []sqlplugin.ExecutionsRow
805✔
302
        var activityInfos map[int64]*p.InternalActivityInfo
805✔
303
        var timerInfos map[string]*p.TimerInfo
805✔
304
        var childExecutionInfos map[int64]*p.InternalChildExecutionInfo
805✔
305
        var requestCancelInfos map[int64]*p.RequestCancelInfo
805✔
306
        var signalInfos map[int64]*p.SignalInfo
805✔
307
        var bufferedEvents []*p.DataBlob
805✔
308
        var signalsRequested map[string]struct{}
805✔
309

805✔
310
        g, ctx := errgroup.WithContext(ctx)
805✔
311

805✔
312
        g.Go(func() (e error) {
1,610✔
313
                defer func() { recoverPanic(recover(), &e) }()
1,610✔
314
                executions, e = m.getExecutions(ctx, request, domainID, wfID, runID)
805✔
315
                return e
805✔
316
        })
317

318
        g.Go(func() (e error) {
1,610✔
319
                defer func() { recoverPanic(recover(), &e) }()
1,610✔
320
                activityInfos, e = getActivityInfoMap(
805✔
321
                        ctx, m.db, m.shardID, domainID, wfID, runID, m.parser)
805✔
322
                return e
805✔
323
        })
324

325
        g.Go(func() (e error) {
1,610✔
326
                defer func() { recoverPanic(recover(), &e) }()
1,610✔
327
                timerInfos, e = getTimerInfoMap(
805✔
328
                        ctx, m.db, m.shardID, domainID, wfID, runID, m.parser)
805✔
329
                return e
805✔
330
        })
331

332
        g.Go(func() (e error) {
1,610✔
333
                defer func() { recoverPanic(recover(), &e) }()
1,610✔
334
                childExecutionInfos, e = getChildExecutionInfoMap(
805✔
335
                        ctx, m.db, m.shardID, domainID, wfID, runID, m.parser)
805✔
336
                return e
805✔
337
        })
338

339
        g.Go(func() (e error) {
1,610✔
340
                defer func() { recoverPanic(recover(), &e) }()
1,610✔
341
                requestCancelInfos, e = getRequestCancelInfoMap(
805✔
342
                        ctx, m.db, m.shardID, domainID, wfID, runID, m.parser)
805✔
343
                return e
805✔
344
        })
345

346
        g.Go(func() (e error) {
1,610✔
347
                defer func() { recoverPanic(recover(), &e) }()
1,610✔
348
                signalInfos, e = getSignalInfoMap(
805✔
349
                        ctx, m.db, m.shardID, domainID, wfID, runID, m.parser)
805✔
350
                return e
805✔
351
        })
352

353
        g.Go(func() (e error) {
1,610✔
354
                defer func() { recoverPanic(recover(), &e) }()
1,610✔
355
                bufferedEvents, e = getBufferedEvents(
805✔
356
                        ctx, m.db, m.shardID, domainID, wfID, runID)
805✔
357
                return e
805✔
358
        })
359

360
        g.Go(func() (e error) {
1,610✔
361
                defer func() { recoverPanic(recover(), &e) }()
1,610✔
362
                signalsRequested, e = getSignalsRequested(
805✔
363
                        ctx, m.db, m.shardID, domainID, wfID, runID)
805✔
364
                return e
805✔
365
        })
366

367
        err := g.Wait()
805✔
368
        if err != nil {
1,125✔
369
                return nil, err
320✔
370
        }
320✔
371

372
        state, err := m.populateWorkflowMutableState(executions[0])
487✔
373
        if err != nil {
487✔
374
                return nil, &types.InternalServiceError{
×
375
                        Message: fmt.Sprintf("GetWorkflowExecution: failed. Error: %v", err),
×
376
                }
×
377
        }
×
378
        state.ActivityInfos = activityInfos
487✔
379
        state.TimerInfos = timerInfos
487✔
380
        state.ChildExecutionInfos = childExecutionInfos
487✔
381
        state.RequestCancelInfos = requestCancelInfos
487✔
382
        state.SignalInfos = signalInfos
487✔
383
        state.BufferedEvents = bufferedEvents
487✔
384
        state.SignalRequestedIDs = signalsRequested
487✔
385

487✔
386
        return &p.InternalGetWorkflowExecutionResponse{State: state}, nil
487✔
387
}
388

389
func (m *sqlExecutionStore) UpdateWorkflowExecution(
390
        ctx context.Context,
391
        request *p.InternalUpdateWorkflowExecutionRequest,
392
) error {
2,933✔
393
        dbShardID := sqlplugin.GetDBShardIDFromHistoryShardID(m.shardID, m.db.GetTotalNumDBShards())
2,933✔
394
        return m.txExecuteShardLockedFn(ctx, dbShardID, "UpdateWorkflowExecution", request.RangeID, func(tx sqlplugin.Tx) error {
5,866✔
395
                return m.updateWorkflowExecutionTx(ctx, tx, request)
2,933✔
396
        })
2,933✔
397
}
398

399
func (m *sqlExecutionStore) updateWorkflowExecutionTx(
400
        ctx context.Context,
401
        tx sqlplugin.Tx,
402
        request *p.InternalUpdateWorkflowExecutionRequest,
403
) error {
2,933✔
404

2,933✔
405
        updateWorkflow := request.UpdateWorkflowMutation
2,933✔
406
        newWorkflow := request.NewWorkflowSnapshot
2,933✔
407

2,933✔
408
        executionInfo := updateWorkflow.ExecutionInfo
2,933✔
409
        domainID := serialization.MustParseUUID(executionInfo.DomainID)
2,933✔
410
        workflowID := executionInfo.WorkflowID
2,933✔
411
        runID := serialization.MustParseUUID(executionInfo.RunID)
2,933✔
412
        shardID := m.shardID
2,933✔
413

2,933✔
414
        if err := p.ValidateUpdateWorkflowModeState(
2,933✔
415
                request.Mode,
2,933✔
416
                updateWorkflow,
2,933✔
417
                newWorkflow,
2,933✔
418
        ); err != nil {
2,934✔
419
                return err
1✔
420
        }
1✔
421

422
        switch request.Mode {
2,932✔
423
        case p.UpdateWorkflowModeIgnoreCurrent:
1✔
424
                // no-op
425
        case p.UpdateWorkflowModeBypassCurrent:
4✔
426
                if err := m.assertNotCurrentExecutionFn(
4✔
427
                        ctx,
4✔
428
                        tx,
4✔
429
                        shardID,
4✔
430
                        domainID,
4✔
431
                        workflowID,
4✔
432
                        runID); err != nil {
5✔
433
                        return err
1✔
434
                }
1✔
435

436
        case p.UpdateWorkflowModeUpdateCurrent:
2,929✔
437
                if newWorkflow != nil {
3,048✔
438
                        newExecutionInfo := newWorkflow.ExecutionInfo
119✔
439
                        startVersion := newWorkflow.StartVersion
119✔
440
                        lastWriteVersion := newWorkflow.LastWriteVersion
119✔
441
                        newDomainID := serialization.MustParseUUID(newExecutionInfo.DomainID)
119✔
442
                        newRunID := serialization.MustParseUUID(newExecutionInfo.RunID)
119✔
443

119✔
444
                        if !bytes.Equal(domainID, newDomainID) {
120✔
445
                                return &types.InternalServiceError{
1✔
446
                                        Message: "UpdateWorkflowExecution: cannot continue as new to another domain",
1✔
447
                                }
1✔
448
                        }
1✔
449

450
                        if err := m.assertRunIDAndUpdateCurrentExecutionFn(
118✔
451
                                ctx,
118✔
452
                                tx,
118✔
453
                                shardID,
118✔
454
                                domainID,
118✔
455
                                workflowID,
118✔
456
                                newRunID,
118✔
457
                                runID,
118✔
458
                                newWorkflow.ExecutionInfo.CreateRequestID,
118✔
459
                                newWorkflow.ExecutionInfo.State,
118✔
460
                                newWorkflow.ExecutionInfo.CloseStatus,
118✔
461
                                startVersion,
118✔
462
                                lastWriteVersion); err != nil {
118✔
463
                                return err
×
464
                        }
×
465
                } else {
2,812✔
466
                        startVersion := updateWorkflow.StartVersion
2,812✔
467
                        lastWriteVersion := updateWorkflow.LastWriteVersion
2,812✔
468
                        // this is only to update the current record
2,812✔
469
                        if err := m.assertRunIDAndUpdateCurrentExecutionFn(
2,812✔
470
                                ctx,
2,812✔
471
                                tx,
2,812✔
472
                                shardID,
2,812✔
473
                                domainID,
2,812✔
474
                                workflowID,
2,812✔
475
                                runID,
2,812✔
476
                                runID,
2,812✔
477
                                executionInfo.CreateRequestID,
2,812✔
478
                                executionInfo.State,
2,812✔
479
                                executionInfo.CloseStatus,
2,812✔
480
                                startVersion,
2,812✔
481
                                lastWriteVersion); err != nil {
2,813✔
482
                                return err
1✔
483
                        }
1✔
484
                }
485

486
        default:
×
487
                return &types.InternalServiceError{
×
488
                        Message: fmt.Sprintf("UpdateWorkflowExecution: unknown mode: %v", request.Mode),
×
489
                }
×
490
        }
491

492
        if err := m.applyWorkflowMutationTxFn(ctx, tx, shardID, &updateWorkflow, m.parser); err != nil {
2,930✔
493
                return err
1✔
494
        }
1✔
495
        if newWorkflow != nil {
3,046✔
496
                if err := m.applyWorkflowSnapshotTxAsNewFn(ctx, tx, shardID, newWorkflow, m.parser); err != nil {
119✔
497
                        return err
1✔
498
                }
1✔
499
        }
500
        return nil
2,927✔
501
}
502

503
func (m *sqlExecutionStore) ConflictResolveWorkflowExecution(
504
        ctx context.Context,
505
        request *p.InternalConflictResolveWorkflowExecutionRequest,
506
) error {
11✔
507
        dbShardID := sqlplugin.GetDBShardIDFromHistoryShardID(m.shardID, m.db.GetTotalNumDBShards())
11✔
508
        return m.txExecuteShardLockedFn(ctx, dbShardID, "ConflictResolveWorkflowExecution", request.RangeID, func(tx sqlplugin.Tx) error {
22✔
509
                return m.conflictResolveWorkflowExecutionTx(ctx, tx, request)
11✔
510
        })
11✔
511
}
512

513
func (m *sqlExecutionStore) conflictResolveWorkflowExecutionTx(
514
        ctx context.Context,
515
        tx sqlplugin.Tx,
516
        request *p.InternalConflictResolveWorkflowExecutionRequest,
517
) error {
11✔
518

11✔
519
        currentWorkflow := request.CurrentWorkflowMutation
11✔
520
        resetWorkflow := request.ResetWorkflowSnapshot
11✔
521
        newWorkflow := request.NewWorkflowSnapshot
11✔
522

11✔
523
        shardID := m.shardID
11✔
524

11✔
525
        domainID := serialization.MustParseUUID(resetWorkflow.ExecutionInfo.DomainID)
11✔
526
        workflowID := resetWorkflow.ExecutionInfo.WorkflowID
11✔
527

11✔
528
        if err := p.ValidateConflictResolveWorkflowModeState(
11✔
529
                request.Mode,
11✔
530
                resetWorkflow,
11✔
531
                newWorkflow,
11✔
532
                currentWorkflow,
11✔
533
        ); err != nil {
12✔
534
                return err
1✔
535
        }
1✔
536

537
        switch request.Mode {
10✔
538
        case p.ConflictResolveWorkflowModeBypassCurrent:
4✔
539
                if err := m.assertNotCurrentExecutionFn(
4✔
540
                        ctx,
4✔
541
                        tx,
4✔
542
                        shardID,
4✔
543
                        domainID,
4✔
544
                        workflowID,
4✔
545
                        serialization.MustParseUUID(resetWorkflow.ExecutionInfo.RunID)); err != nil {
5✔
546
                        return err
1✔
547
                }
1✔
548

549
        case p.ConflictResolveWorkflowModeUpdateCurrent:
7✔
550
                executionInfo := resetWorkflow.ExecutionInfo
7✔
551
                startVersion := resetWorkflow.StartVersion
7✔
552
                lastWriteVersion := resetWorkflow.LastWriteVersion
7✔
553
                if newWorkflow != nil {
12✔
554
                        executionInfo = newWorkflow.ExecutionInfo
5✔
555
                        startVersion = newWorkflow.StartVersion
5✔
556
                        lastWriteVersion = newWorkflow.LastWriteVersion
5✔
557
                }
5✔
558
                runID := serialization.MustParseUUID(executionInfo.RunID)
7✔
559
                createRequestID := executionInfo.CreateRequestID
7✔
560
                state := executionInfo.State
7✔
561
                closeStatus := executionInfo.CloseStatus
7✔
562

7✔
563
                if currentWorkflow != nil {
11✔
564
                        prevRunID := serialization.MustParseUUID(currentWorkflow.ExecutionInfo.RunID)
4✔
565

4✔
566
                        if err := m.assertRunIDAndUpdateCurrentExecutionFn(
4✔
567
                                ctx,
4✔
568
                                tx,
4✔
569
                                m.shardID,
4✔
570
                                domainID,
4✔
571
                                workflowID,
4✔
572
                                runID,
4✔
573
                                prevRunID,
4✔
574
                                createRequestID,
4✔
575
                                state,
4✔
576
                                closeStatus,
4✔
577
                                startVersion,
4✔
578
                                lastWriteVersion); err != nil {
5✔
579
                                return err
1✔
580
                        }
1✔
581
                } else {
3✔
582
                        // reset workflow is current
3✔
583
                        prevRunID := serialization.MustParseUUID(resetWorkflow.ExecutionInfo.RunID)
3✔
584

3✔
585
                        if err := m.assertRunIDAndUpdateCurrentExecutionFn(
3✔
586
                                ctx,
3✔
587
                                tx,
3✔
588
                                m.shardID,
3✔
589
                                domainID,
3✔
590
                                workflowID,
3✔
591
                                runID,
3✔
592
                                prevRunID,
3✔
593
                                createRequestID,
3✔
594
                                state,
3✔
595
                                closeStatus,
3✔
596
                                startVersion,
3✔
597
                                lastWriteVersion); err != nil {
3✔
598
                                return err
×
599
                        }
×
600
                }
601

602
        default:
×
603
                return &types.InternalServiceError{
×
604
                        Message: fmt.Sprintf("ConflictResolveWorkflowExecution: unknown mode: %v", request.Mode),
×
605
                }
×
606
        }
607

608
        if err := m.applyWorkflowSnapshotTxAsResetFn(ctx, tx, shardID, &resetWorkflow, m.parser); err != nil {
9✔
609
                return err
1✔
610
        }
1✔
611
        if currentWorkflow != nil {
10✔
612
                if err := m.applyWorkflowMutationTxFn(ctx, tx, shardID, currentWorkflow, m.parser); err != nil {
4✔
613
                        return err
1✔
614
                }
1✔
615
        }
616
        if newWorkflow != nil {
9✔
617
                if err := m.applyWorkflowSnapshotTxAsNewFn(ctx, tx, shardID, newWorkflow, m.parser); err != nil {
4✔
618
                        return err
1✔
619
                }
1✔
620
        }
621
        return nil
5✔
622
}
623

624
func (m *sqlExecutionStore) DeleteWorkflowExecution(
625
        ctx context.Context,
626
        request *p.DeleteWorkflowExecutionRequest,
627
) error {
38✔
628
        dbShardID := sqlplugin.GetDBShardIDFromHistoryShardID(m.shardID, m.db.GetTotalNumDBShards())
38✔
629
        domainID := serialization.MustParseUUID(request.DomainID)
38✔
630
        runID := serialization.MustParseUUID(request.RunID)
38✔
631
        wfID := request.WorkflowID
38✔
632
        return m.txExecute(ctx, dbShardID, "DeleteWorkflowExecution", func(tx sqlplugin.Tx) error {
76✔
633
                if _, err := tx.DeleteFromExecutions(ctx, &sqlplugin.ExecutionsFilter{
38✔
634
                        ShardID:    m.shardID,
38✔
635
                        DomainID:   domainID,
38✔
636
                        WorkflowID: wfID,
38✔
637
                        RunID:      runID,
38✔
638
                }); err != nil {
39✔
639
                        return convertCommonErrors(tx, "DeleteWorkflowExecution", "", err)
1✔
640
                }
1✔
641
                if _, err := tx.DeleteFromActivityInfoMaps(ctx, &sqlplugin.ActivityInfoMapsFilter{
37✔
642
                        ShardID:    int64(m.shardID),
37✔
643
                        DomainID:   domainID,
37✔
644
                        WorkflowID: wfID,
37✔
645
                        RunID:      runID,
37✔
646
                }); err != nil {
37✔
NEW
647
                        return convertCommonErrors(tx, "DeleteFromActivityInfoMaps", "", err)
×
NEW
648
                }
×
649
                if _, err := tx.DeleteFromTimerInfoMaps(ctx, &sqlplugin.TimerInfoMapsFilter{
37✔
650
                        ShardID:    int64(m.shardID),
37✔
651
                        DomainID:   domainID,
37✔
652
                        WorkflowID: wfID,
37✔
653
                        RunID:      runID,
37✔
654
                }); err != nil {
37✔
NEW
655
                        return convertCommonErrors(tx, "DeleteFromTimerInfoMaps", "", err)
×
NEW
656
                }
×
657
                if _, err := tx.DeleteFromChildExecutionInfoMaps(ctx, &sqlplugin.ChildExecutionInfoMapsFilter{
37✔
658
                        ShardID:    int64(m.shardID),
37✔
659
                        DomainID:   domainID,
37✔
660
                        WorkflowID: wfID,
37✔
661
                        RunID:      runID,
37✔
662
                }); err != nil {
37✔
NEW
663
                        return convertCommonErrors(tx, "DeleteFromChildExecutionInfoMaps", "", err)
×
NEW
664
                }
×
665
                if _, err := tx.DeleteFromRequestCancelInfoMaps(ctx, &sqlplugin.RequestCancelInfoMapsFilter{
37✔
666
                        ShardID:    int64(m.shardID),
37✔
667
                        DomainID:   domainID,
37✔
668
                        WorkflowID: wfID,
37✔
669
                        RunID:      runID,
37✔
670
                }); err != nil {
37✔
NEW
671
                        return convertCommonErrors(tx, "DeleteFromRequestCancelInfoMaps", "", err)
×
NEW
672
                }
×
673
                if _, err := tx.DeleteFromSignalInfoMaps(ctx, &sqlplugin.SignalInfoMapsFilter{
37✔
674
                        ShardID:    int64(m.shardID),
37✔
675
                        DomainID:   domainID,
37✔
676
                        WorkflowID: wfID,
37✔
677
                        RunID:      runID,
37✔
678
                }); err != nil {
37✔
NEW
679
                        return convertCommonErrors(tx, "DeleteFromSignalInfoMaps", "", err)
×
NEW
680
                }
×
681
                if _, err := tx.DeleteFromBufferedEvents(ctx, &sqlplugin.BufferedEventsFilter{
37✔
682
                        ShardID:    m.shardID,
37✔
683
                        DomainID:   domainID,
37✔
684
                        WorkflowID: wfID,
37✔
685
                        RunID:      runID,
37✔
686
                }); err != nil {
37✔
NEW
687
                        return convertCommonErrors(tx, "DeleteFromBufferedEvents", "", err)
×
NEW
688
                }
×
689
                if _, err := tx.DeleteFromSignalsRequestedSets(ctx, &sqlplugin.SignalsRequestedSetsFilter{
37✔
690
                        ShardID:    int64(m.shardID),
37✔
691
                        DomainID:   domainID,
37✔
692
                        WorkflowID: wfID,
37✔
693
                        RunID:      runID,
37✔
694
                }); err != nil {
37✔
NEW
695
                        return convertCommonErrors(tx, "DeleteFromSignalsRequestedSets", "", err)
×
NEW
696
                }
×
697
                return nil
37✔
698
        })
699
}
700

701
// its possible for a new run of the same workflow to have started after the run we are deleting
702
// here was finished. In that case, current_executions table will have the same workflowID but different
703
// runID. The following code will delete the row from current_executions if and only if the runID is
704
// same as the one we are trying to delete here
705
func (m *sqlExecutionStore) DeleteCurrentWorkflowExecution(
706
        ctx context.Context,
707
        request *p.DeleteCurrentWorkflowExecutionRequest,
708
) error {
38✔
709

38✔
710
        domainID := serialization.MustParseUUID(request.DomainID)
38✔
711
        runID := serialization.MustParseUUID(request.RunID)
38✔
712
        _, err := m.db.DeleteFromCurrentExecutions(ctx, &sqlplugin.CurrentExecutionsFilter{
38✔
713
                ShardID:    int64(m.shardID),
38✔
714
                DomainID:   domainID,
38✔
715
                WorkflowID: request.WorkflowID,
38✔
716
                RunID:      runID,
38✔
717
        })
38✔
718
        if err != nil {
39✔
719
                return convertCommonErrors(m.db, "DeleteCurrentWorkflowExecution", "", err)
1✔
720
        }
1✔
721
        return nil
37✔
722
}
723

724
func (m *sqlExecutionStore) GetCurrentExecution(
725
        ctx context.Context,
726
        request *p.GetCurrentExecutionRequest,
727
) (*p.GetCurrentExecutionResponse, error) {
124✔
728

124✔
729
        row, err := m.db.SelectFromCurrentExecutions(ctx, &sqlplugin.CurrentExecutionsFilter{
124✔
730
                ShardID:    int64(m.shardID),
124✔
731
                DomainID:   serialization.MustParseUUID(request.DomainID),
124✔
732
                WorkflowID: request.WorkflowID,
124✔
733
        })
124✔
734
        if err != nil {
133✔
735
                return nil, convertCommonErrors(m.db, "GetCurrentExecution", "", err)
9✔
736
        }
9✔
737
        return &p.GetCurrentExecutionResponse{
117✔
738
                StartRequestID:   row.CreateRequestID,
117✔
739
                RunID:            row.RunID.String(),
117✔
740
                State:            int(row.State),
117✔
741
                CloseStatus:      int(row.CloseStatus),
117✔
742
                LastWriteVersion: row.LastWriteVersion,
117✔
743
        }, nil
117✔
744
}
745

746
func (m *sqlExecutionStore) ListCurrentExecutions(
747
        _ context.Context,
748
        _ *p.ListCurrentExecutionsRequest,
749
) (*p.ListCurrentExecutionsResponse, error) {
×
750
        return nil, &types.InternalServiceError{Message: "Not yet implemented"}
×
751
}
×
752

753
func (m *sqlExecutionStore) IsWorkflowExecutionExists(
754
        _ context.Context,
755
        _ *p.IsWorkflowExecutionExistsRequest,
756
) (*p.IsWorkflowExecutionExistsResponse, error) {
×
757
        return nil, &types.InternalServiceError{Message: "Not yet implemented"}
×
758
}
×
759

760
func (m *sqlExecutionStore) ListConcreteExecutions(
761
        ctx context.Context,
762
        request *p.ListConcreteExecutionsRequest,
763
) (*p.InternalListConcreteExecutionsResponse, error) {
×
764

×
765
        filter := &sqlplugin.ExecutionsFilter{}
×
766
        if len(request.PageToken) > 0 {
×
767
                err := gobDeserialize(request.PageToken, &filter)
×
768
                if err != nil {
×
769
                        return nil, &types.InternalServiceError{
×
770
                                Message: fmt.Sprintf("ListConcreteExecutions failed. Error: %v", err),
×
771
                        }
×
772
                }
×
773
        } else {
×
774
                filter = &sqlplugin.ExecutionsFilter{
×
775
                        ShardID:    m.shardID,
×
776
                        WorkflowID: "",
×
777
                }
×
778
        }
×
779
        filter.Size = request.PageSize
×
780

×
781
        executions, err := m.db.SelectFromExecutions(ctx, filter)
×
782
        if err != nil {
×
783
                if err == sql.ErrNoRows {
×
784
                        return &p.InternalListConcreteExecutionsResponse{}, nil
×
785
                }
×
786
                return nil, convertCommonErrors(m.db, "ListConcreteExecutions", "", err)
×
787
        }
788

789
        if len(executions) == 0 {
×
790
                return &p.InternalListConcreteExecutionsResponse{}, nil
×
791
        }
×
792
        lastExecution := executions[len(executions)-1]
×
793
        nextFilter := &sqlplugin.ExecutionsFilter{
×
794
                ShardID:    m.shardID,
×
795
                WorkflowID: lastExecution.WorkflowID,
×
796
        }
×
797
        token, err := gobSerialize(nextFilter)
×
798
        if err != nil {
×
799
                return nil, &types.InternalServiceError{
×
800
                        Message: fmt.Sprintf("ListConcreteExecutions failed. Error: %v", err),
×
801
                }
×
802
        }
×
803
        concreteExecutions, err := m.populateInternalListConcreteExecutions(executions)
×
804
        if err != nil {
×
805
                return nil, &types.InternalServiceError{
×
806
                        Message: fmt.Sprintf("ListConcreteExecutions failed. Error: %v", err),
×
807
                }
×
808
        }
×
809

810
        return &p.InternalListConcreteExecutionsResponse{
×
811
                Executions:    concreteExecutions,
×
812
                NextPageToken: token,
×
813
        }, nil
×
814
}
815

816
func (m *sqlExecutionStore) GetTransferTasks(
817
        ctx context.Context,
818
        request *p.GetTransferTasksRequest,
819
) (*p.GetTransferTasksResponse, error) {
1,514✔
820
        minReadLevel := request.ReadLevel
1,514✔
821
        if len(request.NextPageToken) > 0 {
1,517✔
822
                readLevel, err := deserializePageToken(request.NextPageToken)
3✔
823
                if err != nil {
3✔
824
                        return nil, convertCommonErrors(m.db, "GetTransferTasks", "failed to deserialize page token", err)
×
825
                }
×
826
                minReadLevel = readLevel
3✔
827
        }
828
        rows, err := m.db.SelectFromTransferTasks(ctx, &sqlplugin.TransferTasksFilter{
1,514✔
829
                ShardID:   m.shardID,
1,514✔
830
                MinTaskID: minReadLevel,
1,514✔
831
                MaxTaskID: request.MaxReadLevel,
1,514✔
832
                PageSize:  request.BatchSize,
1,514✔
833
        })
1,514✔
834
        if err != nil {
1,515✔
835
                if err != sql.ErrNoRows {
2✔
836
                        return nil, convertCommonErrors(m.db, "GetTransferTasks", "", err)
1✔
837
                }
1✔
838
        }
839
        resp := &p.GetTransferTasksResponse{Tasks: make([]*p.TransferTaskInfo, len(rows))}
1,513✔
840
        for i, row := range rows {
4,934✔
841
                info, err := m.parser.TransferTaskInfoFromBlob(row.Data, row.DataEncoding)
3,421✔
842
                if err != nil {
3,422✔
843
                        return nil, err
1✔
844
                }
1✔
845
                resp.Tasks[i] = &p.TransferTaskInfo{
3,420✔
846
                        TaskID:                  row.TaskID,
3,420✔
847
                        DomainID:                info.DomainID.String(),
3,420✔
848
                        WorkflowID:              info.GetWorkflowID(),
3,420✔
849
                        RunID:                   info.RunID.String(),
3,420✔
850
                        VisibilityTimestamp:     info.GetVisibilityTimestamp(),
3,420✔
851
                        TargetDomainID:          info.TargetDomainID.String(),
3,420✔
852
                        TargetDomainIDs:         info.GetTargetDomainIDs(),
3,420✔
853
                        TargetWorkflowID:        info.GetTargetWorkflowID(),
3,420✔
854
                        TargetRunID:             info.TargetRunID.String(),
3,420✔
855
                        TargetChildWorkflowOnly: info.GetTargetChildWorkflowOnly(),
3,420✔
856
                        TaskList:                info.GetTaskList(),
3,420✔
857
                        TaskType:                int(info.GetTaskType()),
3,420✔
858
                        ScheduleID:              info.GetScheduleID(),
3,420✔
859
                        Version:                 info.GetVersion(),
3,420✔
860
                }
3,420✔
861
        }
862
        if len(rows) > 0 {
2,960✔
863
                lastTaskID := rows[len(rows)-1].TaskID
1,448✔
864
                if lastTaskID < request.MaxReadLevel {
1,452✔
865
                        resp.NextPageToken = serializePageToken(lastTaskID)
4✔
866
                }
4✔
867
        }
868
        return resp, nil
1,512✔
869
}
870

871
func (m *sqlExecutionStore) CompleteTransferTask(
872
        ctx context.Context,
873
        request *p.CompleteTransferTaskRequest,
874
) error {
2✔
875

2✔
876
        if _, err := m.db.DeleteFromTransferTasks(ctx, &sqlplugin.TransferTasksFilter{
2✔
877
                ShardID: m.shardID,
2✔
878
                TaskID:  request.TaskID,
2✔
879
        }); err != nil {
3✔
880
                return convertCommonErrors(m.db, "CompleteTransferTask", "", err)
1✔
881
        }
1✔
882
        return nil
1✔
883
}
884

885
func (m *sqlExecutionStore) RangeCompleteTransferTask(
886
        ctx context.Context,
887
        request *p.RangeCompleteTransferTaskRequest,
888
) (*p.RangeCompleteTransferTaskResponse, error) {
59✔
889
        result, err := m.db.RangeDeleteFromTransferTasks(ctx, &sqlplugin.TransferTasksFilter{
59✔
890
                ShardID:   m.shardID,
59✔
891
                MinTaskID: request.ExclusiveBeginTaskID,
59✔
892
                MaxTaskID: request.InclusiveEndTaskID,
59✔
893
                PageSize:  request.PageSize,
59✔
894
        })
59✔
895
        if err != nil {
60✔
896
                return nil, convertCommonErrors(m.db, "RangeCompleteTransferTask", "", err)
1✔
897
        }
1✔
898
        rowsDeleted, err := result.RowsAffected()
58✔
899
        if err != nil {
58✔
900
                return nil, convertCommonErrors(m.db, "RangeCompleteTransferTask", "", err)
×
901
        }
×
902
        return &p.RangeCompleteTransferTaskResponse{TasksCompleted: int(rowsDeleted)}, nil
58✔
903
}
904

905
func (m *sqlExecutionStore) GetCrossClusterTasks(
906
        ctx context.Context,
907
        request *p.GetCrossClusterTasksRequest,
908
) (*p.GetCrossClusterTasksResponse, error) {
102✔
909
        minReadLevel := request.ReadLevel
102✔
910
        if len(request.NextPageToken) > 0 {
105✔
911
                readLevel, err := deserializePageToken(request.NextPageToken)
3✔
912
                if err != nil {
3✔
913
                        return nil, convertCommonErrors(m.db, "GetCrossClusterTasks", "failed to deserialize page token", err)
×
914
                }
×
915
                minReadLevel = readLevel
3✔
916
        }
917
        rows, err := m.db.SelectFromCrossClusterTasks(ctx, &sqlplugin.CrossClusterTasksFilter{
102✔
918
                TargetCluster: request.TargetCluster,
102✔
919
                ShardID:       m.shardID,
102✔
920
                MinTaskID:     minReadLevel,
102✔
921
                MaxTaskID:     request.MaxReadLevel,
102✔
922
                PageSize:      request.BatchSize,
102✔
923
        })
102✔
924
        if err != nil {
103✔
925
                if err != sql.ErrNoRows {
2✔
926
                        return nil, convertCommonErrors(m.db, "GetCrossClusterTasks", "", err)
1✔
927
                }
1✔
928
        }
929
        resp := &p.GetCrossClusterTasksResponse{Tasks: make([]*p.CrossClusterTaskInfo, len(rows))}
101✔
930
        for i, row := range rows {
103✔
931
                info, err := m.parser.CrossClusterTaskInfoFromBlob(row.Data, row.DataEncoding)
2✔
932
                if err != nil {
3✔
933
                        return nil, err
1✔
934
                }
1✔
935
                resp.Tasks[i] = &p.CrossClusterTaskInfo{
1✔
936
                        TaskID:                  row.TaskID,
1✔
937
                        DomainID:                info.DomainID.String(),
1✔
938
                        WorkflowID:              info.GetWorkflowID(),
1✔
939
                        RunID:                   info.RunID.String(),
1✔
940
                        VisibilityTimestamp:     info.GetVisibilityTimestamp(),
1✔
941
                        TargetDomainID:          info.TargetDomainID.String(),
1✔
942
                        TargetDomainIDs:         info.GetTargetDomainIDs(),
1✔
943
                        TargetWorkflowID:        info.GetTargetWorkflowID(),
1✔
944
                        TargetRunID:             info.TargetRunID.String(),
1✔
945
                        TargetChildWorkflowOnly: info.GetTargetChildWorkflowOnly(),
1✔
946
                        TaskList:                info.GetTaskList(),
1✔
947
                        TaskType:                int(info.GetTaskType()),
1✔
948
                        ScheduleID:              info.GetScheduleID(),
1✔
949
                        Version:                 info.GetVersion(),
1✔
950
                }
1✔
951
        }
952
        if len(rows) > 0 {
101✔
953
                lastTaskID := rows[len(rows)-1].TaskID
1✔
954
                if lastTaskID < request.MaxReadLevel {
2✔
955
                        resp.NextPageToken = serializePageToken(lastTaskID)
1✔
956
                }
1✔
957
        }
958
        return resp, nil
100✔
959

960
}
961

962
func (m *sqlExecutionStore) CompleteCrossClusterTask(
963
        ctx context.Context,
964
        request *p.CompleteCrossClusterTaskRequest,
965
) error {
2✔
966
        if _, err := m.db.DeleteFromCrossClusterTasks(ctx, &sqlplugin.CrossClusterTasksFilter{
2✔
967
                TargetCluster: request.TargetCluster,
2✔
968
                ShardID:       m.shardID,
2✔
969
                TaskID:        request.TaskID,
2✔
970
        }); err != nil {
3✔
971
                return convertCommonErrors(m.db, "CompleteCrossClusterTask", "", err)
1✔
972
        }
1✔
973
        return nil
1✔
974
}
975

976
func (m *sqlExecutionStore) RangeCompleteCrossClusterTask(
977
        ctx context.Context,
978
        request *p.RangeCompleteCrossClusterTaskRequest,
979
) (*p.RangeCompleteCrossClusterTaskResponse, error) {
84✔
980
        result, err := m.db.RangeDeleteFromCrossClusterTasks(ctx, &sqlplugin.CrossClusterTasksFilter{
84✔
981
                TargetCluster: request.TargetCluster,
84✔
982
                ShardID:       m.shardID,
84✔
983
                MinTaskID:     request.ExclusiveBeginTaskID,
84✔
984
                MaxTaskID:     request.InclusiveEndTaskID,
84✔
985
                PageSize:      request.PageSize,
84✔
986
        })
84✔
987
        if err != nil {
85✔
988
                return nil, convertCommonErrors(m.db, "RangeCompleteCrossClusterTask", "", err)
1✔
989
        }
1✔
990
        rowsDeleted, err := result.RowsAffected()
83✔
991
        if err != nil {
83✔
992
                return nil, convertCommonErrors(m.db, "RangeCompleteCrossClusterTask", "", err)
×
993
        }
×
994
        return &p.RangeCompleteCrossClusterTaskResponse{TasksCompleted: int(rowsDeleted)}, nil
83✔
995
}
996

997
func (m *sqlExecutionStore) GetReplicationTasks(
998
        ctx context.Context,
999
        request *p.GetReplicationTasksRequest,
1000
) (*p.InternalGetReplicationTasksResponse, error) {
63✔
1001

63✔
1002
        readLevel, maxReadLevelInclusive, err := getReadLevels(request)
63✔
1003
        if err != nil {
63✔
1004
                return nil, err
×
1005
        }
×
1006

1007
        rows, err := m.db.SelectFromReplicationTasks(
63✔
1008
                ctx,
63✔
1009
                &sqlplugin.ReplicationTasksFilter{
63✔
1010
                        ShardID:   m.shardID,
63✔
1011
                        MinTaskID: readLevel,
63✔
1012
                        MaxTaskID: maxReadLevelInclusive,
63✔
1013
                        PageSize:  request.BatchSize,
63✔
1014
                })
63✔
1015

63✔
1016
        switch err {
63✔
1017
        case nil:
62✔
1018
                return m.populateGetReplicationTasksResponse(rows, request.MaxReadLevel)
62✔
1019
        case sql.ErrNoRows:
×
1020
                return &p.InternalGetReplicationTasksResponse{}, nil
×
1021
        default:
1✔
1022
                return nil, convertCommonErrors(m.db, "GetReplicationTasks", "", err)
1✔
1023
        }
1024
}
1025

1026
func getReadLevels(request *p.GetReplicationTasksRequest) (readLevel int64, maxReadLevelInclusive int64, err error) {
68✔
1027
        readLevel = request.ReadLevel
68✔
1028
        if len(request.NextPageToken) > 0 {
76✔
1029
                readLevel, err = deserializePageToken(request.NextPageToken)
8✔
1030
                if err != nil {
8✔
1031
                        return 0, 0, err
×
1032
                }
×
1033
        }
1034

1035
        maxReadLevelInclusive = collection.MaxInt64(readLevel+int64(request.BatchSize), request.MaxReadLevel)
68✔
1036
        return readLevel, maxReadLevelInclusive, nil
68✔
1037
}
1038

1039
func (m *sqlExecutionStore) populateGetReplicationTasksResponse(
1040
        rows []sqlplugin.ReplicationTasksRow,
1041
        requestMaxReadLevel int64,
1042
) (*p.InternalGetReplicationTasksResponse, error) {
66✔
1043
        if len(rows) == 0 {
128✔
1044
                return &p.InternalGetReplicationTasksResponse{}, nil
62✔
1045
        }
62✔
1046

1047
        var tasks = make([]*p.InternalReplicationTaskInfo, len(rows))
6✔
1048
        for i, row := range rows {
12✔
1049
                info, err := m.parser.ReplicationTaskInfoFromBlob(row.Data, row.DataEncoding)
6✔
1050
                if err != nil {
8✔
1051
                        return nil, err
2✔
1052
                }
2✔
1053

1054
                tasks[i] = &p.InternalReplicationTaskInfo{
4✔
1055
                        TaskID:            row.TaskID,
4✔
1056
                        DomainID:          info.DomainID.String(),
4✔
1057
                        WorkflowID:        info.GetWorkflowID(),
4✔
1058
                        RunID:             info.RunID.String(),
4✔
1059
                        TaskType:          int(info.GetTaskType()),
4✔
1060
                        FirstEventID:      info.GetFirstEventID(),
4✔
1061
                        NextEventID:       info.GetNextEventID(),
4✔
1062
                        Version:           info.GetVersion(),
4✔
1063
                        ScheduledID:       info.GetScheduledID(),
4✔
1064
                        BranchToken:       info.GetBranchToken(),
4✔
1065
                        NewRunBranchToken: info.GetNewRunBranchToken(),
4✔
1066
                        CreationTime:      info.GetCreationTimestamp(),
4✔
1067
                }
4✔
1068
        }
1069
        var nextPageToken []byte
4✔
1070
        lastTaskID := rows[len(rows)-1].TaskID
4✔
1071
        if lastTaskID < requestMaxReadLevel {
8✔
1072
                nextPageToken = serializePageToken(lastTaskID)
4✔
1073
        }
4✔
1074
        return &p.InternalGetReplicationTasksResponse{
4✔
1075
                Tasks:         tasks,
4✔
1076
                NextPageToken: nextPageToken,
4✔
1077
        }, nil
4✔
1078
}
1079

1080
func (m *sqlExecutionStore) CompleteReplicationTask(
1081
        ctx context.Context,
1082
        request *p.CompleteReplicationTaskRequest,
1083
) error {
2✔
1084

2✔
1085
        if _, err := m.db.DeleteFromReplicationTasks(ctx, &sqlplugin.ReplicationTasksFilter{
2✔
1086
                ShardID: m.shardID,
2✔
1087
                TaskID:  request.TaskID,
2✔
1088
        }); err != nil {
3✔
1089
                return convertCommonErrors(m.db, "CompleteReplicationTask", "", err)
1✔
1090
        }
1✔
1091
        return nil
1✔
1092
}
1093

1094
func (m *sqlExecutionStore) RangeCompleteReplicationTask(
1095
        ctx context.Context,
1096
        request *p.RangeCompleteReplicationTaskRequest,
1097
) (*p.RangeCompleteReplicationTaskResponse, error) {
67✔
1098
        result, err := m.db.RangeDeleteFromReplicationTasks(ctx, &sqlplugin.ReplicationTasksFilter{
67✔
1099
                ShardID:            m.shardID,
67✔
1100
                InclusiveEndTaskID: request.InclusiveEndTaskID,
67✔
1101
                PageSize:           request.PageSize,
67✔
1102
        })
67✔
1103
        if err != nil {
68✔
1104
                return nil, convertCommonErrors(m.db, "RangeCompleteReplicationTask", "", err)
1✔
1105
        }
1✔
1106
        rowsDeleted, err := result.RowsAffected()
66✔
1107
        if err != nil {
66✔
1108
                return nil, convertCommonErrors(m.db, "RangeCompleteReplicationTask", "", err)
×
1109
        }
×
1110
        return &p.RangeCompleteReplicationTaskResponse{TasksCompleted: int(rowsDeleted)}, nil
66✔
1111
}
1112

1113
func (m *sqlExecutionStore) GetReplicationTasksFromDLQ(
1114
        ctx context.Context,
1115
        request *p.GetReplicationTasksFromDLQRequest,
1116
) (*p.InternalGetReplicationTasksFromDLQResponse, error) {
5✔
1117

5✔
1118
        readLevel, maxReadLevelInclusive, err := getReadLevels(&request.GetReplicationTasksRequest)
5✔
1119
        if err != nil {
5✔
1120
                return nil, err
×
1121
        }
×
1122

1123
        filter := sqlplugin.ReplicationTasksFilter{
5✔
1124
                ShardID:   m.shardID,
5✔
1125
                MinTaskID: readLevel,
5✔
1126
                MaxTaskID: maxReadLevelInclusive,
5✔
1127
                PageSize:  request.BatchSize,
5✔
1128
        }
5✔
1129
        rows, err := m.db.SelectFromReplicationTasksDLQ(ctx, &sqlplugin.ReplicationTasksDLQFilter{
5✔
1130
                ReplicationTasksFilter: filter,
5✔
1131
                SourceClusterName:      request.SourceClusterName,
5✔
1132
        })
5✔
1133

5✔
1134
        switch err {
5✔
1135
        case nil:
4✔
1136
                return m.populateGetReplicationTasksResponse(rows, request.MaxReadLevel)
4✔
1137
        case sql.ErrNoRows:
×
1138
                return &p.InternalGetReplicationTasksResponse{}, nil
×
1139
        default:
1✔
1140
                return nil, convertCommonErrors(m.db, "GetReplicationTasksFromDLQ", "", err)
1✔
1141
        }
1142
}
1143

1144
func (m *sqlExecutionStore) GetReplicationDLQSize(
1145
        ctx context.Context,
1146
        request *p.GetReplicationDLQSizeRequest,
1147
) (*p.GetReplicationDLQSizeResponse, error) {
11✔
1148

11✔
1149
        size, err := m.db.SelectFromReplicationDLQ(ctx, &sqlplugin.ReplicationTaskDLQFilter{
11✔
1150
                SourceClusterName: request.SourceClusterName,
11✔
1151
                ShardID:           m.shardID,
11✔
1152
        })
11✔
1153

11✔
1154
        switch err {
11✔
1155
        case nil:
9✔
1156
                return &p.GetReplicationDLQSizeResponse{
9✔
1157
                        Size: size,
9✔
1158
                }, nil
9✔
1159
        case sql.ErrNoRows:
1✔
1160
                return &p.GetReplicationDLQSizeResponse{
1✔
1161
                        Size: 0,
1✔
1162
                }, nil
1✔
1163
        default:
1✔
1164
                return nil, convertCommonErrors(m.db, "GetReplicationDLQSize", "", err)
1✔
1165
        }
1166
}
1167

1168
func (m *sqlExecutionStore) DeleteReplicationTaskFromDLQ(
1169
        ctx context.Context,
1170
        request *p.DeleteReplicationTaskFromDLQRequest,
1171
) error {
2✔
1172

2✔
1173
        filter := sqlplugin.ReplicationTasksFilter{
2✔
1174
                ShardID: m.shardID,
2✔
1175
                TaskID:  request.TaskID,
2✔
1176
        }
2✔
1177

2✔
1178
        if _, err := m.db.DeleteMessageFromReplicationTasksDLQ(ctx, &sqlplugin.ReplicationTasksDLQFilter{
2✔
1179
                ReplicationTasksFilter: filter,
2✔
1180
                SourceClusterName:      request.SourceClusterName,
2✔
1181
        }); err != nil {
3✔
1182
                return convertCommonErrors(m.db, "DeleteReplicationTaskFromDLQ", "", err)
1✔
1183
        }
1✔
1184
        return nil
1✔
1185
}
1186

1187
func (m *sqlExecutionStore) RangeDeleteReplicationTaskFromDLQ(
1188
        ctx context.Context,
1189
        request *p.RangeDeleteReplicationTaskFromDLQRequest,
1190
) (*p.RangeDeleteReplicationTaskFromDLQResponse, error) {
2✔
1191
        filter := sqlplugin.ReplicationTasksFilter{
2✔
1192
                ShardID:            m.shardID,
2✔
1193
                TaskID:             request.ExclusiveBeginTaskID,
2✔
1194
                InclusiveEndTaskID: request.InclusiveEndTaskID,
2✔
1195
                PageSize:           request.PageSize,
2✔
1196
        }
2✔
1197
        result, err := m.db.RangeDeleteMessageFromReplicationTasksDLQ(ctx, &sqlplugin.ReplicationTasksDLQFilter{
2✔
1198
                ReplicationTasksFilter: filter,
2✔
1199
                SourceClusterName:      request.SourceClusterName,
2✔
1200
        })
2✔
1201
        if err != nil {
3✔
1202
                return nil, convertCommonErrors(m.db, "RangeDeleteReplicationTaskFromDLQ", "", err)
1✔
1203
        }
1✔
1204
        rowsDeleted, err := result.RowsAffected()
1✔
1205
        if err != nil {
1✔
1206
                return nil, convertCommonErrors(m.db, "RangeDeleteReplicationTaskFromDLQ", "", err)
×
1207
        }
×
1208
        return &p.RangeDeleteReplicationTaskFromDLQResponse{TasksCompleted: int(rowsDeleted)}, nil
1✔
1209
}
1210

1211
func (m *sqlExecutionStore) CreateFailoverMarkerTasks(
1212
        ctx context.Context,
1213
        request *p.CreateFailoverMarkersRequest,
1214
) error {
5✔
1215
        dbShardID := sqlplugin.GetDBShardIDFromHistoryShardID(m.shardID, m.db.GetTotalNumDBShards())
5✔
1216
        return m.txExecuteShardLockedFn(ctx, dbShardID, "CreateFailoverMarkerTasks", request.RangeID, func(tx sqlplugin.Tx) error {
10✔
1217
                replicationTasksRows := make([]sqlplugin.ReplicationTasksRow, len(request.Markers))
5✔
1218
                for i, task := range request.Markers {
10✔
1219
                        blob, err := m.parser.ReplicationTaskInfoToBlob(&serialization.ReplicationTaskInfo{
5✔
1220
                                DomainID:                serialization.MustParseUUID(task.DomainID),
5✔
1221
                                WorkflowID:              emptyWorkflowID,
5✔
1222
                                RunID:                   serialization.MustParseUUID(emptyReplicationRunID),
5✔
1223
                                TaskType:                int16(task.GetType()),
5✔
1224
                                FirstEventID:            common.EmptyEventID,
5✔
1225
                                NextEventID:             common.EmptyEventID,
5✔
1226
                                Version:                 task.GetVersion(),
5✔
1227
                                ScheduledID:             common.EmptyEventID,
5✔
1228
                                EventStoreVersion:       p.EventStoreVersion,
5✔
1229
                                NewRunEventStoreVersion: p.EventStoreVersion,
5✔
1230
                                BranchToken:             nil,
5✔
1231
                                NewRunBranchToken:       nil,
5✔
1232
                                CreationTimestamp:       task.GetVisibilityTimestamp(),
5✔
1233
                        })
5✔
1234
                        if err != nil {
6✔
1235
                                return err
1✔
1236
                        }
1✔
1237
                        replicationTasksRows[i].ShardID = m.shardID
4✔
1238
                        replicationTasksRows[i].TaskID = task.GetTaskID()
4✔
1239
                        replicationTasksRows[i].Data = blob.Data
4✔
1240
                        replicationTasksRows[i].DataEncoding = string(blob.Encoding)
4✔
1241
                }
1242
                result, err := tx.InsertIntoReplicationTasks(ctx, replicationTasksRows)
4✔
1243
                if err != nil {
5✔
1244
                        return convertCommonErrors(tx, "CreateFailoverMarkerTasks", "", err)
1✔
1245
                }
1✔
1246
                rowsAffected, err := result.RowsAffected()
3✔
1247
                if err != nil {
4✔
1248
                        return &types.InternalServiceError{Message: fmt.Sprintf("CreateFailoverMarkerTasks failed. Could not verify number of rows inserted. Error: %v", err)}
1✔
1249
                }
1✔
1250
                if int(rowsAffected) != len(replicationTasksRows) {
3✔
1251
                        return &types.InternalServiceError{Message: fmt.Sprintf("CreateFailoverMarkerTasks failed. Inserted %v instead of %v rows into replication_tasks.", rowsAffected, len(replicationTasksRows))}
1✔
1252
                }
1✔
1253
                return nil
1✔
1254
        })
1255
}
1256

1257
// timerTaskPageToken is the pagination cursor used when listing timer tasks.
// It records the task ID and timestamp at which the next page should resume
// and is exchanged with callers as a JSON-encoded byte slice.
type timerTaskPageToken struct {
	// TaskID is the task ID from which the next page starts.
	TaskID int64
	// Timestamp is the timestamp from which the next page starts.
	Timestamp time.Time
}

// serialize encodes the token as JSON.
func (token *timerTaskPageToken) serialize() ([]byte, error) {
	encoded, err := json.Marshal(token)
	if err != nil {
		return nil, err
	}
	return encoded, nil
}

// deserialize decodes a JSON payload produced by serialize into the receiver.
func (token *timerTaskPageToken) deserialize(payload []byte) error {
	if err := json.Unmarshal(payload, token); err != nil {
		return err
	}
	return nil
}
3✔
1269

1270
func (m *sqlExecutionStore) GetTimerIndexTasks(
1271
        ctx context.Context,
1272
        request *p.GetTimerIndexTasksRequest,
1273
) (*p.GetTimerIndexTasksResponse, error) {
2,080✔
1274

2,080✔
1275
        pageToken := &timerTaskPageToken{TaskID: math.MinInt64, Timestamp: request.MinTimestamp}
2,080✔
1276
        if len(request.NextPageToken) > 0 {
2,083✔
1277
                if err := pageToken.deserialize(request.NextPageToken); err != nil {
3✔
1278
                        return nil, &types.InternalServiceError{
×
1279
                                Message: fmt.Sprintf("error deserializing timerTaskPageToken: %v", err),
×
1280
                        }
×
1281
                }
×
1282
        }
1283

1284
        rows, err := m.db.SelectFromTimerTasks(ctx, &sqlplugin.TimerTasksFilter{
2,080✔
1285
                ShardID:                m.shardID,
2,080✔
1286
                MinVisibilityTimestamp: pageToken.Timestamp,
2,080✔
1287
                TaskID:                 pageToken.TaskID,
2,080✔
1288
                MaxVisibilityTimestamp: request.MaxTimestamp,
2,080✔
1289
                PageSize:               request.BatchSize + 1,
2,080✔
1290
        })
2,080✔
1291

2,080✔
1292
        if err != nil && err != sql.ErrNoRows {
2,081✔
1293
                return nil, convertCommonErrors(m.db, "GetTimerIndexTasks", "", err)
1✔
1294
        }
1✔
1295

1296
        resp := &p.GetTimerIndexTasksResponse{Timers: make([]*p.TimerTaskInfo, len(rows))}
2,079✔
1297
        for i, row := range rows {
10,216✔
1298
                info, err := m.parser.TimerTaskInfoFromBlob(row.Data, row.DataEncoding)
8,137✔
1299
                if err != nil {
8,138✔
1300
                        return nil, err
1✔
1301
                }
1✔
1302
                resp.Timers[i] = &p.TimerTaskInfo{
8,136✔
1303
                        VisibilityTimestamp: row.VisibilityTimestamp,
8,136✔
1304
                        TaskID:              row.TaskID,
8,136✔
1305
                        DomainID:            info.DomainID.String(),
8,136✔
1306
                        WorkflowID:          info.GetWorkflowID(),
8,136✔
1307
                        RunID:               info.RunID.String(),
8,136✔
1308
                        TaskType:            int(info.GetTaskType()),
8,136✔
1309
                        TimeoutType:         int(info.GetTimeoutType()),
8,136✔
1310
                        EventID:             info.GetEventID(),
8,136✔
1311
                        ScheduleAttempt:     info.GetScheduleAttempt(),
8,136✔
1312
                        Version:             info.GetVersion(),
8,136✔
1313
                }
8,136✔
1314
        }
1315

1316
        if len(resp.Timers) > request.BatchSize {
2,555✔
1317
                pageToken = &timerTaskPageToken{
477✔
1318
                        TaskID:    resp.Timers[request.BatchSize].TaskID,
477✔
1319
                        Timestamp: resp.Timers[request.BatchSize].VisibilityTimestamp,
477✔
1320
                }
477✔
1321
                resp.Timers = resp.Timers[:request.BatchSize]
477✔
1322
                nextToken, err := pageToken.serialize()
477✔
1323
                if err != nil {
477✔
1324
                        return nil, &types.InternalServiceError{
×
1325
                                Message: fmt.Sprintf("GetTimerTasks: error serializing page token: %v", err),
×
1326
                        }
×
1327
                }
×
1328
                resp.NextPageToken = nextToken
477✔
1329
        }
1330

1331
        return resp, nil
2,078✔
1332
}
1333

1334
func (m *sqlExecutionStore) CompleteTimerTask(
1335
        ctx context.Context,
1336
        request *p.CompleteTimerTaskRequest,
1337
) error {
2✔
1338

2✔
1339
        if _, err := m.db.DeleteFromTimerTasks(ctx, &sqlplugin.TimerTasksFilter{
2✔
1340
                ShardID:             m.shardID,
2✔
1341
                VisibilityTimestamp: request.VisibilityTimestamp,
2✔
1342
                TaskID:              request.TaskID,
2✔
1343
        }); err != nil {
3✔
1344
                return convertCommonErrors(m.db, "CompleteTimerTask", "", err)
1✔
1345
        }
1✔
1346
        return nil
1✔
1347
}
1348

1349
func (m *sqlExecutionStore) RangeCompleteTimerTask(
1350
        ctx context.Context,
1351
        request *p.RangeCompleteTimerTaskRequest,
1352
) (*p.RangeCompleteTimerTaskResponse, error) {
18✔
1353
        result, err := m.db.RangeDeleteFromTimerTasks(ctx, &sqlplugin.TimerTasksFilter{
18✔
1354
                ShardID:                m.shardID,
18✔
1355
                MinVisibilityTimestamp: request.InclusiveBeginTimestamp,
18✔
1356
                MaxVisibilityTimestamp: request.ExclusiveEndTimestamp,
18✔
1357
                PageSize:               request.PageSize,
18✔
1358
        })
18✔
1359
        if err != nil {
19✔
1360
                return nil, convertCommonErrors(m.db, "RangeCompleteTimerTask", "", err)
1✔
1361
        }
1✔
1362
        rowsDeleted, err := result.RowsAffected()
17✔
1363
        if err != nil {
17✔
1364
                return nil, convertCommonErrors(m.db, "RangeCompleteTimerTask", "", err)
×
1365
        }
×
1366
        return &p.RangeCompleteTimerTaskResponse{TasksCompleted: int(rowsDeleted)}, nil
17✔
1367
}
1368

1369
func (m *sqlExecutionStore) PutReplicationTaskToDLQ(
1370
        ctx context.Context,
1371
        request *p.InternalPutReplicationTaskToDLQRequest,
1372
) error {
5✔
1373
        replicationTask := request.TaskInfo
5✔
1374
        blob, err := m.parser.ReplicationTaskInfoToBlob(&serialization.ReplicationTaskInfo{
5✔
1375
                DomainID:                serialization.MustParseUUID(replicationTask.DomainID),
5✔
1376
                WorkflowID:              replicationTask.WorkflowID,
5✔
1377
                RunID:                   serialization.MustParseUUID(replicationTask.RunID),
5✔
1378
                TaskType:                int16(replicationTask.TaskType),
5✔
1379
                FirstEventID:            replicationTask.FirstEventID,
5✔
1380
                NextEventID:             replicationTask.NextEventID,
5✔
1381
                Version:                 replicationTask.Version,
5✔
1382
                ScheduledID:             replicationTask.ScheduledID,
5✔
1383
                EventStoreVersion:       p.EventStoreVersion,
5✔
1384
                NewRunEventStoreVersion: p.EventStoreVersion,
5✔
1385
                BranchToken:             replicationTask.BranchToken,
5✔
1386
                NewRunBranchToken:       replicationTask.NewRunBranchToken,
5✔
1387
                CreationTimestamp:       replicationTask.CreationTime,
5✔
1388
        })
5✔
1389
        if err != nil {
6✔
1390
                return err
1✔
1391
        }
1✔
1392

1393
        row := &sqlplugin.ReplicationTaskDLQRow{
4✔
1394
                SourceClusterName: request.SourceClusterName,
4✔
1395
                ShardID:           m.shardID,
4✔
1396
                TaskID:            replicationTask.TaskID,
4✔
1397
                Data:              blob.Data,
4✔
1398
                DataEncoding:      string(blob.Encoding),
4✔
1399
        }
4✔
1400

4✔
1401
        _, err = m.db.InsertIntoReplicationTasksDLQ(ctx, row)
4✔
1402

4✔
1403
        // Tasks are immutable. So it's fine if we already persisted it before.
4✔
1404
        // This can happen when tasks are retried (ack and cleanup can have lag on source side).
4✔
1405
        if err != nil && !m.db.IsDupEntryError(err) {
5✔
1406
                return convertCommonErrors(m.db, "PutReplicationTaskToDLQ", "", err)
1✔
1407
        }
1✔
1408

1409
        return nil
3✔
1410
}
1411

1412
func (m *sqlExecutionStore) populateWorkflowMutableState(
1413
        execution sqlplugin.ExecutionsRow,
1414
) (*p.InternalWorkflowMutableState, error) {
487✔
1415

487✔
1416
        info, err := m.parser.WorkflowExecutionInfoFromBlob(execution.Data, execution.DataEncoding)
487✔
1417
        if err != nil {
487✔
1418
                return nil, err
×
1419
        }
×
1420

1421
        state := &p.InternalWorkflowMutableState{}
487✔
1422
        state.ExecutionInfo = serialization.ToInternalWorkflowExecutionInfo(info)
487✔
1423
        state.ExecutionInfo.DomainID = execution.DomainID.String()
487✔
1424
        state.ExecutionInfo.WorkflowID = execution.WorkflowID
487✔
1425
        state.ExecutionInfo.RunID = execution.RunID.String()
487✔
1426
        state.ExecutionInfo.NextEventID = execution.NextEventID
487✔
1427
        // TODO: remove this after all 2DC workflows complete
487✔
1428
        if info.LastWriteEventID != nil {
488✔
1429
                state.ReplicationState = &p.ReplicationState{}
1✔
1430
                state.ReplicationState.StartVersion = info.GetStartVersion()
1✔
1431
                state.ReplicationState.LastWriteVersion = execution.LastWriteVersion
1✔
1432
                state.ReplicationState.LastWriteEventID = info.GetLastWriteEventID()
1✔
1433
        }
1✔
1434

1435
        if info.GetVersionHistories() != nil {
974✔
1436
                state.VersionHistories = p.NewDataBlob(
487✔
1437
                        info.GetVersionHistories(),
487✔
1438
                        common.EncodingType(info.GetVersionHistoriesEncoding()),
487✔
1439
                )
487✔
1440
        }
487✔
1441

1442
        if info.GetChecksum() != nil {
974✔
1443
                state.ChecksumData = p.NewDataBlob(
487✔
1444
                        info.GetChecksum(),
487✔
1445
                        common.EncodingType(info.GetChecksumEncoding()),
487✔
1446
                )
487✔
1447
        }
487✔
1448

1449
        return state, nil
487✔
1450
}
1451

1452
func (m *sqlExecutionStore) populateInternalListConcreteExecutions(
1453
        executions []sqlplugin.ExecutionsRow,
1454
) ([]*p.InternalListConcreteExecutionsEntity, error) {
×
1455

×
1456
        concreteExecutions := make([]*p.InternalListConcreteExecutionsEntity, 0, len(executions))
×
1457
        for _, execution := range executions {
×
1458
                mutableState, err := m.populateWorkflowMutableState(execution)
×
1459
                if err != nil {
×
1460
                        return nil, err
×
1461
                }
×
1462

1463
                concreteExecution := &p.InternalListConcreteExecutionsEntity{
×
1464
                        ExecutionInfo:    mutableState.ExecutionInfo,
×
1465
                        VersionHistories: mutableState.VersionHistories,
×
1466
                }
×
1467
                concreteExecutions = append(concreteExecutions, concreteExecution)
×
1468
        }
1469
        return concreteExecutions, nil
×
1470
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc