• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

uber / cadence / 018e0f0f-35f3-44b0-8512-458527ee339a

05 Mar 2024 02:40PM UTC coverage: 63.098% (+0.1%) from 62.989%
018e0f0f-35f3-44b0-8512-458527ee339a

push

buildkite

web-flow
remove old metrics wrappers and use new generated metered wrappers (#5717)

* remove old metrics wrappers and use new generated metered wrappers

10 of 11 new or added lines in 2 files covered. (90.91%)

58 existing lines in 11 files now uncovered.

92544 of 146667 relevant lines covered (63.1%)

2338.65 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

79.15
/common/persistence/sql/sql_execution_store.go
1
// Copyright (c) 2018 Uber Technologies, Inc.
2
//
3
// Permission is hereby granted, free of charge, to any person obtaining a copy
4
// of this software and associated documentation files (the "Software"), to deal
5
// in the Software without restriction, including without limitation the rights
6
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
7
// copies of the Software, and to permit persons to whom the Software is
8
// furnished to do so, subject to the following conditions:
9
//
10
// The above copyright notice and this permission notice shall be included in
11
// all copies or substantial portions of the Software.
12
//
13
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
14
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
18
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
19
// THE SOFTWARE.
20

21
package sql
22

23
import (
24
        "bytes"
25
        "context"
26
        "database/sql"
27
        "encoding/json"
28
        "fmt"
29
        "math"
30
        "runtime/debug"
31
        "time"
32

33
        "golang.org/x/sync/errgroup"
34

35
        "github.com/uber/cadence/common"
36
        "github.com/uber/cadence/common/collection"
37
        "github.com/uber/cadence/common/log"
38
        "github.com/uber/cadence/common/log/tag"
39
        p "github.com/uber/cadence/common/persistence"
40
        "github.com/uber/cadence/common/persistence/serialization"
41
        "github.com/uber/cadence/common/persistence/sql/sqlplugin"
42
        "github.com/uber/cadence/common/types"
43
)
44

45
const (
46
        emptyWorkflowID       string = ""
47
        emptyReplicationRunID string = "30000000-5000-f000-f000-000000000000"
48
)
49

50
type sqlExecutionStore struct {
51
        sqlStore
52
        shardID                          int
53
        txExecuteShardLockedFn           func(context.Context, int, string, int64, func(sqlplugin.Tx) error) error
54
        lockCurrentExecutionIfExistsFn   func(context.Context, sqlplugin.Tx, int, serialization.UUID, string) (*sqlplugin.CurrentExecutionsRow, error)
55
        createOrUpdateCurrentExecutionFn func(context.Context, sqlplugin.Tx, p.CreateWorkflowMode, int, serialization.UUID, string, serialization.UUID, int, int, string, int64, int64) error
56
        applyWorkflowSnapshotTxAsNewFn   func(context.Context, sqlplugin.Tx, int, *p.InternalWorkflowSnapshot, serialization.Parser) error
57
}
58

59
var _ p.ExecutionStore = (*sqlExecutionStore)(nil)
60

61
// NewSQLExecutionStore creates an instance of ExecutionStore
62
func NewSQLExecutionStore(
63
        db sqlplugin.DB,
64
        logger log.Logger,
65
        shardID int,
66
        parser serialization.Parser,
67
        dc *p.DynamicConfiguration,
68
) (p.ExecutionStore, error) {
88✔
69

88✔
70
        store := &sqlExecutionStore{
88✔
71
                shardID:                          shardID,
88✔
72
                lockCurrentExecutionIfExistsFn:   lockCurrentExecutionIfExists,
88✔
73
                createOrUpdateCurrentExecutionFn: createOrUpdateCurrentExecution,
88✔
74
                applyWorkflowSnapshotTxAsNewFn:   applyWorkflowSnapshotTxAsNew,
88✔
75
                sqlStore: sqlStore{
88✔
76
                        db:     db,
88✔
77
                        logger: logger,
88✔
78
                        parser: parser,
88✔
79
                        dc:     dc,
88✔
80
                },
88✔
81
        }
88✔
82
        store.txExecuteShardLockedFn = store.txExecuteShardLocked
88✔
83
        return store, nil
88✔
84
}
88✔
85

86
// txExecuteShardLocked executes f under transaction and with read lock on shard row
87
func (m *sqlExecutionStore) txExecuteShardLocked(
88
        ctx context.Context,
89
        dbShardID int,
90
        operation string,
91
        rangeID int64,
92
        fn func(tx sqlplugin.Tx) error,
93
) error {
3,237✔
94

3,237✔
95
        return m.txExecute(ctx, dbShardID, operation, func(tx sqlplugin.Tx) error {
6,474✔
96
                if err := readLockShard(ctx, tx, m.shardID, rangeID); err != nil {
3,238✔
97
                        return err
1✔
98
                }
1✔
99
                err := fn(tx)
3,236✔
100
                if err != nil {
3,267✔
101
                        return err
31✔
102
                }
31✔
103
                return nil
3,207✔
104
        })
105
}
106

107
func (m *sqlExecutionStore) GetShardID() int {
4,257✔
108
        return m.shardID
4,257✔
109
}
4,257✔
110

111
func (m *sqlExecutionStore) CreateWorkflowExecution(
112
        ctx context.Context,
113
        request *p.InternalCreateWorkflowExecutionRequest,
114
) (response *p.CreateWorkflowExecutionResponse, err error) {
349✔
115
        dbShardID := sqlplugin.GetDBShardIDFromHistoryShardID(m.shardID, m.db.GetTotalNumDBShards())
349✔
116

349✔
117
        err = m.txExecuteShardLockedFn(ctx, dbShardID, "CreateWorkflowExecution", request.RangeID, func(tx sqlplugin.Tx) error {
698✔
118
                response, err = m.createWorkflowExecutionTx(ctx, tx, request)
349✔
119
                return err
349✔
120
        })
349✔
121
        return
349✔
122
}
123

124
func (m *sqlExecutionStore) createWorkflowExecutionTx(
125
        ctx context.Context,
126
        tx sqlplugin.Tx,
127
        request *p.InternalCreateWorkflowExecutionRequest,
128
) (*p.CreateWorkflowExecutionResponse, error) {
349✔
129

349✔
130
        newWorkflow := request.NewWorkflowSnapshot
349✔
131
        executionInfo := newWorkflow.ExecutionInfo
349✔
132
        startVersion := newWorkflow.StartVersion
349✔
133
        lastWriteVersion := newWorkflow.LastWriteVersion
349✔
134
        shardID := m.shardID
349✔
135
        domainID := serialization.MustParseUUID(executionInfo.DomainID)
349✔
136
        workflowID := executionInfo.WorkflowID
349✔
137
        runID := serialization.MustParseUUID(executionInfo.RunID)
349✔
138

349✔
139
        if err := p.ValidateCreateWorkflowModeState(
349✔
140
                request.Mode,
349✔
141
                newWorkflow,
349✔
142
        ); err != nil {
351✔
143
                return nil, err
2✔
144
        }
2✔
145

146
        var err error
347✔
147
        var row *sqlplugin.CurrentExecutionsRow
347✔
148
        if row, err = m.lockCurrentExecutionIfExistsFn(ctx, tx, m.shardID, domainID, workflowID); err != nil {
348✔
149
                return nil, err
1✔
150
        }
1✔
151

152
        // current workflow record check
153
        if row != nil {
403✔
154
                // current run ID, last write version, current workflow state check
57✔
155
                switch request.Mode {
57✔
156
                case p.CreateWorkflowModeBrandNew:
29✔
157
                        return nil, &p.WorkflowExecutionAlreadyStartedError{
29✔
158
                                Msg:              fmt.Sprintf("Workflow execution already running. WorkflowId: %v", row.WorkflowID),
29✔
159
                                StartRequestID:   row.CreateRequestID,
29✔
160
                                RunID:            row.RunID.String(),
29✔
161
                                State:            int(row.State),
29✔
162
                                CloseStatus:      int(row.CloseStatus),
29✔
163
                                LastWriteVersion: row.LastWriteVersion,
29✔
164
                        }
29✔
165

166
                case p.CreateWorkflowModeWorkflowIDReuse:
24✔
167
                        if request.PreviousLastWriteVersion != row.LastWriteVersion {
25✔
168
                                return nil, &p.CurrentWorkflowConditionFailedError{
1✔
169
                                        Msg: fmt.Sprintf("Workflow execution creation condition failed. WorkflowId: %v, "+
1✔
170
                                                "LastWriteVersion: %v, PreviousLastWriteVersion: %v",
1✔
171
                                                workflowID, row.LastWriteVersion, request.PreviousLastWriteVersion),
1✔
172
                                }
1✔
173
                        }
1✔
174
                        if row.State != p.WorkflowStateCompleted {
24✔
175
                                return nil, &p.CurrentWorkflowConditionFailedError{
1✔
176
                                        Msg: fmt.Sprintf("Workflow execution creation condition failed. WorkflowId: %v, "+
1✔
177
                                                "State: %v, Expected: %v",
1✔
178
                                                workflowID, row.State, p.WorkflowStateCompleted),
1✔
179
                                }
1✔
180
                        }
1✔
181
                        runIDStr := row.RunID.String()
22✔
182
                        if runIDStr != request.PreviousRunID {
23✔
183
                                return nil, &p.CurrentWorkflowConditionFailedError{
1✔
184
                                        Msg: fmt.Sprintf("Workflow execution creation condition failed. WorkflowId: %v, "+
1✔
185
                                                "RunID: %v, PreviousRunID: %v",
1✔
186
                                                workflowID, runIDStr, request.PreviousRunID),
1✔
187
                                }
1✔
188
                        }
1✔
189

190
                case p.CreateWorkflowModeZombie:
4✔
191
                        // zombie workflow creation with existence of current record, this is a noop
4✔
192
                        if err := assertRunIDMismatch(serialization.MustParseUUID(executionInfo.RunID), row.RunID); err != nil {
5✔
193
                                return nil, err
1✔
194
                        }
1✔
195

196
                case p.CreateWorkflowModeContinueAsNew:
2✔
197
                        runIDStr := row.RunID.String()
2✔
198
                        if runIDStr != request.PreviousRunID {
2✔
199
                                return nil, &p.CurrentWorkflowConditionFailedError{
×
200
                                        Msg: fmt.Sprintf("Workflow execution creation condition failed. WorkflowId: %v, "+
×
201
                                                "RunID: %v, PreviousRunID: %v",
×
202
                                                workflowID, runIDStr, request.PreviousRunID),
×
203
                                }
×
204
                        }
×
205

206
                default:
×
207
                        return nil, &types.InternalServiceError{
×
208
                                Message: fmt.Sprintf(
×
209
                                        "CreteWorkflowExecution: unknown mode: %v",
×
210
                                        request.Mode,
×
211
                                ),
×
212
                        }
×
213
                }
214
        }
215

216
        if err := m.createOrUpdateCurrentExecutionFn(
313✔
217
                ctx,
313✔
218
                tx,
313✔
219
                request.Mode,
313✔
220
                m.shardID,
313✔
221
                domainID,
313✔
222
                workflowID,
313✔
223
                runID,
313✔
224
                executionInfo.State,
313✔
225
                executionInfo.CloseStatus,
313✔
226
                executionInfo.CreateRequestID,
313✔
227
                startVersion,
313✔
228
                lastWriteVersion); err != nil {
314✔
229
                return nil, err
1✔
230
        }
1✔
231

232
        if err := m.applyWorkflowSnapshotTxAsNewFn(ctx, tx, shardID, &request.NewWorkflowSnapshot, m.parser); err != nil {
315✔
233
                return nil, err
3✔
234
        }
3✔
235

236
        return &p.CreateWorkflowExecutionResponse{}, nil
311✔
237
}
238

239
func (m *sqlExecutionStore) getExecutions(
240
        ctx context.Context,
241
        request *p.InternalGetWorkflowExecutionRequest,
242
        domainID serialization.UUID,
243
        wfID string,
244
        runID serialization.UUID,
245
) ([]sqlplugin.ExecutionsRow, error) {
797✔
246
        executions, err := m.db.SelectFromExecutions(ctx, &sqlplugin.ExecutionsFilter{
797✔
247
                ShardID: m.shardID, DomainID: domainID, WorkflowID: wfID, RunID: runID})
797✔
248

797✔
249
        if err != nil {
1,110✔
250
                if err == sql.ErrNoRows {
626✔
251
                        return nil, &types.EntityNotExistsError{
313✔
252
                                Message: fmt.Sprintf(
313✔
253
                                        "Workflow execution not found.  WorkflowId: %v, RunId: %v",
313✔
254
                                        request.Execution.GetWorkflowID(),
313✔
255
                                        request.Execution.GetRunID(),
313✔
256
                                ),
313✔
257
                        }
313✔
258
                }
313✔
259
                return nil, convertCommonErrors(m.db, "GetWorkflowExecution", "", err)
×
260
        }
261

262
        if len(executions) == 0 {
486✔
263
                return nil, &types.EntityNotExistsError{
×
264
                        Message: fmt.Sprintf(
×
265
                                "Workflow execution not found.  WorkflowId: %v, RunId: %v",
×
266
                                request.Execution.GetWorkflowID(),
×
267
                                request.Execution.GetRunID(),
×
268
                        ),
×
269
                }
×
270
        }
×
271

272
        if len(executions) != 1 {
486✔
273
                return nil, &types.InternalServiceError{
×
274
                        Message: "GetWorkflowExecution return more than one results.",
×
275
                }
×
276
        }
×
277
        return executions, nil
486✔
278
}
279

280
func (m *sqlExecutionStore) GetWorkflowExecution(
281
        ctx context.Context,
282
        request *p.InternalGetWorkflowExecutionRequest,
283
) (resp *p.InternalGetWorkflowExecutionResponse, e error) {
797✔
284
        recoverPanic := func(recovered interface{}, err *error) {
7,159✔
285
                if recovered != nil {
6,362✔
286
                        *err = fmt.Errorf("DB operation panicked: %v %s", recovered, debug.Stack())
×
287
                }
×
288
        }
289

290
        domainID := serialization.MustParseUUID(request.DomainID)
797✔
291
        runID := serialization.MustParseUUID(request.Execution.RunID)
797✔
292
        wfID := request.Execution.WorkflowID
797✔
293

797✔
294
        var executions []sqlplugin.ExecutionsRow
797✔
295
        var activityInfos map[int64]*p.InternalActivityInfo
797✔
296
        var timerInfos map[string]*p.TimerInfo
797✔
297
        var childExecutionInfos map[int64]*p.InternalChildExecutionInfo
797✔
298
        var requestCancelInfos map[int64]*p.RequestCancelInfo
797✔
299
        var signalInfos map[int64]*p.SignalInfo
797✔
300
        var bufferedEvents []*p.DataBlob
797✔
301
        var signalsRequested map[string]struct{}
797✔
302

797✔
303
        g, ctx := errgroup.WithContext(ctx)
797✔
304

797✔
305
        g.Go(func() (e error) {
1,594✔
306
                defer func() { recoverPanic(recover(), &e) }()
1,594✔
307
                executions, e = m.getExecutions(ctx, request, domainID, wfID, runID)
797✔
308
                return e
797✔
309
        })
310

311
        g.Go(func() (e error) {
1,594✔
312
                defer func() { recoverPanic(recover(), &e) }()
1,594✔
313
                activityInfos, e = getActivityInfoMap(
797✔
314
                        ctx, m.db, m.shardID, domainID, wfID, runID, m.parser)
797✔
315
                return e
797✔
316
        })
317

318
        g.Go(func() (e error) {
1,594✔
319
                defer func() { recoverPanic(recover(), &e) }()
1,594✔
320
                timerInfos, e = getTimerInfoMap(
797✔
321
                        ctx, m.db, m.shardID, domainID, wfID, runID, m.parser)
797✔
322
                return e
797✔
323
        })
324

325
        g.Go(func() (e error) {
1,594✔
326
                defer func() { recoverPanic(recover(), &e) }()
1,594✔
327
                childExecutionInfos, e = getChildExecutionInfoMap(
797✔
328
                        ctx, m.db, m.shardID, domainID, wfID, runID, m.parser)
797✔
329
                return e
797✔
330
        })
331

332
        g.Go(func() (e error) {
1,594✔
333
                defer func() { recoverPanic(recover(), &e) }()
1,594✔
334
                requestCancelInfos, e = getRequestCancelInfoMap(
797✔
335
                        ctx, m.db, m.shardID, domainID, wfID, runID, m.parser)
797✔
336
                return e
797✔
337
        })
338

339
        g.Go(func() (e error) {
1,594✔
340
                defer func() { recoverPanic(recover(), &e) }()
1,594✔
341
                signalInfos, e = getSignalInfoMap(
797✔
342
                        ctx, m.db, m.shardID, domainID, wfID, runID, m.parser)
797✔
343
                return e
797✔
344
        })
345

346
        g.Go(func() (e error) {
1,594✔
347
                defer func() { recoverPanic(recover(), &e) }()
1,594✔
348
                bufferedEvents, e = getBufferedEvents(
797✔
349
                        ctx, m.db, m.shardID, domainID, wfID, runID)
797✔
350
                return e
797✔
351
        })
352

353
        g.Go(func() (e error) {
1,594✔
354
                defer func() { recoverPanic(recover(), &e) }()
1,594✔
355
                signalsRequested, e = getSignalsRequested(
797✔
356
                        ctx, m.db, m.shardID, domainID, wfID, runID)
797✔
357
                return e
797✔
358
        })
359

360
        err := g.Wait()
797✔
361
        if err != nil {
1,110✔
362
                return nil, err
313✔
363
        }
313✔
364

365
        state, err := m.populateWorkflowMutableState(executions[0])
486✔
366
        if err != nil {
486✔
367
                return nil, &types.InternalServiceError{
×
368
                        Message: fmt.Sprintf("GetWorkflowExecution: failed. Error: %v", err),
×
369
                }
×
370
        }
×
371
        state.ActivityInfos = activityInfos
486✔
372
        state.TimerInfos = timerInfos
486✔
373
        state.ChildExecutionInfos = childExecutionInfos
486✔
374
        state.RequestCancelInfos = requestCancelInfos
486✔
375
        state.SignalInfos = signalInfos
486✔
376
        state.BufferedEvents = bufferedEvents
486✔
377
        state.SignalRequestedIDs = signalsRequested
486✔
378

486✔
379
        return &p.InternalGetWorkflowExecutionResponse{State: state}, nil
486✔
380
}
381

382
func (m *sqlExecutionStore) UpdateWorkflowExecution(
383
        ctx context.Context,
384
        request *p.InternalUpdateWorkflowExecutionRequest,
385
) error {
2,900✔
386
        dbShardID := sqlplugin.GetDBShardIDFromHistoryShardID(m.shardID, m.db.GetTotalNumDBShards())
2,900✔
387
        return m.txExecuteShardLocked(ctx, dbShardID, "UpdateWorkflowExecution", request.RangeID, func(tx sqlplugin.Tx) error {
5,800✔
388
                return m.updateWorkflowExecutionTx(ctx, tx, request)
2,900✔
389
        })
2,900✔
390
}
391

392
func (m *sqlExecutionStore) updateWorkflowExecutionTx(
393
        ctx context.Context,
394
        tx sqlplugin.Tx,
395
        request *p.InternalUpdateWorkflowExecutionRequest,
396
) error {
2,900✔
397

2,900✔
398
        updateWorkflow := request.UpdateWorkflowMutation
2,900✔
399
        newWorkflow := request.NewWorkflowSnapshot
2,900✔
400

2,900✔
401
        executionInfo := updateWorkflow.ExecutionInfo
2,900✔
402
        domainID := serialization.MustParseUUID(executionInfo.DomainID)
2,900✔
403
        workflowID := executionInfo.WorkflowID
2,900✔
404
        runID := serialization.MustParseUUID(executionInfo.RunID)
2,900✔
405
        shardID := m.shardID
2,900✔
406

2,900✔
407
        if err := p.ValidateUpdateWorkflowModeState(
2,900✔
408
                request.Mode,
2,900✔
409
                updateWorkflow,
2,900✔
410
                newWorkflow,
2,900✔
411
        ); err != nil {
2,900✔
412
                return err
×
413
        }
×
414

415
        switch request.Mode {
2,900✔
416
        case p.UpdateWorkflowModeIgnoreCurrent:
×
417
                // no-op
418
        case p.UpdateWorkflowModeBypassCurrent:
2✔
419
                if err := assertNotCurrentExecution(
2✔
420
                        ctx,
2✔
421
                        tx,
2✔
422
                        shardID,
2✔
423
                        domainID,
2✔
424
                        workflowID,
2✔
425
                        runID); err != nil {
2✔
426
                        return err
×
427
                }
×
428

429
        case p.UpdateWorkflowModeUpdateCurrent:
2,900✔
430
                if newWorkflow != nil {
3,016✔
431
                        newExecutionInfo := newWorkflow.ExecutionInfo
116✔
432
                        startVersion := newWorkflow.StartVersion
116✔
433
                        lastWriteVersion := newWorkflow.LastWriteVersion
116✔
434
                        newDomainID := serialization.MustParseUUID(newExecutionInfo.DomainID)
116✔
435
                        newRunID := serialization.MustParseUUID(newExecutionInfo.RunID)
116✔
436

116✔
437
                        if !bytes.Equal(domainID, newDomainID) {
116✔
438
                                return &types.InternalServiceError{
×
439
                                        Message: "UpdateWorkflowExecution: cannot continue as new to another domain",
×
440
                                }
×
441
                        }
×
442

443
                        if err := assertRunIDAndUpdateCurrentExecution(
116✔
444
                                ctx,
116✔
445
                                tx,
116✔
446
                                shardID,
116✔
447
                                domainID,
116✔
448
                                workflowID,
116✔
449
                                newRunID,
116✔
450
                                runID,
116✔
451
                                newWorkflow.ExecutionInfo.CreateRequestID,
116✔
452
                                newWorkflow.ExecutionInfo.State,
116✔
453
                                newWorkflow.ExecutionInfo.CloseStatus,
116✔
454
                                startVersion,
116✔
455
                                lastWriteVersion); err != nil {
116✔
456
                                return err
×
457
                        }
×
458
                } else {
2,786✔
459
                        startVersion := updateWorkflow.StartVersion
2,786✔
460
                        lastWriteVersion := updateWorkflow.LastWriteVersion
2,786✔
461
                        // this is only to update the current record
2,786✔
462
                        if err := assertRunIDAndUpdateCurrentExecution(
2,786✔
463
                                ctx,
2,786✔
464
                                tx,
2,786✔
465
                                shardID,
2,786✔
466
                                domainID,
2,786✔
467
                                workflowID,
2,786✔
468
                                runID,
2,786✔
469
                                runID,
2,786✔
470
                                executionInfo.CreateRequestID,
2,786✔
471
                                executionInfo.State,
2,786✔
472
                                executionInfo.CloseStatus,
2,786✔
473
                                startVersion,
2,786✔
474
                                lastWriteVersion); err != nil {
2,786✔
475
                                return err
×
476
                        }
×
477
                }
478

479
        default:
×
480
                return &types.InternalServiceError{
×
481
                        Message: fmt.Sprintf("UpdateWorkflowExecution: unknown mode: %v", request.Mode),
×
482
                }
×
483
        }
484

485
        if m.useAsyncTransaction() { // async transaction is enabled
2,900✔
486
                // TODO: it's possible to merge some operations in the following 2 functions in a batch, should refactor the code later
×
487
                if err := applyWorkflowMutationAsyncTx(ctx, tx, shardID, &updateWorkflow, m.parser); err != nil {
×
488
                        return err
×
489
                }
×
490
                if newWorkflow != nil {
×
491
                        if err := m.applyWorkflowSnapshotAsyncTxAsNew(ctx, tx, shardID, newWorkflow, m.parser); err != nil {
×
492
                                return err
×
493
                        }
×
494
                }
495
                return nil
×
496
        }
497

498
        if err := applyWorkflowMutationTx(ctx, tx, shardID, &updateWorkflow, m.parser); err != nil {
2,900✔
499
                return err
×
500
        }
×
501
        if newWorkflow != nil {
3,016✔
502
                if err := applyWorkflowSnapshotTxAsNew(ctx, tx, shardID, newWorkflow, m.parser); err != nil {
116✔
503
                        return err
×
504
                }
×
505
        }
506
        return nil
2,900✔
507
}
508

509
func (m *sqlExecutionStore) ConflictResolveWorkflowExecution(
510
        ctx context.Context,
511
        request *p.InternalConflictResolveWorkflowExecutionRequest,
512
) error {
2✔
513
        dbShardID := sqlplugin.GetDBShardIDFromHistoryShardID(m.shardID, m.db.GetTotalNumDBShards())
2✔
514
        return m.txExecuteShardLocked(ctx, dbShardID, "ConflictResolveWorkflowExecution", request.RangeID, func(tx sqlplugin.Tx) error {
4✔
515
                return m.conflictResolveWorkflowExecutionTx(ctx, tx, request)
2✔
516
        })
2✔
517
}
518

519
func (m *sqlExecutionStore) conflictResolveWorkflowExecutionTx(
520
        ctx context.Context,
521
        tx sqlplugin.Tx,
522
        request *p.InternalConflictResolveWorkflowExecutionRequest,
523
) error {
2✔
524

2✔
525
        currentWorkflow := request.CurrentWorkflowMutation
2✔
526
        resetWorkflow := request.ResetWorkflowSnapshot
2✔
527
        newWorkflow := request.NewWorkflowSnapshot
2✔
528

2✔
529
        shardID := m.shardID
2✔
530

2✔
531
        domainID := serialization.MustParseUUID(resetWorkflow.ExecutionInfo.DomainID)
2✔
532
        workflowID := resetWorkflow.ExecutionInfo.WorkflowID
2✔
533

2✔
534
        if err := p.ValidateConflictResolveWorkflowModeState(
2✔
535
                request.Mode,
2✔
536
                resetWorkflow,
2✔
537
                newWorkflow,
2✔
538
                currentWorkflow,
2✔
539
        ); err != nil {
2✔
540
                return err
×
541
        }
×
542

543
        switch request.Mode {
2✔
544
        case p.ConflictResolveWorkflowModeBypassCurrent:
2✔
545
                if err := assertNotCurrentExecution(
2✔
546
                        ctx,
2✔
547
                        tx,
2✔
548
                        shardID,
2✔
549
                        domainID,
2✔
550
                        workflowID,
2✔
551
                        serialization.MustParseUUID(resetWorkflow.ExecutionInfo.RunID)); err != nil {
2✔
552
                        return err
×
553
                }
×
554

555
        case p.ConflictResolveWorkflowModeUpdateCurrent:
2✔
556
                executionInfo := resetWorkflow.ExecutionInfo
2✔
557
                startVersion := resetWorkflow.StartVersion
2✔
558
                lastWriteVersion := resetWorkflow.LastWriteVersion
2✔
559
                if newWorkflow != nil {
2✔
560
                        executionInfo = newWorkflow.ExecutionInfo
×
561
                        startVersion = newWorkflow.StartVersion
×
562
                        lastWriteVersion = newWorkflow.LastWriteVersion
×
563
                }
×
564
                runID := serialization.MustParseUUID(executionInfo.RunID)
2✔
565
                createRequestID := executionInfo.CreateRequestID
2✔
566
                state := executionInfo.State
2✔
567
                closeStatus := executionInfo.CloseStatus
2✔
568

2✔
569
                if currentWorkflow != nil {
2✔
UNCOV
570
                        prevRunID := serialization.MustParseUUID(currentWorkflow.ExecutionInfo.RunID)
×
UNCOV
571

×
UNCOV
572
                        if err := assertRunIDAndUpdateCurrentExecution(
×
UNCOV
573
                                ctx,
×
UNCOV
574
                                tx,
×
UNCOV
575
                                m.shardID,
×
UNCOV
576
                                domainID,
×
UNCOV
577
                                workflowID,
×
UNCOV
578
                                runID,
×
UNCOV
579
                                prevRunID,
×
UNCOV
580
                                createRequestID,
×
UNCOV
581
                                state,
×
UNCOV
582
                                closeStatus,
×
UNCOV
583
                                startVersion,
×
UNCOV
584
                                lastWriteVersion); err != nil {
×
585
                                return err
×
586
                        }
×
587
                } else {
2✔
588
                        // reset workflow is current
2✔
589
                        prevRunID := serialization.MustParseUUID(resetWorkflow.ExecutionInfo.RunID)
2✔
590

2✔
591
                        if err := assertRunIDAndUpdateCurrentExecution(
2✔
592
                                ctx,
2✔
593
                                tx,
2✔
594
                                m.shardID,
2✔
595
                                domainID,
2✔
596
                                workflowID,
2✔
597
                                runID,
2✔
598
                                prevRunID,
2✔
599
                                createRequestID,
2✔
600
                                state,
2✔
601
                                closeStatus,
2✔
602
                                startVersion,
2✔
603
                                lastWriteVersion); err != nil {
2✔
604
                                return err
×
605
                        }
×
606
                }
607

608
        default:
×
609
                return &types.InternalServiceError{
×
610
                        Message: fmt.Sprintf("ConflictResolveWorkflowExecution: unknown mode: %v", request.Mode),
×
611
                }
×
612
        }
613

614
        if err := applyWorkflowSnapshotTxAsReset(ctx, tx, shardID, &resetWorkflow, m.parser); err != nil {
2✔
615
                return err
×
616
        }
×
617
        if currentWorkflow != nil {
2✔
UNCOV
618
                if err := applyWorkflowMutationTx(ctx, tx, shardID, currentWorkflow, m.parser); err != nil {
×
619
                        return err
×
620
                }
×
621
        }
622
        if newWorkflow != nil {
2✔
623
                if err := applyWorkflowSnapshotTxAsNew(ctx, tx, shardID, newWorkflow, m.parser); err != nil {
×
624
                        return err
×
625
                }
×
626
        }
627
        return nil
2✔
628
}
629

630
func (m *sqlExecutionStore) DeleteWorkflowExecution(
631
        ctx context.Context,
632
        request *p.DeleteWorkflowExecutionRequest,
633
) error {
37✔
634
        recoverPanic := func(recovered interface{}, err *error) {
319✔
635
                if recovered != nil {
282✔
636
                        *err = fmt.Errorf("DB operation panicked: %v %s", recovered, debug.Stack())
×
637
                }
×
638
        }
639
        domainID := serialization.MustParseUUID(request.DomainID)
37✔
640
        runID := serialization.MustParseUUID(request.RunID)
37✔
641
        wfID := request.WorkflowID
37✔
642
        g, ctx := errgroup.WithContext(ctx)
37✔
643

37✔
644
        g.Go(func() (e error) {
74✔
645
                defer func() { recoverPanic(recover(), &e) }()
74✔
646
                _, e = m.db.DeleteFromExecutions(ctx, &sqlplugin.ExecutionsFilter{
37✔
647
                        ShardID:    m.shardID,
37✔
648
                        DomainID:   domainID,
37✔
649
                        WorkflowID: wfID,
37✔
650
                        RunID:      runID,
37✔
651
                })
37✔
652
                return e
37✔
653
        })
654

655
        g.Go(func() (e error) {
74✔
656
                defer func() { recoverPanic(recover(), &e) }()
74✔
657
                _, e = m.db.DeleteFromActivityInfoMaps(ctx, &sqlplugin.ActivityInfoMapsFilter{
37✔
658
                        ShardID:    int64(m.shardID),
37✔
659
                        DomainID:   domainID,
37✔
660
                        WorkflowID: wfID,
37✔
661
                        RunID:      runID,
37✔
662
                })
37✔
663
                return e
37✔
664
        })
665

666
        g.Go(func() (e error) {
74✔
667
                defer func() { recoverPanic(recover(), &e) }()
74✔
668
                _, e = m.db.DeleteFromTimerInfoMaps(ctx, &sqlplugin.TimerInfoMapsFilter{
37✔
669
                        ShardID:    int64(m.shardID),
37✔
670
                        DomainID:   domainID,
37✔
671
                        WorkflowID: wfID,
37✔
672
                        RunID:      runID,
37✔
673
                })
37✔
674
                return e
37✔
675
        })
676

677
        g.Go(func() (e error) {
74✔
678
                defer func() { recoverPanic(recover(), &e) }()
74✔
679
                _, e = m.db.DeleteFromChildExecutionInfoMaps(ctx, &sqlplugin.ChildExecutionInfoMapsFilter{
37✔
680
                        ShardID:    int64(m.shardID),
37✔
681
                        DomainID:   domainID,
37✔
682
                        WorkflowID: wfID,
37✔
683
                        RunID:      runID,
37✔
684
                })
37✔
685
                return e
37✔
686
        })
687

688
        g.Go(func() (e error) {
74✔
689
                defer func() { recoverPanic(recover(), &e) }()
74✔
690
                _, e = m.db.DeleteFromRequestCancelInfoMaps(ctx, &sqlplugin.RequestCancelInfoMapsFilter{
37✔
691
                        ShardID:    int64(m.shardID),
37✔
692
                        DomainID:   domainID,
37✔
693
                        WorkflowID: wfID,
37✔
694
                        RunID:      runID,
37✔
695
                })
37✔
696
                return e
37✔
697
        })
698

699
        g.Go(func() (e error) {
74✔
700
                defer func() { recoverPanic(recover(), &e) }()
74✔
701
                _, e = m.db.DeleteFromSignalInfoMaps(ctx, &sqlplugin.SignalInfoMapsFilter{
37✔
702
                        ShardID:    int64(m.shardID),
37✔
703
                        DomainID:   domainID,
37✔
704
                        WorkflowID: wfID,
37✔
705
                        RunID:      runID,
37✔
706
                })
37✔
707
                return e
37✔
708
        })
709

710
        g.Go(func() (e error) {
74✔
711
                defer func() { recoverPanic(recover(), &e) }()
74✔
712
                _, e = m.db.DeleteFromBufferedEvents(ctx, &sqlplugin.BufferedEventsFilter{
37✔
713
                        ShardID:    m.shardID,
37✔
714
                        DomainID:   domainID,
37✔
715
                        WorkflowID: wfID,
37✔
716
                        RunID:      runID,
37✔
717
                })
37✔
718
                return e
37✔
719
        })
720

721
        g.Go(func() (e error) {
74✔
722
                defer func() { recoverPanic(recover(), &e) }()
74✔
723
                _, e = m.db.DeleteFromSignalsRequestedSets(ctx, &sqlplugin.SignalsRequestedSetsFilter{
37✔
724
                        ShardID:    int64(m.shardID),
37✔
725
                        DomainID:   domainID,
37✔
726
                        WorkflowID: wfID,
37✔
727
                        RunID:      runID,
37✔
728
                })
37✔
729
                return e
37✔
730
        })
731
        return g.Wait()
37✔
732
}
733

734
// its possible for a new run of the same workflow to have started after the run we are deleting
735
// here was finished. In that case, current_executions table will have the same workflowID but different
736
// runID. The following code will delete the row from current_executions if and only if the runID is
737
// same as the one we are trying to delete here
738
func (m *sqlExecutionStore) DeleteCurrentWorkflowExecution(
739
        ctx context.Context,
740
        request *p.DeleteCurrentWorkflowExecutionRequest,
741
) error {
38✔
742

38✔
743
        domainID := serialization.MustParseUUID(request.DomainID)
38✔
744
        runID := serialization.MustParseUUID(request.RunID)
38✔
745
        _, err := m.db.DeleteFromCurrentExecutions(ctx, &sqlplugin.CurrentExecutionsFilter{
38✔
746
                ShardID:    int64(m.shardID),
38✔
747
                DomainID:   domainID,
38✔
748
                WorkflowID: request.WorkflowID,
38✔
749
                RunID:      runID,
38✔
750
        })
38✔
751
        if err != nil {
39✔
752
                return convertCommonErrors(m.db, "DeleteCurrentWorkflowExecution", "", err)
1✔
753
        }
1✔
754
        return nil
37✔
755
}
756

757
func (m *sqlExecutionStore) GetCurrentExecution(
758
        ctx context.Context,
759
        request *p.GetCurrentExecutionRequest,
760
) (*p.GetCurrentExecutionResponse, error) {
124✔
761

124✔
762
        row, err := m.db.SelectFromCurrentExecutions(ctx, &sqlplugin.CurrentExecutionsFilter{
124✔
763
                ShardID:    int64(m.shardID),
124✔
764
                DomainID:   serialization.MustParseUUID(request.DomainID),
124✔
765
                WorkflowID: request.WorkflowID,
124✔
766
        })
124✔
767
        if err != nil {
133✔
768
                return nil, convertCommonErrors(m.db, "GetCurrentExecution", "", err)
9✔
769
        }
9✔
770
        return &p.GetCurrentExecutionResponse{
117✔
771
                StartRequestID:   row.CreateRequestID,
117✔
772
                RunID:            row.RunID.String(),
117✔
773
                State:            int(row.State),
117✔
774
                CloseStatus:      int(row.CloseStatus),
117✔
775
                LastWriteVersion: row.LastWriteVersion,
117✔
776
        }, nil
117✔
777
}
778

779
func (m *sqlExecutionStore) ListCurrentExecutions(
780
        _ context.Context,
781
        _ *p.ListCurrentExecutionsRequest,
782
) (*p.ListCurrentExecutionsResponse, error) {
×
783
        return nil, &types.InternalServiceError{Message: "Not yet implemented"}
×
784
}
×
785

786
func (m *sqlExecutionStore) IsWorkflowExecutionExists(
787
        _ context.Context,
788
        _ *p.IsWorkflowExecutionExistsRequest,
789
) (*p.IsWorkflowExecutionExistsResponse, error) {
×
790
        return nil, &types.InternalServiceError{Message: "Not yet implemented"}
×
791
}
×
792

793
func (m *sqlExecutionStore) ListConcreteExecutions(
794
        ctx context.Context,
795
        request *p.ListConcreteExecutionsRequest,
796
) (*p.InternalListConcreteExecutionsResponse, error) {
×
797

×
798
        filter := &sqlplugin.ExecutionsFilter{}
×
799
        if len(request.PageToken) > 0 {
×
800
                err := gobDeserialize(request.PageToken, &filter)
×
801
                if err != nil {
×
802
                        return nil, &types.InternalServiceError{
×
803
                                Message: fmt.Sprintf("ListConcreteExecutions failed. Error: %v", err),
×
804
                        }
×
805
                }
×
806
        } else {
×
807
                filter = &sqlplugin.ExecutionsFilter{
×
808
                        ShardID:    m.shardID,
×
809
                        WorkflowID: "",
×
810
                }
×
811
        }
×
812
        filter.Size = request.PageSize
×
813

×
814
        executions, err := m.db.SelectFromExecutions(ctx, filter)
×
815
        if err != nil {
×
816
                if err == sql.ErrNoRows {
×
817
                        return &p.InternalListConcreteExecutionsResponse{}, nil
×
818
                }
×
819
                return nil, convertCommonErrors(m.db, "ListConcreteExecutions", "", err)
×
820
        }
821

822
        if len(executions) == 0 {
×
823
                return &p.InternalListConcreteExecutionsResponse{}, nil
×
824
        }
×
825
        lastExecution := executions[len(executions)-1]
×
826
        nextFilter := &sqlplugin.ExecutionsFilter{
×
827
                ShardID:    m.shardID,
×
828
                WorkflowID: lastExecution.WorkflowID,
×
829
        }
×
830
        token, err := gobSerialize(nextFilter)
×
831
        if err != nil {
×
832
                return nil, &types.InternalServiceError{
×
833
                        Message: fmt.Sprintf("ListConcreteExecutions failed. Error: %v", err),
×
834
                }
×
835
        }
×
836
        concreteExecutions, err := m.populateInternalListConcreteExecutions(executions)
×
837
        if err != nil {
×
838
                return nil, &types.InternalServiceError{
×
839
                        Message: fmt.Sprintf("ListConcreteExecutions failed. Error: %v", err),
×
840
                }
×
841
        }
×
842

843
        return &p.InternalListConcreteExecutionsResponse{
×
844
                Executions:    concreteExecutions,
×
845
                NextPageToken: token,
×
846
        }, nil
×
847
}
848

849
func (m *sqlExecutionStore) GetTransferTasks(
850
        ctx context.Context,
851
        request *p.GetTransferTasksRequest,
852
) (*p.GetTransferTasksResponse, error) {
1,504✔
853
        minReadLevel := request.ReadLevel
1,504✔
854
        if len(request.NextPageToken) > 0 {
1,507✔
855
                readLevel, err := deserializePageToken(request.NextPageToken)
3✔
856
                if err != nil {
3✔
857
                        return nil, convertCommonErrors(m.db, "GetTransferTasks", "failed to deserialize page token", err)
×
858
                }
×
859
                minReadLevel = readLevel
3✔
860
        }
861
        rows, err := m.db.SelectFromTransferTasks(ctx, &sqlplugin.TransferTasksFilter{
1,504✔
862
                ShardID:   m.shardID,
1,504✔
863
                MinTaskID: minReadLevel,
1,504✔
864
                MaxTaskID: request.MaxReadLevel,
1,504✔
865
                PageSize:  request.BatchSize,
1,504✔
866
        })
1,504✔
867
        if err != nil {
1,505✔
868
                if err != sql.ErrNoRows {
2✔
869
                        return nil, convertCommonErrors(m.db, "GetTransferTasks", "", err)
1✔
870
                }
1✔
871
        }
872
        resp := &p.GetTransferTasksResponse{Tasks: make([]*p.TransferTaskInfo, len(rows))}
1,503✔
873
        for i, row := range rows {
4,826✔
874
                info, err := m.parser.TransferTaskInfoFromBlob(row.Data, row.DataEncoding)
3,323✔
875
                if err != nil {
3,324✔
876
                        return nil, err
1✔
877
                }
1✔
878
                resp.Tasks[i] = &p.TransferTaskInfo{
3,322✔
879
                        TaskID:                  row.TaskID,
3,322✔
880
                        DomainID:                info.DomainID.String(),
3,322✔
881
                        WorkflowID:              info.GetWorkflowID(),
3,322✔
882
                        RunID:                   info.RunID.String(),
3,322✔
883
                        VisibilityTimestamp:     info.GetVisibilityTimestamp(),
3,322✔
884
                        TargetDomainID:          info.TargetDomainID.String(),
3,322✔
885
                        TargetDomainIDs:         info.GetTargetDomainIDs(),
3,322✔
886
                        TargetWorkflowID:        info.GetTargetWorkflowID(),
3,322✔
887
                        TargetRunID:             info.TargetRunID.String(),
3,322✔
888
                        TargetChildWorkflowOnly: info.GetTargetChildWorkflowOnly(),
3,322✔
889
                        TaskList:                info.GetTaskList(),
3,322✔
890
                        TaskType:                int(info.GetTaskType()),
3,322✔
891
                        ScheduleID:              info.GetScheduleID(),
3,322✔
892
                        Version:                 info.GetVersion(),
3,322✔
893
                }
3,322✔
894
        }
895
        if len(rows) > 0 {
2,940✔
896
                lastTaskID := rows[len(rows)-1].TaskID
1,438✔
897
                if lastTaskID < request.MaxReadLevel {
1,440✔
898
                        resp.NextPageToken = serializePageToken(lastTaskID)
2✔
899
                }
2✔
900
        }
901
        return resp, nil
1,502✔
902
}
903

904
func (m *sqlExecutionStore) CompleteTransferTask(
905
        ctx context.Context,
906
        request *p.CompleteTransferTaskRequest,
907
) error {
2✔
908

2✔
909
        if _, err := m.db.DeleteFromTransferTasks(ctx, &sqlplugin.TransferTasksFilter{
2✔
910
                ShardID: m.shardID,
2✔
911
                TaskID:  request.TaskID,
2✔
912
        }); err != nil {
3✔
913
                return convertCommonErrors(m.db, "CompleteTransferTask", "", err)
1✔
914
        }
1✔
915
        return nil
1✔
916
}
917

918
func (m *sqlExecutionStore) RangeCompleteTransferTask(
919
        ctx context.Context,
920
        request *p.RangeCompleteTransferTaskRequest,
921
) (*p.RangeCompleteTransferTaskResponse, error) {
52✔
922
        result, err := m.db.RangeDeleteFromTransferTasks(ctx, &sqlplugin.TransferTasksFilter{
52✔
923
                ShardID:   m.shardID,
52✔
924
                MinTaskID: request.ExclusiveBeginTaskID,
52✔
925
                MaxTaskID: request.InclusiveEndTaskID,
52✔
926
                PageSize:  request.PageSize,
52✔
927
        })
52✔
928
        if err != nil {
53✔
929
                return nil, convertCommonErrors(m.db, "RangeCompleteTransferTask", "", err)
1✔
930
        }
1✔
931
        rowsDeleted, err := result.RowsAffected()
51✔
932
        if err != nil {
51✔
933
                return nil, convertCommonErrors(m.db, "RangeCompleteTransferTask", "", err)
×
934
        }
×
935
        return &p.RangeCompleteTransferTaskResponse{TasksCompleted: int(rowsDeleted)}, nil
51✔
936
}
937

938
func (m *sqlExecutionStore) GetCrossClusterTasks(
939
        ctx context.Context,
940
        request *p.GetCrossClusterTasksRequest,
941
) (*p.GetCrossClusterTasksResponse, error) {
97✔
942
        minReadLevel := request.ReadLevel
97✔
943
        if len(request.NextPageToken) > 0 {
100✔
944
                readLevel, err := deserializePageToken(request.NextPageToken)
3✔
945
                if err != nil {
3✔
946
                        return nil, convertCommonErrors(m.db, "GetCrossClusterTasks", "failed to deserialize page token", err)
×
947
                }
×
948
                minReadLevel = readLevel
3✔
949
        }
950
        rows, err := m.db.SelectFromCrossClusterTasks(ctx, &sqlplugin.CrossClusterTasksFilter{
97✔
951
                TargetCluster: request.TargetCluster,
97✔
952
                ShardID:       m.shardID,
97✔
953
                MinTaskID:     minReadLevel,
97✔
954
                MaxTaskID:     request.MaxReadLevel,
97✔
955
                PageSize:      request.BatchSize,
97✔
956
        })
97✔
957
        if err != nil {
98✔
958
                if err != sql.ErrNoRows {
2✔
959
                        return nil, convertCommonErrors(m.db, "GetCrossClusterTasks", "", err)
1✔
960
                }
1✔
961
        }
962
        resp := &p.GetCrossClusterTasksResponse{Tasks: make([]*p.CrossClusterTaskInfo, len(rows))}
96✔
963
        for i, row := range rows {
98✔
964
                info, err := m.parser.CrossClusterTaskInfoFromBlob(row.Data, row.DataEncoding)
2✔
965
                if err != nil {
3✔
966
                        return nil, err
1✔
967
                }
1✔
968
                resp.Tasks[i] = &p.CrossClusterTaskInfo{
1✔
969
                        TaskID:                  row.TaskID,
1✔
970
                        DomainID:                info.DomainID.String(),
1✔
971
                        WorkflowID:              info.GetWorkflowID(),
1✔
972
                        RunID:                   info.RunID.String(),
1✔
973
                        VisibilityTimestamp:     info.GetVisibilityTimestamp(),
1✔
974
                        TargetDomainID:          info.TargetDomainID.String(),
1✔
975
                        TargetDomainIDs:         info.GetTargetDomainIDs(),
1✔
976
                        TargetWorkflowID:        info.GetTargetWorkflowID(),
1✔
977
                        TargetRunID:             info.TargetRunID.String(),
1✔
978
                        TargetChildWorkflowOnly: info.GetTargetChildWorkflowOnly(),
1✔
979
                        TaskList:                info.GetTaskList(),
1✔
980
                        TaskType:                int(info.GetTaskType()),
1✔
981
                        ScheduleID:              info.GetScheduleID(),
1✔
982
                        Version:                 info.GetVersion(),
1✔
983
                }
1✔
984
        }
985
        if len(rows) > 0 {
96✔
986
                lastTaskID := rows[len(rows)-1].TaskID
1✔
987
                if lastTaskID < request.MaxReadLevel {
2✔
988
                        resp.NextPageToken = serializePageToken(lastTaskID)
1✔
989
                }
1✔
990
        }
991
        return resp, nil
95✔
992

993
}
994

995
func (m *sqlExecutionStore) CompleteCrossClusterTask(
996
        ctx context.Context,
997
        request *p.CompleteCrossClusterTaskRequest,
998
) error {
2✔
999
        if _, err := m.db.DeleteFromCrossClusterTasks(ctx, &sqlplugin.CrossClusterTasksFilter{
2✔
1000
                TargetCluster: request.TargetCluster,
2✔
1001
                ShardID:       m.shardID,
2✔
1002
                TaskID:        request.TaskID,
2✔
1003
        }); err != nil {
3✔
1004
                return convertCommonErrors(m.db, "CompleteCrossClusterTask", "", err)
1✔
1005
        }
1✔
1006
        return nil
1✔
1007
}
1008

1009
func (m *sqlExecutionStore) RangeCompleteCrossClusterTask(
1010
        ctx context.Context,
1011
        request *p.RangeCompleteCrossClusterTaskRequest,
1012
) (*p.RangeCompleteCrossClusterTaskResponse, error) {
79✔
1013
        result, err := m.db.RangeDeleteFromCrossClusterTasks(ctx, &sqlplugin.CrossClusterTasksFilter{
79✔
1014
                TargetCluster: request.TargetCluster,
79✔
1015
                ShardID:       m.shardID,
79✔
1016
                MinTaskID:     request.ExclusiveBeginTaskID,
79✔
1017
                MaxTaskID:     request.InclusiveEndTaskID,
79✔
1018
                PageSize:      request.PageSize,
79✔
1019
        })
79✔
1020
        if err != nil {
80✔
1021
                return nil, convertCommonErrors(m.db, "RangeCompleteCrossClusterTask", "", err)
1✔
1022
        }
1✔
1023
        rowsDeleted, err := result.RowsAffected()
78✔
1024
        if err != nil {
78✔
1025
                return nil, convertCommonErrors(m.db, "RangeCompleteCrossClusterTask", "", err)
×
1026
        }
×
1027
        return &p.RangeCompleteCrossClusterTaskResponse{TasksCompleted: int(rowsDeleted)}, nil
78✔
1028
}
1029

1030
func (m *sqlExecutionStore) GetReplicationTasks(
1031
        ctx context.Context,
1032
        request *p.GetReplicationTasksRequest,
1033
) (*p.InternalGetReplicationTasksResponse, error) {
59✔
1034

59✔
1035
        readLevel, maxReadLevelInclusive, err := getReadLevels(request)
59✔
1036
        if err != nil {
59✔
1037
                return nil, err
×
1038
        }
×
1039

1040
        rows, err := m.db.SelectFromReplicationTasks(
59✔
1041
                ctx,
59✔
1042
                &sqlplugin.ReplicationTasksFilter{
59✔
1043
                        ShardID:   m.shardID,
59✔
1044
                        MinTaskID: readLevel,
59✔
1045
                        MaxTaskID: maxReadLevelInclusive,
59✔
1046
                        PageSize:  request.BatchSize,
59✔
1047
                })
59✔
1048

59✔
1049
        switch err {
59✔
1050
        case nil:
58✔
1051
                return m.populateGetReplicationTasksResponse(rows, request.MaxReadLevel)
58✔
1052
        case sql.ErrNoRows:
×
1053
                return &p.InternalGetReplicationTasksResponse{}, nil
×
1054
        default:
1✔
1055
                return nil, convertCommonErrors(m.db, "GetReplicationTasks", "", err)
1✔
1056
        }
1057
}
1058

1059
func getReadLevels(request *p.GetReplicationTasksRequest) (readLevel int64, maxReadLevelInclusive int64, err error) {
64✔
1060
        readLevel = request.ReadLevel
64✔
1061
        if len(request.NextPageToken) > 0 {
72✔
1062
                readLevel, err = deserializePageToken(request.NextPageToken)
8✔
1063
                if err != nil {
8✔
1064
                        return 0, 0, err
×
1065
                }
×
1066
        }
1067

1068
        maxReadLevelInclusive = collection.MaxInt64(readLevel+int64(request.BatchSize), request.MaxReadLevel)
64✔
1069
        return readLevel, maxReadLevelInclusive, nil
64✔
1070
}
1071

1072
func (m *sqlExecutionStore) populateGetReplicationTasksResponse(
1073
        rows []sqlplugin.ReplicationTasksRow,
1074
        requestMaxReadLevel int64,
1075
) (*p.InternalGetReplicationTasksResponse, error) {
62✔
1076
        if len(rows) == 0 {
120✔
1077
                return &p.InternalGetReplicationTasksResponse{}, nil
58✔
1078
        }
58✔
1079

1080
        var tasks = make([]*p.InternalReplicationTaskInfo, len(rows))
6✔
1081
        for i, row := range rows {
12✔
1082
                info, err := m.parser.ReplicationTaskInfoFromBlob(row.Data, row.DataEncoding)
6✔
1083
                if err != nil {
8✔
1084
                        return nil, err
2✔
1085
                }
2✔
1086

1087
                tasks[i] = &p.InternalReplicationTaskInfo{
4✔
1088
                        TaskID:            row.TaskID,
4✔
1089
                        DomainID:          info.DomainID.String(),
4✔
1090
                        WorkflowID:        info.GetWorkflowID(),
4✔
1091
                        RunID:             info.RunID.String(),
4✔
1092
                        TaskType:          int(info.GetTaskType()),
4✔
1093
                        FirstEventID:      info.GetFirstEventID(),
4✔
1094
                        NextEventID:       info.GetNextEventID(),
4✔
1095
                        Version:           info.GetVersion(),
4✔
1096
                        ScheduledID:       info.GetScheduledID(),
4✔
1097
                        BranchToken:       info.GetBranchToken(),
4✔
1098
                        NewRunBranchToken: info.GetNewRunBranchToken(),
4✔
1099
                        CreationTime:      info.GetCreationTimestamp(),
4✔
1100
                }
4✔
1101
        }
1102
        var nextPageToken []byte
4✔
1103
        lastTaskID := rows[len(rows)-1].TaskID
4✔
1104
        if lastTaskID < requestMaxReadLevel {
8✔
1105
                nextPageToken = serializePageToken(lastTaskID)
4✔
1106
        }
4✔
1107
        return &p.InternalGetReplicationTasksResponse{
4✔
1108
                Tasks:         tasks,
4✔
1109
                NextPageToken: nextPageToken,
4✔
1110
        }, nil
4✔
1111
}
1112

1113
func (m *sqlExecutionStore) CompleteReplicationTask(
1114
        ctx context.Context,
1115
        request *p.CompleteReplicationTaskRequest,
1116
) error {
2✔
1117

2✔
1118
        if _, err := m.db.DeleteFromReplicationTasks(ctx, &sqlplugin.ReplicationTasksFilter{
2✔
1119
                ShardID: m.shardID,
2✔
1120
                TaskID:  request.TaskID,
2✔
1121
        }); err != nil {
3✔
1122
                return convertCommonErrors(m.db, "CompleteReplicationTask", "", err)
1✔
1123
        }
1✔
1124
        return nil
1✔
1125
}
1126

1127
func (m *sqlExecutionStore) RangeCompleteReplicationTask(
1128
        ctx context.Context,
1129
        request *p.RangeCompleteReplicationTaskRequest,
1130
) (*p.RangeCompleteReplicationTaskResponse, error) {
67✔
1131
        result, err := m.db.RangeDeleteFromReplicationTasks(ctx, &sqlplugin.ReplicationTasksFilter{
67✔
1132
                ShardID:            m.shardID,
67✔
1133
                InclusiveEndTaskID: request.InclusiveEndTaskID,
67✔
1134
                PageSize:           request.PageSize,
67✔
1135
        })
67✔
1136
        if err != nil {
68✔
1137
                return nil, convertCommonErrors(m.db, "RangeCompleteReplicationTask", "", err)
1✔
1138
        }
1✔
1139
        rowsDeleted, err := result.RowsAffected()
66✔
1140
        if err != nil {
66✔
1141
                return nil, convertCommonErrors(m.db, "RangeCompleteReplicationTask", "", err)
×
1142
        }
×
1143
        return &p.RangeCompleteReplicationTaskResponse{TasksCompleted: int(rowsDeleted)}, nil
66✔
1144
}
1145

1146
func (m *sqlExecutionStore) GetReplicationTasksFromDLQ(
1147
        ctx context.Context,
1148
        request *p.GetReplicationTasksFromDLQRequest,
1149
) (*p.InternalGetReplicationTasksFromDLQResponse, error) {
5✔
1150

5✔
1151
        readLevel, maxReadLevelInclusive, err := getReadLevels(&request.GetReplicationTasksRequest)
5✔
1152
        if err != nil {
5✔
1153
                return nil, err
×
1154
        }
×
1155

1156
        filter := sqlplugin.ReplicationTasksFilter{
5✔
1157
                ShardID:   m.shardID,
5✔
1158
                MinTaskID: readLevel,
5✔
1159
                MaxTaskID: maxReadLevelInclusive,
5✔
1160
                PageSize:  request.BatchSize,
5✔
1161
        }
5✔
1162
        rows, err := m.db.SelectFromReplicationTasksDLQ(ctx, &sqlplugin.ReplicationTasksDLQFilter{
5✔
1163
                ReplicationTasksFilter: filter,
5✔
1164
                SourceClusterName:      request.SourceClusterName,
5✔
1165
        })
5✔
1166

5✔
1167
        switch err {
5✔
1168
        case nil:
4✔
1169
                return m.populateGetReplicationTasksResponse(rows, request.MaxReadLevel)
4✔
1170
        case sql.ErrNoRows:
×
1171
                return &p.InternalGetReplicationTasksResponse{}, nil
×
1172
        default:
1✔
1173
                return nil, convertCommonErrors(m.db, "GetReplicationTasksFromDLQ", "", err)
1✔
1174
        }
1175
}
1176

1177
func (m *sqlExecutionStore) GetReplicationDLQSize(
1178
        ctx context.Context,
1179
        request *p.GetReplicationDLQSizeRequest,
1180
) (*p.GetReplicationDLQSizeResponse, error) {
11✔
1181

11✔
1182
        size, err := m.db.SelectFromReplicationDLQ(ctx, &sqlplugin.ReplicationTaskDLQFilter{
11✔
1183
                SourceClusterName: request.SourceClusterName,
11✔
1184
                ShardID:           m.shardID,
11✔
1185
        })
11✔
1186

11✔
1187
        switch err {
11✔
1188
        case nil:
9✔
1189
                return &p.GetReplicationDLQSizeResponse{
9✔
1190
                        Size: size,
9✔
1191
                }, nil
9✔
1192
        case sql.ErrNoRows:
1✔
1193
                return &p.GetReplicationDLQSizeResponse{
1✔
1194
                        Size: 0,
1✔
1195
                }, nil
1✔
1196
        default:
1✔
1197
                return nil, convertCommonErrors(m.db, "GetReplicationDLQSize", "", err)
1✔
1198
        }
1199
}
1200

1201
func (m *sqlExecutionStore) DeleteReplicationTaskFromDLQ(
1202
        ctx context.Context,
1203
        request *p.DeleteReplicationTaskFromDLQRequest,
1204
) error {
2✔
1205

2✔
1206
        filter := sqlplugin.ReplicationTasksFilter{
2✔
1207
                ShardID: m.shardID,
2✔
1208
                TaskID:  request.TaskID,
2✔
1209
        }
2✔
1210

2✔
1211
        if _, err := m.db.DeleteMessageFromReplicationTasksDLQ(ctx, &sqlplugin.ReplicationTasksDLQFilter{
2✔
1212
                ReplicationTasksFilter: filter,
2✔
1213
                SourceClusterName:      request.SourceClusterName,
2✔
1214
        }); err != nil {
3✔
1215
                return convertCommonErrors(m.db, "DeleteReplicationTaskFromDLQ", "", err)
1✔
1216
        }
1✔
1217
        return nil
1✔
1218
}
1219

1220
func (m *sqlExecutionStore) RangeDeleteReplicationTaskFromDLQ(
1221
        ctx context.Context,
1222
        request *p.RangeDeleteReplicationTaskFromDLQRequest,
1223
) (*p.RangeDeleteReplicationTaskFromDLQResponse, error) {
2✔
1224
        filter := sqlplugin.ReplicationTasksFilter{
2✔
1225
                ShardID:            m.shardID,
2✔
1226
                TaskID:             request.ExclusiveBeginTaskID,
2✔
1227
                InclusiveEndTaskID: request.InclusiveEndTaskID,
2✔
1228
                PageSize:           request.PageSize,
2✔
1229
        }
2✔
1230
        result, err := m.db.RangeDeleteMessageFromReplicationTasksDLQ(ctx, &sqlplugin.ReplicationTasksDLQFilter{
2✔
1231
                ReplicationTasksFilter: filter,
2✔
1232
                SourceClusterName:      request.SourceClusterName,
2✔
1233
        })
2✔
1234
        if err != nil {
3✔
1235
                return nil, convertCommonErrors(m.db, "RangeDeleteReplicationTaskFromDLQ", "", err)
1✔
1236
        }
1✔
1237
        rowsDeleted, err := result.RowsAffected()
1✔
1238
        if err != nil {
1✔
1239
                return nil, convertCommonErrors(m.db, "RangeDeleteReplicationTaskFromDLQ", "", err)
×
1240
        }
×
1241
        return &p.RangeDeleteReplicationTaskFromDLQResponse{TasksCompleted: int(rowsDeleted)}, nil
1✔
1242
}
1243

1244
func (m *sqlExecutionStore) CreateFailoverMarkerTasks(
1245
        ctx context.Context,
1246
        request *p.CreateFailoverMarkersRequest,
1247
) error {
×
1248
        dbShardID := sqlplugin.GetDBShardIDFromHistoryShardID(m.shardID, m.db.GetTotalNumDBShards())
×
1249
        return m.txExecuteShardLocked(ctx, dbShardID, "CreateFailoverMarkerTasks", request.RangeID, func(tx sqlplugin.Tx) error {
×
1250
                for _, task := range request.Markers {
×
1251
                        t := []p.Task{task}
×
1252
                        if err := createReplicationTasks(
×
1253
                                ctx,
×
1254
                                tx,
×
1255
                                t,
×
1256
                                m.shardID,
×
1257
                                serialization.MustParseUUID(task.DomainID),
×
1258
                                emptyWorkflowID,
×
1259
                                serialization.MustParseUUID(emptyReplicationRunID),
×
1260
                                m.parser,
×
1261
                        ); err != nil {
×
1262
                                rollBackErr := tx.Rollback()
×
1263
                                if rollBackErr != nil {
×
1264
                                        m.logger.Error("transaction rollback error", tag.Error(rollBackErr))
×
1265
                                }
×
1266
                                return err
×
1267
                        }
1268
                }
1269
                return nil
×
1270
        })
1271
}
1272

1273
type timerTaskPageToken struct {
1274
        TaskID    int64
1275
        Timestamp time.Time
1276
}
1277

1278
func (t *timerTaskPageToken) serialize() ([]byte, error) {
473✔
1279
        return json.Marshal(t)
473✔
1280
}
473✔
1281

1282
func (t *timerTaskPageToken) deserialize(payload []byte) error {
3✔
1283
        return json.Unmarshal(payload, t)
3✔
1284
}
3✔
1285

1286
func (m *sqlExecutionStore) GetTimerIndexTasks(
1287
        ctx context.Context,
1288
        request *p.GetTimerIndexTasksRequest,
1289
) (*p.GetTimerIndexTasksResponse, error) {
2,037✔
1290

2,037✔
1291
        pageToken := &timerTaskPageToken{TaskID: math.MinInt64, Timestamp: request.MinTimestamp}
2,037✔
1292
        if len(request.NextPageToken) > 0 {
2,040✔
1293
                if err := pageToken.deserialize(request.NextPageToken); err != nil {
3✔
1294
                        return nil, &types.InternalServiceError{
×
1295
                                Message: fmt.Sprintf("error deserializing timerTaskPageToken: %v", err),
×
1296
                        }
×
1297
                }
×
1298
        }
1299

1300
        rows, err := m.db.SelectFromTimerTasks(ctx, &sqlplugin.TimerTasksFilter{
2,037✔
1301
                ShardID:                m.shardID,
2,037✔
1302
                MinVisibilityTimestamp: pageToken.Timestamp,
2,037✔
1303
                TaskID:                 pageToken.TaskID,
2,037✔
1304
                MaxVisibilityTimestamp: request.MaxTimestamp,
2,037✔
1305
                PageSize:               request.BatchSize + 1,
2,037✔
1306
        })
2,037✔
1307

2,037✔
1308
        if err != nil && err != sql.ErrNoRows {
2,038✔
1309
                return nil, convertCommonErrors(m.db, "GetTimerIndexTasks", "", err)
1✔
1310
        }
1✔
1311

1312
        resp := &p.GetTimerIndexTasksResponse{Timers: make([]*p.TimerTaskInfo, len(rows))}
2,036✔
1313
        for i, row := range rows {
9,962✔
1314
                info, err := m.parser.TimerTaskInfoFromBlob(row.Data, row.DataEncoding)
7,926✔
1315
                if err != nil {
7,927✔
1316
                        return nil, err
1✔
1317
                }
1✔
1318
                resp.Timers[i] = &p.TimerTaskInfo{
7,925✔
1319
                        VisibilityTimestamp: row.VisibilityTimestamp,
7,925✔
1320
                        TaskID:              row.TaskID,
7,925✔
1321
                        DomainID:            info.DomainID.String(),
7,925✔
1322
                        WorkflowID:          info.GetWorkflowID(),
7,925✔
1323
                        RunID:               info.RunID.String(),
7,925✔
1324
                        TaskType:            int(info.GetTaskType()),
7,925✔
1325
                        TimeoutType:         int(info.GetTimeoutType()),
7,925✔
1326
                        EventID:             info.GetEventID(),
7,925✔
1327
                        ScheduleAttempt:     info.GetScheduleAttempt(),
7,925✔
1328
                        Version:             info.GetVersion(),
7,925✔
1329
                }
7,925✔
1330
        }
1331

1332
        if len(resp.Timers) > request.BatchSize {
2,504✔
1333
                pageToken = &timerTaskPageToken{
469✔
1334
                        TaskID:    resp.Timers[request.BatchSize].TaskID,
469✔
1335
                        Timestamp: resp.Timers[request.BatchSize].VisibilityTimestamp,
469✔
1336
                }
469✔
1337
                resp.Timers = resp.Timers[:request.BatchSize]
469✔
1338
                nextToken, err := pageToken.serialize()
469✔
1339
                if err != nil {
469✔
1340
                        return nil, &types.InternalServiceError{
×
1341
                                Message: fmt.Sprintf("GetTimerTasks: error serializing page token: %v", err),
×
1342
                        }
×
1343
                }
×
1344
                resp.NextPageToken = nextToken
469✔
1345
        }
1346

1347
        return resp, nil
2,035✔
1348
}
1349

1350
func (m *sqlExecutionStore) CompleteTimerTask(
1351
        ctx context.Context,
1352
        request *p.CompleteTimerTaskRequest,
1353
) error {
2✔
1354

2✔
1355
        if _, err := m.db.DeleteFromTimerTasks(ctx, &sqlplugin.TimerTasksFilter{
2✔
1356
                ShardID:             m.shardID,
2✔
1357
                VisibilityTimestamp: request.VisibilityTimestamp,
2✔
1358
                TaskID:              request.TaskID,
2✔
1359
        }); err != nil {
3✔
1360
                return convertCommonErrors(m.db, "CompleteTimerTask", "", err)
1✔
1361
        }
1✔
1362
        return nil
1✔
1363
}
1364

1365
func (m *sqlExecutionStore) RangeCompleteTimerTask(
1366
        ctx context.Context,
1367
        request *p.RangeCompleteTimerTaskRequest,
1368
) (*p.RangeCompleteTimerTaskResponse, error) {
18✔
1369
        result, err := m.db.RangeDeleteFromTimerTasks(ctx, &sqlplugin.TimerTasksFilter{
18✔
1370
                ShardID:                m.shardID,
18✔
1371
                MinVisibilityTimestamp: request.InclusiveBeginTimestamp,
18✔
1372
                MaxVisibilityTimestamp: request.ExclusiveEndTimestamp,
18✔
1373
                PageSize:               request.PageSize,
18✔
1374
        })
18✔
1375
        if err != nil {
19✔
1376
                return nil, convertCommonErrors(m.db, "RangeCompleteTimerTask", "", err)
1✔
1377
        }
1✔
1378
        rowsDeleted, err := result.RowsAffected()
17✔
1379
        if err != nil {
17✔
1380
                return nil, convertCommonErrors(m.db, "RangeCompleteTimerTask", "", err)
×
1381
        }
×
1382
        return &p.RangeCompleteTimerTaskResponse{TasksCompleted: int(rowsDeleted)}, nil
17✔
1383
}
1384

1385
func (m *sqlExecutionStore) PutReplicationTaskToDLQ(
1386
        ctx context.Context,
1387
        request *p.InternalPutReplicationTaskToDLQRequest,
1388
) error {
5✔
1389
        replicationTask := request.TaskInfo
5✔
1390
        blob, err := m.parser.ReplicationTaskInfoToBlob(&serialization.ReplicationTaskInfo{
5✔
1391
                DomainID:                serialization.MustParseUUID(replicationTask.DomainID),
5✔
1392
                WorkflowID:              replicationTask.WorkflowID,
5✔
1393
                RunID:                   serialization.MustParseUUID(replicationTask.RunID),
5✔
1394
                TaskType:                int16(replicationTask.TaskType),
5✔
1395
                FirstEventID:            replicationTask.FirstEventID,
5✔
1396
                NextEventID:             replicationTask.NextEventID,
5✔
1397
                Version:                 replicationTask.Version,
5✔
1398
                ScheduledID:             replicationTask.ScheduledID,
5✔
1399
                EventStoreVersion:       p.EventStoreVersion,
5✔
1400
                NewRunEventStoreVersion: p.EventStoreVersion,
5✔
1401
                BranchToken:             replicationTask.BranchToken,
5✔
1402
                NewRunBranchToken:       replicationTask.NewRunBranchToken,
5✔
1403
                CreationTimestamp:       replicationTask.CreationTime,
5✔
1404
        })
5✔
1405
        if err != nil {
6✔
1406
                return err
1✔
1407
        }
1✔
1408

1409
        row := &sqlplugin.ReplicationTaskDLQRow{
4✔
1410
                SourceClusterName: request.SourceClusterName,
4✔
1411
                ShardID:           m.shardID,
4✔
1412
                TaskID:            replicationTask.TaskID,
4✔
1413
                Data:              blob.Data,
4✔
1414
                DataEncoding:      string(blob.Encoding),
4✔
1415
        }
4✔
1416

4✔
1417
        _, err = m.db.InsertIntoReplicationTasksDLQ(ctx, row)
4✔
1418

4✔
1419
        // Tasks are immutable. So it's fine if we already persisted it before.
4✔
1420
        // This can happen when tasks are retried (ack and cleanup can have lag on source side).
4✔
1421
        if err != nil && !m.db.IsDupEntryError(err) {
5✔
1422
                return convertCommonErrors(m.db, "PutReplicationTaskToDLQ", "", err)
1✔
1423
        }
1✔
1424

1425
        return nil
3✔
1426
}
1427

1428
func (m *sqlExecutionStore) populateWorkflowMutableState(
1429
        execution sqlplugin.ExecutionsRow,
1430
) (*p.InternalWorkflowMutableState, error) {
486✔
1431

486✔
1432
        info, err := m.parser.WorkflowExecutionInfoFromBlob(execution.Data, execution.DataEncoding)
486✔
1433
        if err != nil {
486✔
1434
                return nil, err
×
1435
        }
×
1436

1437
        state := &p.InternalWorkflowMutableState{}
486✔
1438
        state.ExecutionInfo = serialization.ToInternalWorkflowExecutionInfo(info)
486✔
1439
        state.ExecutionInfo.DomainID = execution.DomainID.String()
486✔
1440
        state.ExecutionInfo.WorkflowID = execution.WorkflowID
486✔
1441
        state.ExecutionInfo.RunID = execution.RunID.String()
486✔
1442
        state.ExecutionInfo.NextEventID = execution.NextEventID
486✔
1443
        // TODO: remove this after all 2DC workflows complete
486✔
1444
        if info.LastWriteEventID != nil {
486✔
1445
                state.ReplicationState = &p.ReplicationState{}
×
1446
                state.ReplicationState.StartVersion = info.GetStartVersion()
×
1447
                state.ReplicationState.LastWriteVersion = execution.LastWriteVersion
×
1448
                state.ReplicationState.LastWriteEventID = info.GetLastWriteEventID()
×
1449
        }
×
1450

1451
        if info.GetVersionHistories() != nil {
972✔
1452
                state.VersionHistories = p.NewDataBlob(
486✔
1453
                        info.GetVersionHistories(),
486✔
1454
                        common.EncodingType(info.GetVersionHistoriesEncoding()),
486✔
1455
                )
486✔
1456
        }
486✔
1457

1458
        if info.GetChecksum() != nil {
972✔
1459
                state.ChecksumData = p.NewDataBlob(
486✔
1460
                        info.GetChecksum(),
486✔
1461
                        common.EncodingType(info.GetChecksumEncoding()),
486✔
1462
                )
486✔
1463
        }
486✔
1464

1465
        return state, nil
486✔
1466
}
1467

1468
func (m *sqlExecutionStore) populateInternalListConcreteExecutions(
1469
        executions []sqlplugin.ExecutionsRow,
1470
) ([]*p.InternalListConcreteExecutionsEntity, error) {
×
1471

×
1472
        concreteExecutions := make([]*p.InternalListConcreteExecutionsEntity, 0, len(executions))
×
1473
        for _, execution := range executions {
×
1474
                mutableState, err := m.populateWorkflowMutableState(execution)
×
1475
                if err != nil {
×
1476
                        return nil, err
×
1477
                }
×
1478

1479
                concreteExecution := &p.InternalListConcreteExecutionsEntity{
×
1480
                        ExecutionInfo:    mutableState.ExecutionInfo,
×
1481
                        VersionHistories: mutableState.VersionHistories,
×
1482
                }
×
1483
                concreteExecutions = append(concreteExecutions, concreteExecution)
×
1484
        }
1485
        return concreteExecutions, nil
×
1486
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc