• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

uber / cadence / 018e2a8f-ec9b-4f64-85d5-18430de8b3d4

10 Mar 2024 10:50PM UTC coverage: 64.061% (+0.02%) from 64.043%
018e2a8f-ec9b-4f64-85d5-18430de8b3d4

push

buildkite

web-flow
adding mutable state builder tests - adding continue-as-new events (#5768)

Adds some unit tests to the mutable state builder around the Continue-as-new event.

4 of 5 new or added lines in 3 files covered. (80.0%)

87 existing lines in 12 files now uncovered.

93388 of 145779 relevant lines covered (64.06%)

2345.42 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

63.57
/common/persistence/sql/sql_execution_store.go
1
// Copyright (c) 2018 Uber Technologies, Inc.
2
//
3
// Permission is hereby granted, free of charge, to any person obtaining a copy
4
// of this software and associated documentation files (the "Software"), to deal
5
// in the Software without restriction, including without limitation the rights
6
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
7
// copies of the Software, and to permit persons to whom the Software is
8
// furnished to do so, subject to the following conditions:
9
//
10
// The above copyright notice and this permission notice shall be included in
11
// all copies or substantial portions of the Software.
12
//
13
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
14
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
18
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
19
// THE SOFTWARE.
20

21
package sql
22

23
import (
24
        "bytes"
25
        "context"
26
        "database/sql"
27
        "encoding/json"
28
        "fmt"
29
        "math"
30
        "runtime/debug"
31
        "time"
32

33
        "golang.org/x/sync/errgroup"
34

35
        "github.com/uber/cadence/common"
36
        "github.com/uber/cadence/common/collection"
37
        "github.com/uber/cadence/common/log"
38
        p "github.com/uber/cadence/common/persistence"
39
        "github.com/uber/cadence/common/persistence/serialization"
40
        "github.com/uber/cadence/common/persistence/sql/sqlplugin"
41
        "github.com/uber/cadence/common/types"
42
)
43

44
const (
45
        emptyWorkflowID       string = ""
46
        emptyReplicationRunID string = "30000000-5000-f000-f000-000000000000"
47
)
48

49
type sqlExecutionStore struct {
50
        sqlStore
51
        shardID                                int
52
        txExecuteShardLockedFn                 func(context.Context, int, string, int64, func(sqlplugin.Tx) error) error
53
        lockCurrentExecutionIfExistsFn         func(context.Context, sqlplugin.Tx, int, serialization.UUID, string) (*sqlplugin.CurrentExecutionsRow, error)
54
        createOrUpdateCurrentExecutionFn       func(context.Context, sqlplugin.Tx, p.CreateWorkflowMode, int, serialization.UUID, string, serialization.UUID, int, int, string, int64, int64) error
55
        assertNotCurrentExecutionFn            func(context.Context, sqlplugin.Tx, int, serialization.UUID, string, serialization.UUID) error
56
        assertRunIDAndUpdateCurrentExecutionFn func(context.Context, sqlplugin.Tx, int, serialization.UUID, string, serialization.UUID, serialization.UUID, string, int, int, int64, int64) error
57
        applyWorkflowSnapshotTxAsNewFn         func(context.Context, sqlplugin.Tx, int, *p.InternalWorkflowSnapshot, serialization.Parser) error
58
        applyWorkflowMutationTxFn              func(context.Context, sqlplugin.Tx, int, *p.InternalWorkflowMutation, serialization.Parser) error
59
        applyWorkflowSnapshotTxAsResetFn       func(context.Context, sqlplugin.Tx, int, *p.InternalWorkflowSnapshot, serialization.Parser) error
60
}
61

62
var _ p.ExecutionStore = (*sqlExecutionStore)(nil)
63

64
// NewSQLExecutionStore creates an instance of ExecutionStore
65
func NewSQLExecutionStore(
66
        db sqlplugin.DB,
67
        logger log.Logger,
68
        shardID int,
69
        parser serialization.Parser,
70
        dc *p.DynamicConfiguration,
71
) (p.ExecutionStore, error) {
42✔
72

42✔
73
        store := &sqlExecutionStore{
42✔
74
                shardID:                                shardID,
42✔
75
                lockCurrentExecutionIfExistsFn:         lockCurrentExecutionIfExists,
42✔
76
                createOrUpdateCurrentExecutionFn:       createOrUpdateCurrentExecution,
42✔
77
                assertNotCurrentExecutionFn:            assertNotCurrentExecution,
42✔
78
                assertRunIDAndUpdateCurrentExecutionFn: assertRunIDAndUpdateCurrentExecution,
42✔
79
                applyWorkflowSnapshotTxAsNewFn:         applyWorkflowSnapshotTxAsNew,
42✔
80
                applyWorkflowMutationTxFn:              applyWorkflowMutationTx,
42✔
81
                applyWorkflowSnapshotTxAsResetFn:       applyWorkflowSnapshotTxAsReset,
42✔
82
                sqlStore: sqlStore{
42✔
83
                        db:     db,
42✔
84
                        logger: logger,
42✔
85
                        parser: parser,
42✔
86
                        dc:     dc,
42✔
87
                },
42✔
88
        }
42✔
89
        store.txExecuteShardLockedFn = store.txExecuteShardLocked
42✔
90
        return store, nil
42✔
91
}
42✔
92

93
// txExecuteShardLocked executes f under transaction and with read lock on shard row
94
func (m *sqlExecutionStore) txExecuteShardLocked(
95
        ctx context.Context,
96
        dbShardID int,
97
        operation string,
98
        rangeID int64,
99
        fn func(tx sqlplugin.Tx) error,
100
) error {
3,240✔
101

3,240✔
102
        return m.txExecute(ctx, dbShardID, operation, func(tx sqlplugin.Tx) error {
6,480✔
103
                if err := readLockShard(ctx, tx, m.shardID, rangeID); err != nil {
3,240✔
104
                        return err
×
105
                }
×
106
                err := fn(tx)
3,240✔
107
                if err != nil {
3,270✔
108
                        return err
30✔
109
                }
30✔
110
                return nil
3,212✔
111
        })
112
}
113

114
func (m *sqlExecutionStore) GetShardID() int {
4,260✔
115
        return m.shardID
4,260✔
116
}
4,260✔
117

118
func (m *sqlExecutionStore) CreateWorkflowExecution(
119
        ctx context.Context,
120
        request *p.InternalCreateWorkflowExecutionRequest,
121
) (response *p.CreateWorkflowExecutionResponse, err error) {
336✔
122
        dbShardID := sqlplugin.GetDBShardIDFromHistoryShardID(m.shardID, m.db.GetTotalNumDBShards())
336✔
123

336✔
124
        err = m.txExecuteShardLockedFn(ctx, dbShardID, "CreateWorkflowExecution", request.RangeID, func(tx sqlplugin.Tx) error {
672✔
125
                response, err = m.createWorkflowExecutionTx(ctx, tx, request)
336✔
126
                return err
336✔
127
        })
336✔
128
        return
336✔
129
}
130

131
func (m *sqlExecutionStore) createWorkflowExecutionTx(
132
        ctx context.Context,
133
        tx sqlplugin.Tx,
134
        request *p.InternalCreateWorkflowExecutionRequest,
135
) (*p.CreateWorkflowExecutionResponse, error) {
336✔
136

336✔
137
        newWorkflow := request.NewWorkflowSnapshot
336✔
138
        executionInfo := newWorkflow.ExecutionInfo
336✔
139
        startVersion := newWorkflow.StartVersion
336✔
140
        lastWriteVersion := newWorkflow.LastWriteVersion
336✔
141
        shardID := m.shardID
336✔
142
        domainID := serialization.MustParseUUID(executionInfo.DomainID)
336✔
143
        workflowID := executionInfo.WorkflowID
336✔
144
        runID := serialization.MustParseUUID(executionInfo.RunID)
336✔
145

336✔
146
        if err := p.ValidateCreateWorkflowModeState(
336✔
147
                request.Mode,
336✔
148
                newWorkflow,
336✔
149
        ); err != nil {
336✔
150
                return nil, err
×
151
        }
×
152

153
        var err error
336✔
154
        var row *sqlplugin.CurrentExecutionsRow
336✔
155
        if row, err = m.lockCurrentExecutionIfExistsFn(ctx, tx, m.shardID, domainID, workflowID); err != nil {
336✔
156
                return nil, err
×
157
        }
×
158

159
        // current workflow record check
160
        if row != nil {
386✔
161
                // current run ID, last write version, current workflow state check
50✔
162
                switch request.Mode {
50✔
163
                case p.CreateWorkflowModeBrandNew:
28✔
164
                        return nil, &p.WorkflowExecutionAlreadyStartedError{
28✔
165
                                Msg:              fmt.Sprintf("Workflow execution already running. WorkflowId: %v", row.WorkflowID),
28✔
166
                                StartRequestID:   row.CreateRequestID,
28✔
167
                                RunID:            row.RunID.String(),
28✔
168
                                State:            int(row.State),
28✔
169
                                CloseStatus:      int(row.CloseStatus),
28✔
170
                                LastWriteVersion: row.LastWriteVersion,
28✔
171
                        }
28✔
172

173
                case p.CreateWorkflowModeWorkflowIDReuse:
20✔
174
                        if request.PreviousLastWriteVersion != row.LastWriteVersion {
20✔
175
                                return nil, &p.CurrentWorkflowConditionFailedError{
×
176
                                        Msg: fmt.Sprintf("Workflow execution creation condition failed. WorkflowId: %v, "+
×
177
                                                "LastWriteVersion: %v, PreviousLastWriteVersion: %v",
×
178
                                                workflowID, row.LastWriteVersion, request.PreviousLastWriteVersion),
×
179
                                }
×
180
                        }
×
181
                        if row.State != p.WorkflowStateCompleted {
20✔
182
                                return nil, &p.CurrentWorkflowConditionFailedError{
×
183
                                        Msg: fmt.Sprintf("Workflow execution creation condition failed. WorkflowId: %v, "+
×
184
                                                "State: %v, Expected: %v",
×
185
                                                workflowID, row.State, p.WorkflowStateCompleted),
×
186
                                }
×
187
                        }
×
188
                        runIDStr := row.RunID.String()
20✔
189
                        if runIDStr != request.PreviousRunID {
20✔
190
                                return nil, &p.CurrentWorkflowConditionFailedError{
×
191
                                        Msg: fmt.Sprintf("Workflow execution creation condition failed. WorkflowId: %v, "+
×
192
                                                "RunID: %v, PreviousRunID: %v",
×
193
                                                workflowID, runIDStr, request.PreviousRunID),
×
194
                                }
×
195
                        }
×
196

197
                case p.CreateWorkflowModeZombie:
2✔
198
                        // zombie workflow creation with existence of current record, this is a noop
2✔
199
                        if err := assertRunIDMismatch(serialization.MustParseUUID(executionInfo.RunID), row.RunID); err != nil {
2✔
200
                                return nil, err
×
201
                        }
×
202

203
                case p.CreateWorkflowModeContinueAsNew:
2✔
204
                        runIDStr := row.RunID.String()
2✔
205
                        if runIDStr != request.PreviousRunID {
2✔
206
                                return nil, &p.CurrentWorkflowConditionFailedError{
×
207
                                        Msg: fmt.Sprintf("Workflow execution creation condition failed. WorkflowId: %v, "+
×
208
                                                "RunID: %v, PreviousRunID: %v",
×
209
                                                workflowID, runIDStr, request.PreviousRunID),
×
210
                                }
×
211
                        }
×
212

213
                default:
×
214
                        return nil, &types.InternalServiceError{
×
215
                                Message: fmt.Sprintf(
×
216
                                        "CreteWorkflowExecution: unknown mode: %v",
×
217
                                        request.Mode,
×
218
                                ),
×
219
                        }
×
220
                }
221
        }
222

223
        if err := m.createOrUpdateCurrentExecutionFn(
308✔
224
                ctx,
308✔
225
                tx,
308✔
226
                request.Mode,
308✔
227
                m.shardID,
308✔
228
                domainID,
308✔
229
                workflowID,
308✔
230
                runID,
308✔
231
                executionInfo.State,
308✔
232
                executionInfo.CloseStatus,
308✔
233
                executionInfo.CreateRequestID,
308✔
234
                startVersion,
308✔
235
                lastWriteVersion); err != nil {
308✔
236
                return nil, err
×
237
        }
×
238

239
        if err := m.applyWorkflowSnapshotTxAsNewFn(ctx, tx, shardID, &request.NewWorkflowSnapshot, m.parser); err != nil {
310✔
240
                return nil, err
2✔
241
        }
2✔
242

243
        return &p.CreateWorkflowExecutionResponse{}, nil
308✔
244
}
245

246
func (m *sqlExecutionStore) getExecutions(
247
        ctx context.Context,
248
        request *p.InternalGetWorkflowExecutionRequest,
249
        domainID serialization.UUID,
250
        wfID string,
251
        runID serialization.UUID,
252
) ([]sqlplugin.ExecutionsRow, error) {
794✔
253
        executions, err := m.db.SelectFromExecutions(ctx, &sqlplugin.ExecutionsFilter{
794✔
254
                ShardID: m.shardID, DomainID: domainID, WorkflowID: wfID, RunID: runID})
794✔
255

794✔
256
        if err != nil {
1,104✔
257
                if err == sql.ErrNoRows {
620✔
258
                        return nil, &types.EntityNotExistsError{
310✔
259
                                Message: fmt.Sprintf(
310✔
260
                                        "Workflow execution not found.  WorkflowId: %v, RunId: %v",
310✔
261
                                        request.Execution.GetWorkflowID(),
310✔
262
                                        request.Execution.GetRunID(),
310✔
263
                                ),
310✔
264
                        }
310✔
265
                }
310✔
266
                return nil, convertCommonErrors(m.db, "GetWorkflowExecution", "", err)
×
267
        }
268

269
        if len(executions) == 0 {
486✔
270
                return nil, &types.EntityNotExistsError{
×
271
                        Message: fmt.Sprintf(
×
272
                                "Workflow execution not found.  WorkflowId: %v, RunId: %v",
×
273
                                request.Execution.GetWorkflowID(),
×
274
                                request.Execution.GetRunID(),
×
275
                        ),
×
276
                }
×
277
        }
×
278

279
        if len(executions) != 1 {
486✔
280
                return nil, &types.InternalServiceError{
×
281
                        Message: "GetWorkflowExecution return more than one results.",
×
282
                }
×
283
        }
×
284
        return executions, nil
486✔
285
}
286

287
func (m *sqlExecutionStore) GetWorkflowExecution(
288
        ctx context.Context,
289
        request *p.InternalGetWorkflowExecutionRequest,
290
) (resp *p.InternalGetWorkflowExecutionResponse, e error) {
794✔
291
        recoverPanic := func(recovered interface{}, err *error) {
7,132✔
292
                if recovered != nil {
6,338✔
293
                        *err = fmt.Errorf("DB operation panicked: %v %s", recovered, debug.Stack())
×
294
                }
×
295
        }
296

297
        domainID := serialization.MustParseUUID(request.DomainID)
794✔
298
        runID := serialization.MustParseUUID(request.Execution.RunID)
794✔
299
        wfID := request.Execution.WorkflowID
794✔
300

794✔
301
        var executions []sqlplugin.ExecutionsRow
794✔
302
        var activityInfos map[int64]*p.InternalActivityInfo
794✔
303
        var timerInfos map[string]*p.TimerInfo
794✔
304
        var childExecutionInfos map[int64]*p.InternalChildExecutionInfo
794✔
305
        var requestCancelInfos map[int64]*p.RequestCancelInfo
794✔
306
        var signalInfos map[int64]*p.SignalInfo
794✔
307
        var bufferedEvents []*p.DataBlob
794✔
308
        var signalsRequested map[string]struct{}
794✔
309

794✔
310
        g, ctx := errgroup.WithContext(ctx)
794✔
311

794✔
312
        g.Go(func() (e error) {
1,588✔
313
                defer func() { recoverPanic(recover(), &e) }()
1,588✔
314
                executions, e = m.getExecutions(ctx, request, domainID, wfID, runID)
794✔
315
                return e
794✔
316
        })
317

318
        g.Go(func() (e error) {
1,588✔
319
                defer func() { recoverPanic(recover(), &e) }()
1,588✔
320
                activityInfos, e = getActivityInfoMap(
794✔
321
                        ctx, m.db, m.shardID, domainID, wfID, runID, m.parser)
794✔
322
                return e
794✔
323
        })
324

325
        g.Go(func() (e error) {
1,588✔
326
                defer func() { recoverPanic(recover(), &e) }()
1,588✔
327
                timerInfos, e = getTimerInfoMap(
794✔
328
                        ctx, m.db, m.shardID, domainID, wfID, runID, m.parser)
794✔
329
                return e
794✔
330
        })
331

332
        g.Go(func() (e error) {
1,588✔
333
                defer func() { recoverPanic(recover(), &e) }()
1,588✔
334
                childExecutionInfos, e = getChildExecutionInfoMap(
794✔
335
                        ctx, m.db, m.shardID, domainID, wfID, runID, m.parser)
794✔
336
                return e
794✔
337
        })
338

339
        g.Go(func() (e error) {
1,588✔
340
                defer func() { recoverPanic(recover(), &e) }()
1,588✔
341
                requestCancelInfos, e = getRequestCancelInfoMap(
794✔
342
                        ctx, m.db, m.shardID, domainID, wfID, runID, m.parser)
794✔
343
                return e
794✔
344
        })
345

346
        g.Go(func() (e error) {
1,588✔
347
                defer func() { recoverPanic(recover(), &e) }()
1,588✔
348
                signalInfos, e = getSignalInfoMap(
794✔
349
                        ctx, m.db, m.shardID, domainID, wfID, runID, m.parser)
794✔
350
                return e
794✔
351
        })
352

353
        g.Go(func() (e error) {
1,588✔
354
                defer func() { recoverPanic(recover(), &e) }()
1,588✔
355
                bufferedEvents, e = getBufferedEvents(
794✔
356
                        ctx, m.db, m.shardID, domainID, wfID, runID)
794✔
357
                return e
794✔
358
        })
359

360
        g.Go(func() (e error) {
1,588✔
361
                defer func() { recoverPanic(recover(), &e) }()
1,588✔
362
                signalsRequested, e = getSignalsRequested(
794✔
363
                        ctx, m.db, m.shardID, domainID, wfID, runID)
794✔
364
                return e
794✔
365
        })
366

367
        err := g.Wait()
794✔
368
        if err != nil {
1,104✔
369
                return nil, err
310✔
370
        }
310✔
371

372
        state, err := m.populateWorkflowMutableState(executions[0])
486✔
373
        if err != nil {
486✔
374
                return nil, &types.InternalServiceError{
×
375
                        Message: fmt.Sprintf("GetWorkflowExecution: failed. Error: %v", err),
×
376
                }
×
377
        }
×
378
        state.ActivityInfos = activityInfos
486✔
379
        state.TimerInfos = timerInfos
486✔
380
        state.ChildExecutionInfos = childExecutionInfos
486✔
381
        state.RequestCancelInfos = requestCancelInfos
486✔
382
        state.SignalInfos = signalInfos
486✔
383
        state.BufferedEvents = bufferedEvents
486✔
384
        state.SignalRequestedIDs = signalsRequested
486✔
385

486✔
386
        return &p.InternalGetWorkflowExecutionResponse{State: state}, nil
486✔
387
}
388

389
func (m *sqlExecutionStore) UpdateWorkflowExecution(
390
        ctx context.Context,
391
        request *p.InternalUpdateWorkflowExecutionRequest,
392
) error {
2,906✔
393
        dbShardID := sqlplugin.GetDBShardIDFromHistoryShardID(m.shardID, m.db.GetTotalNumDBShards())
2,906✔
394
        return m.txExecuteShardLockedFn(ctx, dbShardID, "UpdateWorkflowExecution", request.RangeID, func(tx sqlplugin.Tx) error {
5,812✔
395
                return m.updateWorkflowExecutionTx(ctx, tx, request)
2,906✔
396
        })
2,906✔
397
}
398

399
func (m *sqlExecutionStore) updateWorkflowExecutionTx(
400
        ctx context.Context,
401
        tx sqlplugin.Tx,
402
        request *p.InternalUpdateWorkflowExecutionRequest,
403
) error {
2,906✔
404

2,906✔
405
        updateWorkflow := request.UpdateWorkflowMutation
2,906✔
406
        newWorkflow := request.NewWorkflowSnapshot
2,906✔
407

2,906✔
408
        executionInfo := updateWorkflow.ExecutionInfo
2,906✔
409
        domainID := serialization.MustParseUUID(executionInfo.DomainID)
2,906✔
410
        workflowID := executionInfo.WorkflowID
2,906✔
411
        runID := serialization.MustParseUUID(executionInfo.RunID)
2,906✔
412
        shardID := m.shardID
2,906✔
413

2,906✔
414
        if err := p.ValidateUpdateWorkflowModeState(
2,906✔
415
                request.Mode,
2,906✔
416
                updateWorkflow,
2,906✔
417
                newWorkflow,
2,906✔
418
        ); err != nil {
2,906✔
419
                return err
×
420
        }
×
421

422
        switch request.Mode {
2,906✔
423
        case p.UpdateWorkflowModeIgnoreCurrent:
×
424
                // no-op
425
        case p.UpdateWorkflowModeBypassCurrent:
2✔
426
                if err := m.assertNotCurrentExecutionFn(
2✔
427
                        ctx,
2✔
428
                        tx,
2✔
429
                        shardID,
2✔
430
                        domainID,
2✔
431
                        workflowID,
2✔
432
                        runID); err != nil {
2✔
433
                        return err
×
434
                }
×
435

436
        case p.UpdateWorkflowModeUpdateCurrent:
2,906✔
437
                if newWorkflow != nil {
3,021✔
438
                        newExecutionInfo := newWorkflow.ExecutionInfo
115✔
439
                        startVersion := newWorkflow.StartVersion
115✔
440
                        lastWriteVersion := newWorkflow.LastWriteVersion
115✔
441
                        newDomainID := serialization.MustParseUUID(newExecutionInfo.DomainID)
115✔
442
                        newRunID := serialization.MustParseUUID(newExecutionInfo.RunID)
115✔
443

115✔
444
                        if !bytes.Equal(domainID, newDomainID) {
115✔
445
                                return &types.InternalServiceError{
×
446
                                        Message: "UpdateWorkflowExecution: cannot continue as new to another domain",
×
447
                                }
×
448
                        }
×
449

450
                        if err := m.assertRunIDAndUpdateCurrentExecutionFn(
115✔
451
                                ctx,
115✔
452
                                tx,
115✔
453
                                shardID,
115✔
454
                                domainID,
115✔
455
                                workflowID,
115✔
456
                                newRunID,
115✔
457
                                runID,
115✔
458
                                newWorkflow.ExecutionInfo.CreateRequestID,
115✔
459
                                newWorkflow.ExecutionInfo.State,
115✔
460
                                newWorkflow.ExecutionInfo.CloseStatus,
115✔
461
                                startVersion,
115✔
462
                                lastWriteVersion); err != nil {
115✔
463
                                return err
×
464
                        }
×
465
                } else {
2,792✔
466
                        startVersion := updateWorkflow.StartVersion
2,792✔
467
                        lastWriteVersion := updateWorkflow.LastWriteVersion
2,792✔
468
                        // this is only to update the current record
2,792✔
469
                        if err := m.assertRunIDAndUpdateCurrentExecutionFn(
2,792✔
470
                                ctx,
2,792✔
471
                                tx,
2,792✔
472
                                shardID,
2,792✔
473
                                domainID,
2,792✔
474
                                workflowID,
2,792✔
475
                                runID,
2,792✔
476
                                runID,
2,792✔
477
                                executionInfo.CreateRequestID,
2,792✔
478
                                executionInfo.State,
2,792✔
479
                                executionInfo.CloseStatus,
2,792✔
480
                                startVersion,
2,792✔
481
                                lastWriteVersion); err != nil {
2,792✔
482
                                return err
×
483
                        }
×
484
                }
485

486
        default:
×
487
                return &types.InternalServiceError{
×
488
                        Message: fmt.Sprintf("UpdateWorkflowExecution: unknown mode: %v", request.Mode),
×
489
                }
×
490
        }
491

492
        if err := m.applyWorkflowMutationTxFn(ctx, tx, shardID, &updateWorkflow, m.parser); err != nil {
2,906✔
493
                return err
×
494
        }
×
495
        if newWorkflow != nil {
3,022✔
496
                if err := m.applyWorkflowSnapshotTxAsNewFn(ctx, tx, shardID, newWorkflow, m.parser); err != nil {
116✔
497
                        return err
×
498
                }
×
499
        }
500
        return nil
2,906✔
501
}
502

503
func (m *sqlExecutionStore) ConflictResolveWorkflowExecution(
504
        ctx context.Context,
505
        request *p.InternalConflictResolveWorkflowExecutionRequest,
506
) error {
2✔
507
        dbShardID := sqlplugin.GetDBShardIDFromHistoryShardID(m.shardID, m.db.GetTotalNumDBShards())
2✔
508
        return m.txExecuteShardLockedFn(ctx, dbShardID, "ConflictResolveWorkflowExecution", request.RangeID, func(tx sqlplugin.Tx) error {
4✔
509
                return m.conflictResolveWorkflowExecutionTx(ctx, tx, request)
2✔
510
        })
2✔
511
}
512

513
func (m *sqlExecutionStore) conflictResolveWorkflowExecutionTx(
514
        ctx context.Context,
515
        tx sqlplugin.Tx,
516
        request *p.InternalConflictResolveWorkflowExecutionRequest,
517
) error {
2✔
518

2✔
519
        currentWorkflow := request.CurrentWorkflowMutation
2✔
520
        resetWorkflow := request.ResetWorkflowSnapshot
2✔
521
        newWorkflow := request.NewWorkflowSnapshot
2✔
522

2✔
523
        shardID := m.shardID
2✔
524

2✔
525
        domainID := serialization.MustParseUUID(resetWorkflow.ExecutionInfo.DomainID)
2✔
526
        workflowID := resetWorkflow.ExecutionInfo.WorkflowID
2✔
527

2✔
528
        if err := p.ValidateConflictResolveWorkflowModeState(
2✔
529
                request.Mode,
2✔
530
                resetWorkflow,
2✔
531
                newWorkflow,
2✔
532
                currentWorkflow,
2✔
533
        ); err != nil {
2✔
534
                return err
×
535
        }
×
536

537
        switch request.Mode {
2✔
538
        case p.ConflictResolveWorkflowModeBypassCurrent:
2✔
539
                if err := m.assertNotCurrentExecutionFn(
2✔
540
                        ctx,
2✔
541
                        tx,
2✔
542
                        shardID,
2✔
543
                        domainID,
2✔
544
                        workflowID,
2✔
545
                        serialization.MustParseUUID(resetWorkflow.ExecutionInfo.RunID)); err != nil {
2✔
546
                        return err
×
547
                }
×
548

549
        case p.ConflictResolveWorkflowModeUpdateCurrent:
2✔
550
                executionInfo := resetWorkflow.ExecutionInfo
2✔
551
                startVersion := resetWorkflow.StartVersion
2✔
552
                lastWriteVersion := resetWorkflow.LastWriteVersion
2✔
553
                if newWorkflow != nil {
2✔
554
                        executionInfo = newWorkflow.ExecutionInfo
×
555
                        startVersion = newWorkflow.StartVersion
×
556
                        lastWriteVersion = newWorkflow.LastWriteVersion
×
557
                }
×
558
                runID := serialization.MustParseUUID(executionInfo.RunID)
2✔
559
                createRequestID := executionInfo.CreateRequestID
2✔
560
                state := executionInfo.State
2✔
561
                closeStatus := executionInfo.CloseStatus
2✔
562

2✔
563
                if currentWorkflow != nil {
2✔
UNCOV
564
                        prevRunID := serialization.MustParseUUID(currentWorkflow.ExecutionInfo.RunID)
×
UNCOV
565

×
UNCOV
566
                        if err := m.assertRunIDAndUpdateCurrentExecutionFn(
×
UNCOV
567
                                ctx,
×
UNCOV
568
                                tx,
×
UNCOV
569
                                m.shardID,
×
UNCOV
570
                                domainID,
×
UNCOV
571
                                workflowID,
×
UNCOV
572
                                runID,
×
UNCOV
573
                                prevRunID,
×
UNCOV
574
                                createRequestID,
×
UNCOV
575
                                state,
×
UNCOV
576
                                closeStatus,
×
UNCOV
577
                                startVersion,
×
UNCOV
578
                                lastWriteVersion); err != nil {
×
579
                                return err
×
580
                        }
×
581
                } else {
2✔
582
                        // reset workflow is current
2✔
583
                        prevRunID := serialization.MustParseUUID(resetWorkflow.ExecutionInfo.RunID)
2✔
584

2✔
585
                        if err := m.assertRunIDAndUpdateCurrentExecutionFn(
2✔
586
                                ctx,
2✔
587
                                tx,
2✔
588
                                m.shardID,
2✔
589
                                domainID,
2✔
590
                                workflowID,
2✔
591
                                runID,
2✔
592
                                prevRunID,
2✔
593
                                createRequestID,
2✔
594
                                state,
2✔
595
                                closeStatus,
2✔
596
                                startVersion,
2✔
597
                                lastWriteVersion); err != nil {
2✔
598
                                return err
×
599
                        }
×
600
                }
601

602
        default:
×
603
                return &types.InternalServiceError{
×
604
                        Message: fmt.Sprintf("ConflictResolveWorkflowExecution: unknown mode: %v", request.Mode),
×
605
                }
×
606
        }
607

608
        if err := m.applyWorkflowSnapshotTxAsResetFn(ctx, tx, shardID, &resetWorkflow, m.parser); err != nil {
2✔
609
                return err
×
610
        }
×
611
        if currentWorkflow != nil {
2✔
UNCOV
612
                if err := m.applyWorkflowMutationTxFn(ctx, tx, shardID, currentWorkflow, m.parser); err != nil {
×
613
                        return err
×
614
                }
×
615
        }
616
        if newWorkflow != nil {
2✔
617
                if err := m.applyWorkflowSnapshotTxAsNewFn(ctx, tx, shardID, newWorkflow, m.parser); err != nil {
×
618
                        return err
×
619
                }
×
620
        }
621
        return nil
2✔
622
}
623

624
func (m *sqlExecutionStore) DeleteWorkflowExecution(
625
        ctx context.Context,
626
        request *p.DeleteWorkflowExecutionRequest,
627
) error {
36✔
628
        recoverPanic := func(recovered interface{}, err *error) {
310✔
629
                if recovered != nil {
274✔
630
                        *err = fmt.Errorf("DB operation panicked: %v %s", recovered, debug.Stack())
×
631
                }
×
632
        }
633
        domainID := serialization.MustParseUUID(request.DomainID)
36✔
634
        runID := serialization.MustParseUUID(request.RunID)
36✔
635
        wfID := request.WorkflowID
36✔
636
        g, ctx := errgroup.WithContext(ctx)
36✔
637

36✔
638
        g.Go(func() (e error) {
72✔
639
                defer func() { recoverPanic(recover(), &e) }()
72✔
640
                _, e = m.db.DeleteFromExecutions(ctx, &sqlplugin.ExecutionsFilter{
36✔
641
                        ShardID:    m.shardID,
36✔
642
                        DomainID:   domainID,
36✔
643
                        WorkflowID: wfID,
36✔
644
                        RunID:      runID,
36✔
645
                })
36✔
646
                return e
36✔
647
        })
648

649
        g.Go(func() (e error) {
72✔
650
                defer func() { recoverPanic(recover(), &e) }()
72✔
651
                _, e = m.db.DeleteFromActivityInfoMaps(ctx, &sqlplugin.ActivityInfoMapsFilter{
36✔
652
                        ShardID:    int64(m.shardID),
36✔
653
                        DomainID:   domainID,
36✔
654
                        WorkflowID: wfID,
36✔
655
                        RunID:      runID,
36✔
656
                })
36✔
657
                return e
36✔
658
        })
659

660
        g.Go(func() (e error) {
72✔
661
                defer func() { recoverPanic(recover(), &e) }()
72✔
662
                _, e = m.db.DeleteFromTimerInfoMaps(ctx, &sqlplugin.TimerInfoMapsFilter{
36✔
663
                        ShardID:    int64(m.shardID),
36✔
664
                        DomainID:   domainID,
36✔
665
                        WorkflowID: wfID,
36✔
666
                        RunID:      runID,
36✔
667
                })
36✔
668
                return e
36✔
669
        })
670

671
        g.Go(func() (e error) {
72✔
672
                defer func() { recoverPanic(recover(), &e) }()
72✔
673
                _, e = m.db.DeleteFromChildExecutionInfoMaps(ctx, &sqlplugin.ChildExecutionInfoMapsFilter{
36✔
674
                        ShardID:    int64(m.shardID),
36✔
675
                        DomainID:   domainID,
36✔
676
                        WorkflowID: wfID,
36✔
677
                        RunID:      runID,
36✔
678
                })
36✔
679
                return e
36✔
680
        })
681

682
        g.Go(func() (e error) {
72✔
683
                defer func() { recoverPanic(recover(), &e) }()
72✔
684
                _, e = m.db.DeleteFromRequestCancelInfoMaps(ctx, &sqlplugin.RequestCancelInfoMapsFilter{
36✔
685
                        ShardID:    int64(m.shardID),
36✔
686
                        DomainID:   domainID,
36✔
687
                        WorkflowID: wfID,
36✔
688
                        RunID:      runID,
36✔
689
                })
36✔
690
                return e
36✔
691
        })
692

693
        g.Go(func() (e error) {
72✔
694
                defer func() { recoverPanic(recover(), &e) }()
72✔
695
                _, e = m.db.DeleteFromSignalInfoMaps(ctx, &sqlplugin.SignalInfoMapsFilter{
36✔
696
                        ShardID:    int64(m.shardID),
36✔
697
                        DomainID:   domainID,
36✔
698
                        WorkflowID: wfID,
36✔
699
                        RunID:      runID,
36✔
700
                })
36✔
701
                return e
36✔
702
        })
703

704
        g.Go(func() (e error) {
72✔
705
                defer func() { recoverPanic(recover(), &e) }()
72✔
706
                _, e = m.db.DeleteFromBufferedEvents(ctx, &sqlplugin.BufferedEventsFilter{
36✔
707
                        ShardID:    m.shardID,
36✔
708
                        DomainID:   domainID,
36✔
709
                        WorkflowID: wfID,
36✔
710
                        RunID:      runID,
36✔
711
                })
36✔
712
                return e
36✔
713
        })
714

715
        g.Go(func() (e error) {
72✔
716
                defer func() { recoverPanic(recover(), &e) }()
72✔
717
                _, e = m.db.DeleteFromSignalsRequestedSets(ctx, &sqlplugin.SignalsRequestedSetsFilter{
36✔
718
                        ShardID:    int64(m.shardID),
36✔
719
                        DomainID:   domainID,
36✔
720
                        WorkflowID: wfID,
36✔
721
                        RunID:      runID,
36✔
722
                })
36✔
723
                return e
36✔
724
        })
725
        return g.Wait()
36✔
726
}
727

728
// its possible for a new run of the same workflow to have started after the run we are deleting
729
// here was finished. In that case, current_executions table will have the same workflowID but different
730
// runID. The following code will delete the row from current_executions if and only if the runID is
731
// same as the one we are trying to delete here
732
func (m *sqlExecutionStore) DeleteCurrentWorkflowExecution(
733
        ctx context.Context,
734
        request *p.DeleteCurrentWorkflowExecutionRequest,
735
) error {
36✔
736

36✔
737
        domainID := serialization.MustParseUUID(request.DomainID)
36✔
738
        runID := serialization.MustParseUUID(request.RunID)
36✔
739
        _, err := m.db.DeleteFromCurrentExecutions(ctx, &sqlplugin.CurrentExecutionsFilter{
36✔
740
                ShardID:    int64(m.shardID),
36✔
741
                DomainID:   domainID,
36✔
742
                WorkflowID: request.WorkflowID,
36✔
743
                RunID:      runID,
36✔
744
        })
36✔
745
        if err != nil {
36✔
746
                return convertCommonErrors(m.db, "DeleteCurrentWorkflowExecution", "", err)
×
747
        }
×
748
        return nil
36✔
749
}
750

751
func (m *sqlExecutionStore) GetCurrentExecution(
752
        ctx context.Context,
753
        request *p.GetCurrentExecutionRequest,
754
) (*p.GetCurrentExecutionResponse, error) {
122✔
755

122✔
756
        row, err := m.db.SelectFromCurrentExecutions(ctx, &sqlplugin.CurrentExecutionsFilter{
122✔
757
                ShardID:    int64(m.shardID),
122✔
758
                DomainID:   serialization.MustParseUUID(request.DomainID),
122✔
759
                WorkflowID: request.WorkflowID,
122✔
760
        })
122✔
761
        if err != nil {
130✔
762
                return nil, convertCommonErrors(m.db, "GetCurrentExecution", "", err)
8✔
763
        }
8✔
764
        return &p.GetCurrentExecutionResponse{
116✔
765
                StartRequestID:   row.CreateRequestID,
116✔
766
                RunID:            row.RunID.String(),
116✔
767
                State:            int(row.State),
116✔
768
                CloseStatus:      int(row.CloseStatus),
116✔
769
                LastWriteVersion: row.LastWriteVersion,
116✔
770
        }, nil
116✔
771
}
772

773
func (m *sqlExecutionStore) ListCurrentExecutions(
774
        _ context.Context,
775
        _ *p.ListCurrentExecutionsRequest,
776
) (*p.ListCurrentExecutionsResponse, error) {
×
777
        return nil, &types.InternalServiceError{Message: "Not yet implemented"}
×
778
}
×
779

780
func (m *sqlExecutionStore) IsWorkflowExecutionExists(
781
        _ context.Context,
782
        _ *p.IsWorkflowExecutionExistsRequest,
783
) (*p.IsWorkflowExecutionExistsResponse, error) {
×
784
        return nil, &types.InternalServiceError{Message: "Not yet implemented"}
×
785
}
×
786

787
func (m *sqlExecutionStore) ListConcreteExecutions(
788
        ctx context.Context,
789
        request *p.ListConcreteExecutionsRequest,
790
) (*p.InternalListConcreteExecutionsResponse, error) {
×
791

×
792
        filter := &sqlplugin.ExecutionsFilter{}
×
793
        if len(request.PageToken) > 0 {
×
794
                err := gobDeserialize(request.PageToken, &filter)
×
795
                if err != nil {
×
796
                        return nil, &types.InternalServiceError{
×
797
                                Message: fmt.Sprintf("ListConcreteExecutions failed. Error: %v", err),
×
798
                        }
×
799
                }
×
800
        } else {
×
801
                filter = &sqlplugin.ExecutionsFilter{
×
802
                        ShardID:    m.shardID,
×
803
                        WorkflowID: "",
×
804
                }
×
805
        }
×
806
        filter.Size = request.PageSize
×
807

×
808
        executions, err := m.db.SelectFromExecutions(ctx, filter)
×
809
        if err != nil {
×
810
                if err == sql.ErrNoRows {
×
811
                        return &p.InternalListConcreteExecutionsResponse{}, nil
×
812
                }
×
813
                return nil, convertCommonErrors(m.db, "ListConcreteExecutions", "", err)
×
814
        }
815

816
        if len(executions) == 0 {
×
817
                return &p.InternalListConcreteExecutionsResponse{}, nil
×
818
        }
×
819
        lastExecution := executions[len(executions)-1]
×
820
        nextFilter := &sqlplugin.ExecutionsFilter{
×
821
                ShardID:    m.shardID,
×
822
                WorkflowID: lastExecution.WorkflowID,
×
823
        }
×
824
        token, err := gobSerialize(nextFilter)
×
825
        if err != nil {
×
826
                return nil, &types.InternalServiceError{
×
827
                        Message: fmt.Sprintf("ListConcreteExecutions failed. Error: %v", err),
×
828
                }
×
829
        }
×
830
        concreteExecutions, err := m.populateInternalListConcreteExecutions(executions)
×
831
        if err != nil {
×
832
                return nil, &types.InternalServiceError{
×
833
                        Message: fmt.Sprintf("ListConcreteExecutions failed. Error: %v", err),
×
834
                }
×
835
        }
×
836

837
        return &p.InternalListConcreteExecutionsResponse{
×
838
                Executions:    concreteExecutions,
×
839
                NextPageToken: token,
×
840
        }, nil
×
841
}
842

843
func (m *sqlExecutionStore) GetTransferTasks(
844
        ctx context.Context,
845
        request *p.GetTransferTasksRequest,
846
) (*p.GetTransferTasksResponse, error) {
1,506✔
847
        minReadLevel := request.ReadLevel
1,506✔
848
        if len(request.NextPageToken) > 0 {
1,506✔
849
                readLevel, err := deserializePageToken(request.NextPageToken)
×
850
                if err != nil {
×
851
                        return nil, convertCommonErrors(m.db, "GetTransferTasks", "failed to deserialize page token", err)
×
852
                }
×
853
                minReadLevel = readLevel
×
854
        }
855
        rows, err := m.db.SelectFromTransferTasks(ctx, &sqlplugin.TransferTasksFilter{
1,506✔
856
                ShardID:   m.shardID,
1,506✔
857
                MinTaskID: minReadLevel,
1,506✔
858
                MaxTaskID: request.MaxReadLevel,
1,506✔
859
                PageSize:  request.BatchSize,
1,506✔
860
        })
1,506✔
861
        if err != nil {
1,506✔
862
                if err != sql.ErrNoRows {
×
863
                        return nil, convertCommonErrors(m.db, "GetTransferTasks", "", err)
×
864
                }
×
865
        }
866
        resp := &p.GetTransferTasksResponse{Tasks: make([]*p.TransferTaskInfo, len(rows))}
1,506✔
867
        for i, row := range rows {
4,975✔
868
                info, err := m.parser.TransferTaskInfoFromBlob(row.Data, row.DataEncoding)
3,469✔
869
                if err != nil {
3,469✔
870
                        return nil, err
×
871
                }
×
872
                resp.Tasks[i] = &p.TransferTaskInfo{
3,469✔
873
                        TaskID:                  row.TaskID,
3,469✔
874
                        DomainID:                info.DomainID.String(),
3,469✔
875
                        WorkflowID:              info.GetWorkflowID(),
3,469✔
876
                        RunID:                   info.RunID.String(),
3,469✔
877
                        VisibilityTimestamp:     info.GetVisibilityTimestamp(),
3,469✔
878
                        TargetDomainID:          info.TargetDomainID.String(),
3,469✔
879
                        TargetDomainIDs:         info.GetTargetDomainIDs(),
3,469✔
880
                        TargetWorkflowID:        info.GetTargetWorkflowID(),
3,469✔
881
                        TargetRunID:             info.TargetRunID.String(),
3,469✔
882
                        TargetChildWorkflowOnly: info.GetTargetChildWorkflowOnly(),
3,469✔
883
                        TaskList:                info.GetTaskList(),
3,469✔
884
                        TaskType:                int(info.GetTaskType()),
3,469✔
885
                        ScheduleID:              info.GetScheduleID(),
3,469✔
886
                        Version:                 info.GetVersion(),
3,469✔
887
                }
3,469✔
888
        }
889
        if len(rows) > 0 {
2,948✔
890
                lastTaskID := rows[len(rows)-1].TaskID
1,442✔
891
                if lastTaskID < request.MaxReadLevel {
1,444✔
892
                        resp.NextPageToken = serializePageToken(lastTaskID)
2✔
893
                }
2✔
894
        }
895
        return resp, nil
1,506✔
896
}
897

898
func (m *sqlExecutionStore) CompleteTransferTask(
899
        ctx context.Context,
900
        request *p.CompleteTransferTaskRequest,
901
) error {
×
902

×
903
        if _, err := m.db.DeleteFromTransferTasks(ctx, &sqlplugin.TransferTasksFilter{
×
904
                ShardID: m.shardID,
×
905
                TaskID:  request.TaskID,
×
906
        }); err != nil {
×
907
                return convertCommonErrors(m.db, "CompleteTransferTask", "", err)
×
908
        }
×
909
        return nil
×
910
}
911

912
func (m *sqlExecutionStore) RangeCompleteTransferTask(
913
        ctx context.Context,
914
        request *p.RangeCompleteTransferTaskRequest,
915
) (*p.RangeCompleteTransferTaskResponse, error) {
55✔
916
        result, err := m.db.RangeDeleteFromTransferTasks(ctx, &sqlplugin.TransferTasksFilter{
55✔
917
                ShardID:   m.shardID,
55✔
918
                MinTaskID: request.ExclusiveBeginTaskID,
55✔
919
                MaxTaskID: request.InclusiveEndTaskID,
55✔
920
                PageSize:  request.PageSize,
55✔
921
        })
55✔
922
        if err != nil {
55✔
923
                return nil, convertCommonErrors(m.db, "RangeCompleteTransferTask", "", err)
×
924
        }
×
925
        rowsDeleted, err := result.RowsAffected()
55✔
926
        if err != nil {
55✔
927
                return nil, convertCommonErrors(m.db, "RangeCompleteTransferTask", "", err)
×
928
        }
×
929
        return &p.RangeCompleteTransferTaskResponse{TasksCompleted: int(rowsDeleted)}, nil
55✔
930
}
931

932
func (m *sqlExecutionStore) GetCrossClusterTasks(
933
        ctx context.Context,
934
        request *p.GetCrossClusterTasksRequest,
935
) (*p.GetCrossClusterTasksResponse, error) {
94✔
936
        minReadLevel := request.ReadLevel
94✔
937
        if len(request.NextPageToken) > 0 {
94✔
938
                readLevel, err := deserializePageToken(request.NextPageToken)
×
939
                if err != nil {
×
940
                        return nil, convertCommonErrors(m.db, "GetCrossClusterTasks", "failed to deserialize page token", err)
×
941
                }
×
942
                minReadLevel = readLevel
×
943
        }
944
        rows, err := m.db.SelectFromCrossClusterTasks(ctx, &sqlplugin.CrossClusterTasksFilter{
94✔
945
                TargetCluster: request.TargetCluster,
94✔
946
                ShardID:       m.shardID,
94✔
947
                MinTaskID:     minReadLevel,
94✔
948
                MaxTaskID:     request.MaxReadLevel,
94✔
949
                PageSize:      request.BatchSize,
94✔
950
        })
94✔
951
        if err != nil {
94✔
952
                if err != sql.ErrNoRows {
×
953
                        return nil, convertCommonErrors(m.db, "GetCrossClusterTasks", "", err)
×
954
                }
×
955
        }
956
        resp := &p.GetCrossClusterTasksResponse{Tasks: make([]*p.CrossClusterTaskInfo, len(rows))}
94✔
957
        for i, row := range rows {
94✔
958
                info, err := m.parser.CrossClusterTaskInfoFromBlob(row.Data, row.DataEncoding)
×
959
                if err != nil {
×
960
                        return nil, err
×
961
                }
×
962
                resp.Tasks[i] = &p.CrossClusterTaskInfo{
×
963
                        TaskID:                  row.TaskID,
×
964
                        DomainID:                info.DomainID.String(),
×
965
                        WorkflowID:              info.GetWorkflowID(),
×
966
                        RunID:                   info.RunID.String(),
×
967
                        VisibilityTimestamp:     info.GetVisibilityTimestamp(),
×
968
                        TargetDomainID:          info.TargetDomainID.String(),
×
969
                        TargetDomainIDs:         info.GetTargetDomainIDs(),
×
970
                        TargetWorkflowID:        info.GetTargetWorkflowID(),
×
971
                        TargetRunID:             info.TargetRunID.String(),
×
972
                        TargetChildWorkflowOnly: info.GetTargetChildWorkflowOnly(),
×
973
                        TaskList:                info.GetTaskList(),
×
974
                        TaskType:                int(info.GetTaskType()),
×
975
                        ScheduleID:              info.GetScheduleID(),
×
976
                        Version:                 info.GetVersion(),
×
977
                }
×
978
        }
979
        if len(rows) > 0 {
94✔
980
                lastTaskID := rows[len(rows)-1].TaskID
×
981
                if lastTaskID < request.MaxReadLevel {
×
982
                        resp.NextPageToken = serializePageToken(lastTaskID)
×
983
                }
×
984
        }
985
        return resp, nil
94✔
986

987
}
988

989
func (m *sqlExecutionStore) CompleteCrossClusterTask(
990
        ctx context.Context,
991
        request *p.CompleteCrossClusterTaskRequest,
992
) error {
×
993
        if _, err := m.db.DeleteFromCrossClusterTasks(ctx, &sqlplugin.CrossClusterTasksFilter{
×
994
                TargetCluster: request.TargetCluster,
×
995
                ShardID:       m.shardID,
×
996
                TaskID:        request.TaskID,
×
997
        }); err != nil {
×
998
                return convertCommonErrors(m.db, "CompleteCrossClusterTask", "", err)
×
999
        }
×
1000
        return nil
×
1001
}
1002

1003
func (m *sqlExecutionStore) RangeCompleteCrossClusterTask(
1004
        ctx context.Context,
1005
        request *p.RangeCompleteCrossClusterTaskRequest,
1006
) (*p.RangeCompleteCrossClusterTaskResponse, error) {
79✔
1007
        result, err := m.db.RangeDeleteFromCrossClusterTasks(ctx, &sqlplugin.CrossClusterTasksFilter{
79✔
1008
                TargetCluster: request.TargetCluster,
79✔
1009
                ShardID:       m.shardID,
79✔
1010
                MinTaskID:     request.ExclusiveBeginTaskID,
79✔
1011
                MaxTaskID:     request.InclusiveEndTaskID,
79✔
1012
                PageSize:      request.PageSize,
79✔
1013
        })
79✔
1014
        if err != nil {
79✔
1015
                return nil, convertCommonErrors(m.db, "RangeCompleteCrossClusterTask", "", err)
×
1016
        }
×
1017
        rowsDeleted, err := result.RowsAffected()
79✔
1018
        if err != nil {
79✔
1019
                return nil, convertCommonErrors(m.db, "RangeCompleteCrossClusterTask", "", err)
×
1020
        }
×
1021
        return &p.RangeCompleteCrossClusterTaskResponse{TasksCompleted: int(rowsDeleted)}, nil
79✔
1022
}
1023

1024
func (m *sqlExecutionStore) GetReplicationTasks(
1025
        ctx context.Context,
1026
        request *p.GetReplicationTasksRequest,
1027
) (*p.InternalGetReplicationTasksResponse, error) {
61✔
1028

61✔
1029
        readLevel, maxReadLevelInclusive, err := getReadLevels(request)
61✔
1030
        if err != nil {
61✔
1031
                return nil, err
×
1032
        }
×
1033

1034
        rows, err := m.db.SelectFromReplicationTasks(
61✔
1035
                ctx,
61✔
1036
                &sqlplugin.ReplicationTasksFilter{
61✔
1037
                        ShardID:   m.shardID,
61✔
1038
                        MinTaskID: readLevel,
61✔
1039
                        MaxTaskID: maxReadLevelInclusive,
61✔
1040
                        PageSize:  request.BatchSize,
61✔
1041
                })
61✔
1042

61✔
1043
        switch err {
61✔
1044
        case nil:
61✔
1045
                return m.populateGetReplicationTasksResponse(rows, request.MaxReadLevel)
61✔
1046
        case sql.ErrNoRows:
×
1047
                return &p.InternalGetReplicationTasksResponse{}, nil
×
1048
        default:
×
1049
                return nil, convertCommonErrors(m.db, "GetReplicationTasks", "", err)
×
1050
        }
1051
}
1052

1053
func getReadLevels(request *p.GetReplicationTasksRequest) (readLevel int64, maxReadLevelInclusive int64, err error) {
62✔
1054
        readLevel = request.ReadLevel
62✔
1055
        if len(request.NextPageToken) > 0 {
64✔
1056
                readLevel, err = deserializePageToken(request.NextPageToken)
2✔
1057
                if err != nil {
2✔
1058
                        return 0, 0, err
×
1059
                }
×
1060
        }
1061

1062
        maxReadLevelInclusive = collection.MaxInt64(readLevel+int64(request.BatchSize), request.MaxReadLevel)
62✔
1063
        return readLevel, maxReadLevelInclusive, nil
62✔
1064
}
1065

1066
func (m *sqlExecutionStore) populateGetReplicationTasksResponse(
1067
        rows []sqlplugin.ReplicationTasksRow,
1068
        requestMaxReadLevel int64,
1069
) (*p.InternalGetReplicationTasksResponse, error) {
62✔
1070
        if len(rows) == 0 {
124✔
1071
                return &p.InternalGetReplicationTasksResponse{}, nil
62✔
1072
        }
62✔
1073

1074
        var tasks = make([]*p.InternalReplicationTaskInfo, len(rows))
2✔
1075
        for i, row := range rows {
4✔
1076
                info, err := m.parser.ReplicationTaskInfoFromBlob(row.Data, row.DataEncoding)
2✔
1077
                if err != nil {
2✔
1078
                        return nil, err
×
1079
                }
×
1080

1081
                tasks[i] = &p.InternalReplicationTaskInfo{
2✔
1082
                        TaskID:            row.TaskID,
2✔
1083
                        DomainID:          info.DomainID.String(),
2✔
1084
                        WorkflowID:        info.GetWorkflowID(),
2✔
1085
                        RunID:             info.RunID.String(),
2✔
1086
                        TaskType:          int(info.GetTaskType()),
2✔
1087
                        FirstEventID:      info.GetFirstEventID(),
2✔
1088
                        NextEventID:       info.GetNextEventID(),
2✔
1089
                        Version:           info.GetVersion(),
2✔
1090
                        ScheduledID:       info.GetScheduledID(),
2✔
1091
                        BranchToken:       info.GetBranchToken(),
2✔
1092
                        NewRunBranchToken: info.GetNewRunBranchToken(),
2✔
1093
                        CreationTime:      info.GetCreationTimestamp(),
2✔
1094
                }
2✔
1095
        }
1096
        var nextPageToken []byte
2✔
1097
        lastTaskID := rows[len(rows)-1].TaskID
2✔
1098
        if lastTaskID < requestMaxReadLevel {
4✔
1099
                nextPageToken = serializePageToken(lastTaskID)
2✔
1100
        }
2✔
1101
        return &p.InternalGetReplicationTasksResponse{
2✔
1102
                Tasks:         tasks,
2✔
1103
                NextPageToken: nextPageToken,
2✔
1104
        }, nil
2✔
1105
}
1106

1107
func (m *sqlExecutionStore) CompleteReplicationTask(
1108
        ctx context.Context,
1109
        request *p.CompleteReplicationTaskRequest,
1110
) error {
×
1111

×
1112
        if _, err := m.db.DeleteFromReplicationTasks(ctx, &sqlplugin.ReplicationTasksFilter{
×
1113
                ShardID: m.shardID,
×
1114
                TaskID:  request.TaskID,
×
1115
        }); err != nil {
×
1116
                return convertCommonErrors(m.db, "CompleteReplicationTask", "", err)
×
1117
        }
×
1118
        return nil
×
1119
}
1120

1121
func (m *sqlExecutionStore) RangeCompleteReplicationTask(
1122
        ctx context.Context,
1123
        request *p.RangeCompleteReplicationTaskRequest,
1124
) (*p.RangeCompleteReplicationTaskResponse, error) {
64✔
1125
        result, err := m.db.RangeDeleteFromReplicationTasks(ctx, &sqlplugin.ReplicationTasksFilter{
64✔
1126
                ShardID:            m.shardID,
64✔
1127
                InclusiveEndTaskID: request.InclusiveEndTaskID,
64✔
1128
                PageSize:           request.PageSize,
64✔
1129
        })
64✔
1130
        if err != nil {
64✔
1131
                return nil, convertCommonErrors(m.db, "RangeCompleteReplicationTask", "", err)
×
1132
        }
×
1133
        rowsDeleted, err := result.RowsAffected()
64✔
1134
        if err != nil {
64✔
1135
                return nil, convertCommonErrors(m.db, "RangeCompleteReplicationTask", "", err)
×
1136
        }
×
1137
        return &p.RangeCompleteReplicationTaskResponse{TasksCompleted: int(rowsDeleted)}, nil
64✔
1138
}
1139

1140
func (m *sqlExecutionStore) GetReplicationTasksFromDLQ(
1141
        ctx context.Context,
1142
        request *p.GetReplicationTasksFromDLQRequest,
1143
) (*p.InternalGetReplicationTasksFromDLQResponse, error) {
2✔
1144

2✔
1145
        readLevel, maxReadLevelInclusive, err := getReadLevels(&request.GetReplicationTasksRequest)
2✔
1146
        if err != nil {
2✔
1147
                return nil, err
×
1148
        }
×
1149

1150
        filter := sqlplugin.ReplicationTasksFilter{
2✔
1151
                ShardID:   m.shardID,
2✔
1152
                MinTaskID: readLevel,
2✔
1153
                MaxTaskID: maxReadLevelInclusive,
2✔
1154
                PageSize:  request.BatchSize,
2✔
1155
        }
2✔
1156
        rows, err := m.db.SelectFromReplicationTasksDLQ(ctx, &sqlplugin.ReplicationTasksDLQFilter{
2✔
1157
                ReplicationTasksFilter: filter,
2✔
1158
                SourceClusterName:      request.SourceClusterName,
2✔
1159
        })
2✔
1160

2✔
1161
        switch err {
2✔
1162
        case nil:
2✔
1163
                return m.populateGetReplicationTasksResponse(rows, request.MaxReadLevel)
2✔
1164
        case sql.ErrNoRows:
×
1165
                return &p.InternalGetReplicationTasksResponse{}, nil
×
1166
        default:
×
1167
                return nil, convertCommonErrors(m.db, "GetReplicationTasksFromDLQ", "", err)
×
1168
        }
1169
}
1170

1171
func (m *sqlExecutionStore) GetReplicationDLQSize(
1172
        ctx context.Context,
1173
        request *p.GetReplicationDLQSizeRequest,
1174
) (*p.GetReplicationDLQSizeResponse, error) {
8✔
1175

8✔
1176
        size, err := m.db.SelectFromReplicationDLQ(ctx, &sqlplugin.ReplicationTaskDLQFilter{
8✔
1177
                SourceClusterName: request.SourceClusterName,
8✔
1178
                ShardID:           m.shardID,
8✔
1179
        })
8✔
1180

8✔
1181
        switch err {
8✔
1182
        case nil:
8✔
1183
                return &p.GetReplicationDLQSizeResponse{
8✔
1184
                        Size: size,
8✔
1185
                }, nil
8✔
1186
        case sql.ErrNoRows:
×
1187
                return &p.GetReplicationDLQSizeResponse{
×
1188
                        Size: 0,
×
1189
                }, nil
×
1190
        default:
×
1191
                return nil, convertCommonErrors(m.db, "GetReplicationDLQSize", "", err)
×
1192
        }
1193
}
1194

1195
func (m *sqlExecutionStore) DeleteReplicationTaskFromDLQ(
1196
        ctx context.Context,
1197
        request *p.DeleteReplicationTaskFromDLQRequest,
1198
) error {
×
1199

×
1200
        filter := sqlplugin.ReplicationTasksFilter{
×
1201
                ShardID: m.shardID,
×
1202
                TaskID:  request.TaskID,
×
1203
        }
×
1204

×
1205
        if _, err := m.db.DeleteMessageFromReplicationTasksDLQ(ctx, &sqlplugin.ReplicationTasksDLQFilter{
×
1206
                ReplicationTasksFilter: filter,
×
1207
                SourceClusterName:      request.SourceClusterName,
×
1208
        }); err != nil {
×
1209
                return convertCommonErrors(m.db, "DeleteReplicationTaskFromDLQ", "", err)
×
1210
        }
×
1211
        return nil
×
1212
}
1213

1214
func (m *sqlExecutionStore) RangeDeleteReplicationTaskFromDLQ(
1215
        ctx context.Context,
1216
        request *p.RangeDeleteReplicationTaskFromDLQRequest,
1217
) (*p.RangeDeleteReplicationTaskFromDLQResponse, error) {
×
1218
        filter := sqlplugin.ReplicationTasksFilter{
×
1219
                ShardID:            m.shardID,
×
1220
                TaskID:             request.ExclusiveBeginTaskID,
×
1221
                InclusiveEndTaskID: request.InclusiveEndTaskID,
×
1222
                PageSize:           request.PageSize,
×
1223
        }
×
1224
        result, err := m.db.RangeDeleteMessageFromReplicationTasksDLQ(ctx, &sqlplugin.ReplicationTasksDLQFilter{
×
1225
                ReplicationTasksFilter: filter,
×
1226
                SourceClusterName:      request.SourceClusterName,
×
1227
        })
×
1228
        if err != nil {
×
1229
                return nil, convertCommonErrors(m.db, "RangeDeleteReplicationTaskFromDLQ", "", err)
×
1230
        }
×
1231
        rowsDeleted, err := result.RowsAffected()
×
1232
        if err != nil {
×
1233
                return nil, convertCommonErrors(m.db, "RangeDeleteReplicationTaskFromDLQ", "", err)
×
1234
        }
×
1235
        return &p.RangeDeleteReplicationTaskFromDLQResponse{TasksCompleted: int(rowsDeleted)}, nil
×
1236
}
1237

1238
func (m *sqlExecutionStore) CreateFailoverMarkerTasks(
1239
        ctx context.Context,
1240
        request *p.CreateFailoverMarkersRequest,
1241
) error {
×
1242
        dbShardID := sqlplugin.GetDBShardIDFromHistoryShardID(m.shardID, m.db.GetTotalNumDBShards())
×
1243
        return m.txExecuteShardLockedFn(ctx, dbShardID, "CreateFailoverMarkerTasks", request.RangeID, func(tx sqlplugin.Tx) error {
×
1244
                replicationTasksRows := make([]sqlplugin.ReplicationTasksRow, len(request.Markers))
×
1245
                for i, task := range request.Markers {
×
1246
                        blob, err := m.parser.ReplicationTaskInfoToBlob(&serialization.ReplicationTaskInfo{
×
1247
                                DomainID:                serialization.MustParseUUID(task.DomainID),
×
1248
                                WorkflowID:              emptyWorkflowID,
×
1249
                                RunID:                   serialization.MustParseUUID(emptyReplicationRunID),
×
1250
                                TaskType:                int16(task.GetType()),
×
1251
                                FirstEventID:            common.EmptyEventID,
×
1252
                                NextEventID:             common.EmptyEventID,
×
1253
                                Version:                 task.GetVersion(),
×
1254
                                ScheduledID:             common.EmptyEventID,
×
1255
                                EventStoreVersion:       p.EventStoreVersion,
×
1256
                                NewRunEventStoreVersion: p.EventStoreVersion,
×
1257
                                BranchToken:             nil,
×
1258
                                NewRunBranchToken:       nil,
×
1259
                                CreationTimestamp:       task.GetVisibilityTimestamp(),
×
1260
                        })
×
1261
                        if err != nil {
×
1262
                                return err
×
1263
                        }
×
1264
                        replicationTasksRows[i].ShardID = m.shardID
×
1265
                        replicationTasksRows[i].TaskID = task.GetTaskID()
×
1266
                        replicationTasksRows[i].Data = blob.Data
×
1267
                        replicationTasksRows[i].DataEncoding = string(blob.Encoding)
×
1268
                }
1269
                result, err := tx.InsertIntoReplicationTasks(ctx, replicationTasksRows)
×
1270
                if err != nil {
×
1271
                        return convertCommonErrors(tx, "CreateFailoverMarkerTasks", "", err)
×
1272
                }
×
1273
                rowsAffected, err := result.RowsAffected()
×
1274
                if err != nil {
×
1275
                        return &types.InternalServiceError{Message: fmt.Sprintf("CreateFailoverMarkerTasks failed. Could not verify number of rows inserted. Error: %v", err)}
×
1276
                }
×
1277
                if int(rowsAffected) != len(replicationTasksRows) {
×
1278
                        return &types.InternalServiceError{Message: fmt.Sprintf("CreateFailoverMarkerTasks failed. Inserted %v instead of %v rows into replication_tasks.", rowsAffected, len(replicationTasksRows))}
×
1279
                }
×
1280
                return nil
×
1281
        })
1282
}
1283

1284
type timerTaskPageToken struct {
1285
        TaskID    int64
1286
        Timestamp time.Time
1287
}
1288

1289
func (t *timerTaskPageToken) serialize() ([]byte, error) {
479✔
1290
        return json.Marshal(t)
479✔
1291
}
479✔
1292

1293
func (t *timerTaskPageToken) deserialize(payload []byte) error {
×
1294
        return json.Unmarshal(payload, t)
×
1295
}
×
1296

1297
func (m *sqlExecutionStore) GetTimerIndexTasks(
1298
        ctx context.Context,
1299
        request *p.GetTimerIndexTasksRequest,
1300
) (*p.GetTimerIndexTasksResponse, error) {
2,078✔
1301

2,078✔
1302
        pageToken := &timerTaskPageToken{TaskID: math.MinInt64, Timestamp: request.MinTimestamp}
2,078✔
1303
        if len(request.NextPageToken) > 0 {
2,078✔
1304
                if err := pageToken.deserialize(request.NextPageToken); err != nil {
×
1305
                        return nil, &types.InternalServiceError{
×
1306
                                Message: fmt.Sprintf("error deserializing timerTaskPageToken: %v", err),
×
1307
                        }
×
1308
                }
×
1309
        }
1310

1311
        rows, err := m.db.SelectFromTimerTasks(ctx, &sqlplugin.TimerTasksFilter{
2,078✔
1312
                ShardID:                m.shardID,
2,078✔
1313
                MinVisibilityTimestamp: pageToken.Timestamp,
2,078✔
1314
                TaskID:                 pageToken.TaskID,
2,078✔
1315
                MaxVisibilityTimestamp: request.MaxTimestamp,
2,078✔
1316
                PageSize:               request.BatchSize + 1,
2,078✔
1317
        })
2,078✔
1318

2,078✔
1319
        if err != nil && err != sql.ErrNoRows {
2,078✔
1320
                return nil, convertCommonErrors(m.db, "GetTimerIndexTasks", "", err)
×
1321
        }
×
1322

1323
        resp := &p.GetTimerIndexTasksResponse{Timers: make([]*p.TimerTaskInfo, len(rows))}
2,078✔
1324
        for i, row := range rows {
10,258✔
1325
                info, err := m.parser.TimerTaskInfoFromBlob(row.Data, row.DataEncoding)
8,180✔
1326
                if err != nil {
8,180✔
1327
                        return nil, err
×
1328
                }
×
1329
                resp.Timers[i] = &p.TimerTaskInfo{
8,180✔
1330
                        VisibilityTimestamp: row.VisibilityTimestamp,
8,180✔
1331
                        TaskID:              row.TaskID,
8,180✔
1332
                        DomainID:            info.DomainID.String(),
8,180✔
1333
                        WorkflowID:          info.GetWorkflowID(),
8,180✔
1334
                        RunID:               info.RunID.String(),
8,180✔
1335
                        TaskType:            int(info.GetTaskType()),
8,180✔
1336
                        TimeoutType:         int(info.GetTimeoutType()),
8,180✔
1337
                        EventID:             info.GetEventID(),
8,180✔
1338
                        ScheduleAttempt:     info.GetScheduleAttempt(),
8,180✔
1339
                        Version:             info.GetVersion(),
8,180✔
1340
                }
8,180✔
1341
        }
1342

1343
        if len(resp.Timers) > request.BatchSize {
2,557✔
1344
                pageToken = &timerTaskPageToken{
479✔
1345
                        TaskID:    resp.Timers[request.BatchSize].TaskID,
479✔
1346
                        Timestamp: resp.Timers[request.BatchSize].VisibilityTimestamp,
479✔
1347
                }
479✔
1348
                resp.Timers = resp.Timers[:request.BatchSize]
479✔
1349
                nextToken, err := pageToken.serialize()
479✔
1350
                if err != nil {
479✔
1351
                        return nil, &types.InternalServiceError{
×
1352
                                Message: fmt.Sprintf("GetTimerTasks: error serializing page token: %v", err),
×
1353
                        }
×
1354
                }
×
1355
                resp.NextPageToken = nextToken
479✔
1356
        }
1357

1358
        return resp, nil
2,078✔
1359
}
1360

1361
func (m *sqlExecutionStore) CompleteTimerTask(
1362
        ctx context.Context,
1363
        request *p.CompleteTimerTaskRequest,
1364
) error {
×
1365

×
1366
        if _, err := m.db.DeleteFromTimerTasks(ctx, &sqlplugin.TimerTasksFilter{
×
1367
                ShardID:             m.shardID,
×
1368
                VisibilityTimestamp: request.VisibilityTimestamp,
×
1369
                TaskID:              request.TaskID,
×
1370
        }); err != nil {
×
1371
                return convertCommonErrors(m.db, "CompleteTimerTask", "", err)
×
1372
        }
×
1373
        return nil
×
1374
}
1375

1376
func (m *sqlExecutionStore) RangeCompleteTimerTask(
1377
        ctx context.Context,
1378
        request *p.RangeCompleteTimerTaskRequest,
1379
) (*p.RangeCompleteTimerTaskResponse, error) {
17✔
1380
        result, err := m.db.RangeDeleteFromTimerTasks(ctx, &sqlplugin.TimerTasksFilter{
17✔
1381
                ShardID:                m.shardID,
17✔
1382
                MinVisibilityTimestamp: request.InclusiveBeginTimestamp,
17✔
1383
                MaxVisibilityTimestamp: request.ExclusiveEndTimestamp,
17✔
1384
                PageSize:               request.PageSize,
17✔
1385
        })
17✔
1386
        if err != nil {
17✔
1387
                return nil, convertCommonErrors(m.db, "RangeCompleteTimerTask", "", err)
×
1388
        }
×
1389
        rowsDeleted, err := result.RowsAffected()
17✔
1390
        if err != nil {
17✔
1391
                return nil, convertCommonErrors(m.db, "RangeCompleteTimerTask", "", err)
×
1392
        }
×
1393
        return &p.RangeCompleteTimerTaskResponse{TasksCompleted: int(rowsDeleted)}, nil
17✔
1394
}
1395

1396
func (m *sqlExecutionStore) PutReplicationTaskToDLQ(
1397
        ctx context.Context,
1398
        request *p.InternalPutReplicationTaskToDLQRequest,
1399
) error {
2✔
1400
        replicationTask := request.TaskInfo
2✔
1401
        blob, err := m.parser.ReplicationTaskInfoToBlob(&serialization.ReplicationTaskInfo{
2✔
1402
                DomainID:                serialization.MustParseUUID(replicationTask.DomainID),
2✔
1403
                WorkflowID:              replicationTask.WorkflowID,
2✔
1404
                RunID:                   serialization.MustParseUUID(replicationTask.RunID),
2✔
1405
                TaskType:                int16(replicationTask.TaskType),
2✔
1406
                FirstEventID:            replicationTask.FirstEventID,
2✔
1407
                NextEventID:             replicationTask.NextEventID,
2✔
1408
                Version:                 replicationTask.Version,
2✔
1409
                ScheduledID:             replicationTask.ScheduledID,
2✔
1410
                EventStoreVersion:       p.EventStoreVersion,
2✔
1411
                NewRunEventStoreVersion: p.EventStoreVersion,
2✔
1412
                BranchToken:             replicationTask.BranchToken,
2✔
1413
                NewRunBranchToken:       replicationTask.NewRunBranchToken,
2✔
1414
                CreationTimestamp:       replicationTask.CreationTime,
2✔
1415
        })
2✔
1416
        if err != nil {
2✔
1417
                return err
×
1418
        }
×
1419

1420
        row := &sqlplugin.ReplicationTaskDLQRow{
2✔
1421
                SourceClusterName: request.SourceClusterName,
2✔
1422
                ShardID:           m.shardID,
2✔
1423
                TaskID:            replicationTask.TaskID,
2✔
1424
                Data:              blob.Data,
2✔
1425
                DataEncoding:      string(blob.Encoding),
2✔
1426
        }
2✔
1427

2✔
1428
        _, err = m.db.InsertIntoReplicationTasksDLQ(ctx, row)
2✔
1429

2✔
1430
        // Tasks are immutable. So it's fine if we already persisted it before.
2✔
1431
        // This can happen when tasks are retried (ack and cleanup can have lag on source side).
2✔
1432
        if err != nil && !m.db.IsDupEntryError(err) {
2✔
1433
                return convertCommonErrors(m.db, "PutReplicationTaskToDLQ", "", err)
×
1434
        }
×
1435

1436
        return nil
2✔
1437
}
1438

1439
func (m *sqlExecutionStore) populateWorkflowMutableState(
1440
        execution sqlplugin.ExecutionsRow,
1441
) (*p.InternalWorkflowMutableState, error) {
486✔
1442

486✔
1443
        info, err := m.parser.WorkflowExecutionInfoFromBlob(execution.Data, execution.DataEncoding)
486✔
1444
        if err != nil {
486✔
1445
                return nil, err
×
1446
        }
×
1447

1448
        state := &p.InternalWorkflowMutableState{}
486✔
1449
        state.ExecutionInfo = serialization.ToInternalWorkflowExecutionInfo(info)
486✔
1450
        state.ExecutionInfo.DomainID = execution.DomainID.String()
486✔
1451
        state.ExecutionInfo.WorkflowID = execution.WorkflowID
486✔
1452
        state.ExecutionInfo.RunID = execution.RunID.String()
486✔
1453
        state.ExecutionInfo.NextEventID = execution.NextEventID
486✔
1454
        // TODO: remove this after all 2DC workflows complete
486✔
1455
        if info.LastWriteEventID != nil {
486✔
1456
                state.ReplicationState = &p.ReplicationState{}
×
1457
                state.ReplicationState.StartVersion = info.GetStartVersion()
×
1458
                state.ReplicationState.LastWriteVersion = execution.LastWriteVersion
×
1459
                state.ReplicationState.LastWriteEventID = info.GetLastWriteEventID()
×
1460
        }
×
1461

1462
        if info.GetVersionHistories() != nil {
972✔
1463
                state.VersionHistories = p.NewDataBlob(
486✔
1464
                        info.GetVersionHistories(),
486✔
1465
                        common.EncodingType(info.GetVersionHistoriesEncoding()),
486✔
1466
                )
486✔
1467
        }
486✔
1468

1469
        if info.GetChecksum() != nil {
972✔
1470
                state.ChecksumData = p.NewDataBlob(
486✔
1471
                        info.GetChecksum(),
486✔
1472
                        common.EncodingType(info.GetChecksumEncoding()),
486✔
1473
                )
486✔
1474
        }
486✔
1475

1476
        return state, nil
486✔
1477
}
1478

1479
func (m *sqlExecutionStore) populateInternalListConcreteExecutions(
1480
        executions []sqlplugin.ExecutionsRow,
1481
) ([]*p.InternalListConcreteExecutionsEntity, error) {
×
1482

×
1483
        concreteExecutions := make([]*p.InternalListConcreteExecutionsEntity, 0, len(executions))
×
1484
        for _, execution := range executions {
×
1485
                mutableState, err := m.populateWorkflowMutableState(execution)
×
1486
                if err != nil {
×
1487
                        return nil, err
×
1488
                }
×
1489

1490
                concreteExecution := &p.InternalListConcreteExecutionsEntity{
×
1491
                        ExecutionInfo:    mutableState.ExecutionInfo,
×
1492
                        VersionHistories: mutableState.VersionHistories,
×
1493
                }
×
1494
                concreteExecutions = append(concreteExecutions, concreteExecution)
×
1495
        }
1496
        return concreteExecutions, nil
×
1497
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc