• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

uber / cadence / 018d9fa8-75f8-405b-9b1e-38f93e6b0a11

12 Feb 2024 11:30PM UTC coverage: 62.748% (+0.05%) from 62.701%
018d9fa8-75f8-405b-9b1e-38f93e6b0a11

Pull #5657

buildkite

Shaddoll
Implement SignalWithStartWorkflowExecutionAsync API
Pull Request #5657: Implement SignalWithStartWorkflowExecutionAsync API

96 of 142 new or added lines in 5 files covered. (67.61%)

60 existing lines in 8 files now uncovered.

92596 of 147569 relevant lines covered (62.75%)

2318.9 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

60.98
/common/persistence/nosql/nosql_execution_store.go
1
// Copyright (c) 2017-2021 Uber Technologies, Inc.
2
//
3
// Permission is hereby granted, free of charge, to any person obtaining a copy
4
// of this software and associated documentation files (the "Software"), to deal
5
// in the Software without restriction, including without limitation the rights
6
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
7
// copies of the Software, and to permit persons to whom the Software is
8
// furnished to do so, subject to the following conditions:
9
//
10
// The above copyright notice and this permission notice shall be included in
11
// all copies or substantial portions of the Software.
12
//
13
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
14
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
18
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
19
// THE SOFTWARE.
20

21
package nosql
22

23
import (
24
        "context"
25
        "fmt"
26

27
        "github.com/uber/cadence/common/log"
28
        "github.com/uber/cadence/common/log/tag"
29
        "github.com/uber/cadence/common/persistence"
30
        "github.com/uber/cadence/common/persistence/nosql/nosqlplugin"
31
        "github.com/uber/cadence/common/types"
32
)
33

34
// Implements ExecutionStore
35
type nosqlExecutionStore struct {
36
        shardID int
37
        nosqlStore
38
}
39

40
// NewExecutionStore is used to create an instance of ExecutionStore implementation
41
func NewExecutionStore(
42
        shardID int,
43
        db nosqlplugin.DB,
44
        logger log.Logger,
45
) (persistence.ExecutionStore, error) {
21✔
46
        return &nosqlExecutionStore{
21✔
47
                nosqlStore: nosqlStore{
21✔
48
                        logger: logger,
21✔
49
                        db:     db,
21✔
50
                },
21✔
51
                shardID: shardID,
21✔
52
        }, nil
21✔
53
}
21✔
54

55
func (d *nosqlExecutionStore) GetShardID() int {
3,829✔
56
        return d.shardID
3,829✔
57
}
3,829✔
58

59
func (d *nosqlExecutionStore) CreateWorkflowExecution(
60
        ctx context.Context,
61
        request *persistence.InternalCreateWorkflowExecutionRequest,
62
) (*persistence.CreateWorkflowExecutionResponse, error) {
168✔
63

168✔
64
        newWorkflow := request.NewWorkflowSnapshot
168✔
65
        executionInfo := newWorkflow.ExecutionInfo
168✔
66
        lastWriteVersion := newWorkflow.LastWriteVersion
168✔
67
        domainID := executionInfo.DomainID
168✔
68
        workflowID := executionInfo.WorkflowID
168✔
69
        runID := executionInfo.RunID
168✔
70

168✔
71
        if err := persistence.ValidateCreateWorkflowModeState(
168✔
72
                request.Mode,
168✔
73
                newWorkflow,
168✔
74
        ); err != nil {
168✔
75
                return nil, err
×
76
        }
×
77

78
        currentWorkflowWriteReq, err := d.prepareCurrentWorkflowRequestForCreateWorkflowTxn(domainID, workflowID, runID, executionInfo, lastWriteVersion, request)
168✔
79
        if err != nil {
168✔
80
                return nil, err
×
81
        }
×
82

83
        workflowExecutionWriteReq, err := d.prepareCreateWorkflowExecutionRequestWithMaps(&newWorkflow)
168✔
84
        if err != nil {
168✔
85
                return nil, err
×
86
        }
×
87

88
        transferTasks, crossClusterTasks, replicationTasks, timerTasks, err := d.prepareNoSQLTasksForWorkflowTxn(
168✔
89
                domainID, workflowID, runID,
168✔
90
                newWorkflow.TransferTasks, newWorkflow.CrossClusterTasks, newWorkflow.ReplicationTasks, newWorkflow.TimerTasks,
168✔
91
                nil, nil, nil, nil,
168✔
92
        )
168✔
93
        if err != nil {
168✔
94
                return nil, err
×
95
        }
×
96

97
        shardCondition := &nosqlplugin.ShardCondition{
168✔
98
                ShardID: d.shardID,
168✔
99
                RangeID: request.RangeID,
168✔
100
        }
168✔
101

168✔
102
        err = d.db.InsertWorkflowExecutionWithTasks(
168✔
103
                ctx,
168✔
104
                currentWorkflowWriteReq, workflowExecutionWriteReq,
168✔
105
                transferTasks, crossClusterTasks, replicationTasks, timerTasks,
168✔
106
                shardCondition,
168✔
107
        )
168✔
108
        if err != nil {
183✔
109
                conditionFailureErr, isConditionFailedError := err.(*nosqlplugin.WorkflowOperationConditionFailure)
15✔
110
                if isConditionFailedError {
30✔
111
                        switch {
15✔
112
                        case conditionFailureErr.UnknownConditionFailureDetails != nil:
×
113
                                return nil, &persistence.ShardOwnershipLostError{
×
114
                                        ShardID: d.shardID,
×
115
                                        Msg:     *conditionFailureErr.UnknownConditionFailureDetails,
×
116
                                }
×
117
                        case conditionFailureErr.ShardRangeIDNotMatch != nil:
×
118
                                return nil, &persistence.ShardOwnershipLostError{
×
119
                                        ShardID: d.shardID,
×
120
                                        Msg: fmt.Sprintf("Failed to create workflow execution.  Request RangeID: %v, Actual RangeID: %v",
×
121
                                                request.RangeID, *conditionFailureErr.ShardRangeIDNotMatch),
×
122
                                }
×
123
                        case conditionFailureErr.CurrentWorkflowConditionFailInfo != nil:
×
124
                                return nil, &persistence.CurrentWorkflowConditionFailedError{
×
125
                                        Msg: *conditionFailureErr.CurrentWorkflowConditionFailInfo,
×
126
                                }
×
127
                        case conditionFailureErr.WorkflowExecutionAlreadyExists != nil:
15✔
128
                                return nil, &persistence.WorkflowExecutionAlreadyStartedError{
15✔
129
                                        Msg:              conditionFailureErr.WorkflowExecutionAlreadyExists.OtherInfo,
15✔
130
                                        StartRequestID:   conditionFailureErr.WorkflowExecutionAlreadyExists.CreateRequestID,
15✔
131
                                        RunID:            conditionFailureErr.WorkflowExecutionAlreadyExists.RunID,
15✔
132
                                        State:            conditionFailureErr.WorkflowExecutionAlreadyExists.State,
15✔
133
                                        CloseStatus:      conditionFailureErr.WorkflowExecutionAlreadyExists.CloseStatus,
15✔
134
                                        LastWriteVersion: conditionFailureErr.WorkflowExecutionAlreadyExists.LastWriteVersion,
15✔
135
                                }
15✔
136
                        default:
×
137
                                // If ever runs into this branch, there is bug in the code either in here, or in the implementation of nosql plugin
×
138
                                err := fmt.Errorf("unsupported conditionFailureReason error")
×
139
                                d.logger.Error("A code bug exists in persistence layer, please investigate ASAP", tag.Error(err))
×
140
                                return nil, err
×
141
                        }
142
                }
143
                return nil, convertCommonErrors(d.db, "CreateWorkflowExecution", err)
×
144
        }
145

146
        return &persistence.CreateWorkflowExecutionResponse{}, nil
154✔
147
}
148

149
func (d *nosqlExecutionStore) GetWorkflowExecution(
150
        ctx context.Context,
151
        request *persistence.InternalGetWorkflowExecutionRequest,
152
) (*persistence.InternalGetWorkflowExecutionResponse, error) {
399✔
153

399✔
154
        execution := request.Execution
399✔
155
        state, err := d.db.SelectWorkflowExecution(ctx, d.shardID, request.DomainID, execution.WorkflowID, execution.RunID)
399✔
156
        if err != nil {
555✔
157
                if d.db.IsNotFoundError(err) {
312✔
158
                        return nil, &types.EntityNotExistsError{
156✔
159
                                Message: fmt.Sprintf("Workflow execution not found.  WorkflowId: %v, RunId: %v",
156✔
160
                                        execution.WorkflowID, execution.RunID),
156✔
161
                        }
156✔
162
                }
156✔
163

164
                return nil, convertCommonErrors(d.db, "GetWorkflowExecution", err)
×
165
        }
166

167
        return &persistence.InternalGetWorkflowExecutionResponse{State: state}, nil
244✔
168
}
169

170
func (d *nosqlExecutionStore) UpdateWorkflowExecution(
171
        ctx context.Context,
172
        request *persistence.InternalUpdateWorkflowExecutionRequest,
173
) error {
1,445✔
174
        updateWorkflow := request.UpdateWorkflowMutation
1,445✔
175
        newWorkflow := request.NewWorkflowSnapshot
1,445✔
176

1,445✔
177
        executionInfo := updateWorkflow.ExecutionInfo
1,445✔
178
        domainID := executionInfo.DomainID
1,445✔
179
        workflowID := executionInfo.WorkflowID
1,445✔
180
        runID := executionInfo.RunID
1,445✔
181

1,445✔
182
        if err := persistence.ValidateUpdateWorkflowModeState(
1,445✔
183
                request.Mode,
1,445✔
184
                updateWorkflow,
1,445✔
185
                newWorkflow,
1,445✔
186
        ); err != nil {
1,445✔
187
                return err
×
188
        }
×
189

190
        var currentWorkflowWriteReq *nosqlplugin.CurrentWorkflowWriteRequest
1,445✔
191

1,445✔
192
        switch request.Mode {
1,445✔
193
        case persistence.UpdateWorkflowModeIgnoreCurrent:
×
194
                currentWorkflowWriteReq = &nosqlplugin.CurrentWorkflowWriteRequest{
×
195
                        WriteMode: nosqlplugin.CurrentWorkflowWriteModeNoop,
×
196
                }
×
197
        case persistence.UpdateWorkflowModeBypassCurrent:
1✔
198
                if err := d.assertNotCurrentExecution(
1✔
199
                        ctx,
1✔
200
                        domainID,
1✔
201
                        workflowID,
1✔
202
                        runID); err != nil {
1✔
203
                        return err
×
204
                }
×
205
                currentWorkflowWriteReq = &nosqlplugin.CurrentWorkflowWriteRequest{
1✔
206
                        WriteMode: nosqlplugin.CurrentWorkflowWriteModeNoop,
1✔
207
                }
1✔
208

209
        case persistence.UpdateWorkflowModeUpdateCurrent:
1,445✔
210
                if newWorkflow != nil {
1,503✔
211
                        newExecutionInfo := newWorkflow.ExecutionInfo
58✔
212
                        newLastWriteVersion := newWorkflow.LastWriteVersion
58✔
213
                        newDomainID := newExecutionInfo.DomainID
58✔
214
                        // TODO: ?? would it change at all ??
58✔
215
                        newWorkflowID := newExecutionInfo.WorkflowID
58✔
216
                        newRunID := newExecutionInfo.RunID
58✔
217

58✔
218
                        if domainID != newDomainID {
58✔
219
                                return &types.InternalServiceError{
×
220
                                        Message: "UpdateWorkflowExecution: cannot continue as new to another domain",
×
221
                                }
×
222
                        }
×
223

224
                        currentWorkflowWriteReq = &nosqlplugin.CurrentWorkflowWriteRequest{
58✔
225
                                WriteMode: nosqlplugin.CurrentWorkflowWriteModeUpdate,
58✔
226
                                Row: nosqlplugin.CurrentWorkflowRow{
58✔
227
                                        ShardID:          d.shardID,
58✔
228
                                        DomainID:         newDomainID,
58✔
229
                                        WorkflowID:       newWorkflowID,
58✔
230
                                        RunID:            newRunID,
58✔
231
                                        State:            newExecutionInfo.State,
58✔
232
                                        CloseStatus:      newExecutionInfo.CloseStatus,
58✔
233
                                        CreateRequestID:  newExecutionInfo.CreateRequestID,
58✔
234
                                        LastWriteVersion: newLastWriteVersion,
58✔
235
                                },
58✔
236
                                Condition: &nosqlplugin.CurrentWorkflowWriteCondition{
58✔
237
                                        CurrentRunID: &runID,
58✔
238
                                },
58✔
239
                        }
58✔
240
                } else {
1,388✔
241
                        lastWriteVersion := updateWorkflow.LastWriteVersion
1,388✔
242

1,388✔
243
                        currentWorkflowWriteReq = &nosqlplugin.CurrentWorkflowWriteRequest{
1,388✔
244
                                WriteMode: nosqlplugin.CurrentWorkflowWriteModeUpdate,
1,388✔
245
                                Row: nosqlplugin.CurrentWorkflowRow{
1,388✔
246
                                        ShardID:          d.shardID,
1,388✔
247
                                        DomainID:         domainID,
1,388✔
248
                                        WorkflowID:       workflowID,
1,388✔
249
                                        RunID:            runID,
1,388✔
250
                                        State:            executionInfo.State,
1,388✔
251
                                        CloseStatus:      executionInfo.CloseStatus,
1,388✔
252
                                        CreateRequestID:  executionInfo.CreateRequestID,
1,388✔
253
                                        LastWriteVersion: lastWriteVersion,
1,388✔
254
                                },
1,388✔
255
                                Condition: &nosqlplugin.CurrentWorkflowWriteCondition{
1,388✔
256
                                        CurrentRunID: &runID,
1,388✔
257
                                },
1,388✔
258
                        }
1,388✔
259
                }
1,388✔
260

261
        default:
×
262
                return &types.InternalServiceError{
×
263
                        Message: fmt.Sprintf("UpdateWorkflowExecution: unknown mode: %v", request.Mode),
×
264
                }
×
265
        }
266

267
        var mutateExecution, insertExecution *nosqlplugin.WorkflowExecutionRequest
1,445✔
268
        var nosqlTransferTasks []*nosqlplugin.TransferTask
1,445✔
269
        var nosqlCrossClusterTasks []*nosqlplugin.CrossClusterTask
1,445✔
270
        var nosqlReplicationTasks []*nosqlplugin.ReplicationTask
1,445✔
271
        var nosqlTimerTasks []*nosqlplugin.TimerTask
1,445✔
272
        var err error
1,445✔
273

1,445✔
274
        // 1. current
1,445✔
275
        mutateExecution, err = d.prepareUpdateWorkflowExecutionRequestWithMapsAndEventBuffer(&updateWorkflow)
1,445✔
276
        if err != nil {
1,445✔
277
                return err
×
278
        }
×
279
        nosqlTransferTasks, nosqlCrossClusterTasks, nosqlReplicationTasks, nosqlTimerTasks, err = d.prepareNoSQLTasksForWorkflowTxn(
1,445✔
280
                domainID, workflowID, updateWorkflow.ExecutionInfo.RunID,
1,445✔
281
                updateWorkflow.TransferTasks, updateWorkflow.CrossClusterTasks, updateWorkflow.ReplicationTasks, updateWorkflow.TimerTasks,
1,445✔
282
                nosqlTransferTasks, nosqlCrossClusterTasks, nosqlReplicationTasks, nosqlTimerTasks,
1,445✔
283
        )
1,445✔
284
        if err != nil {
1,445✔
285
                return err
×
286
        }
×
287

288
        // 2. new
289
        if newWorkflow != nil {
1,503✔
290
                insertExecution, err = d.prepareCreateWorkflowExecutionRequestWithMaps(newWorkflow)
58✔
291
                if err != nil {
58✔
292
                        return err
×
293
                }
×
294

295
                nosqlTransferTasks, nosqlCrossClusterTasks, nosqlReplicationTasks, nosqlTimerTasks, err = d.prepareNoSQLTasksForWorkflowTxn(
58✔
296
                        domainID, workflowID, newWorkflow.ExecutionInfo.RunID,
58✔
297
                        newWorkflow.TransferTasks, newWorkflow.CrossClusterTasks, newWorkflow.ReplicationTasks, newWorkflow.TimerTasks,
58✔
298
                        nosqlTransferTasks, nosqlCrossClusterTasks, nosqlReplicationTasks, nosqlTimerTasks,
58✔
299
                )
58✔
300
                if err != nil {
58✔
301
                        return err
×
302
                }
×
303
        }
304

305
        shardCondition := &nosqlplugin.ShardCondition{
1,445✔
306
                ShardID: d.shardID,
1,445✔
307
                RangeID: request.RangeID,
1,445✔
308
        }
1,445✔
309

1,445✔
310
        err = d.db.UpdateWorkflowExecutionWithTasks(
1,445✔
311
                ctx, currentWorkflowWriteReq,
1,445✔
312
                mutateExecution, insertExecution, nil, // no workflow to reset here
1,445✔
313
                nosqlTransferTasks, nosqlCrossClusterTasks, nosqlReplicationTasks, nosqlTimerTasks,
1,445✔
314
                shardCondition)
1,445✔
315

1,445✔
316
        return d.processUpdateWorkflowResult(err, request.RangeID)
1,445✔
317
}
318

319
func (d *nosqlExecutionStore) ConflictResolveWorkflowExecution(
320
        ctx context.Context,
321
        request *persistence.InternalConflictResolveWorkflowExecutionRequest,
322
) error {
1✔
323
        currentWorkflow := request.CurrentWorkflowMutation
1✔
324
        resetWorkflow := request.ResetWorkflowSnapshot
1✔
325
        newWorkflow := request.NewWorkflowSnapshot
1✔
326

1✔
327
        domainID := resetWorkflow.ExecutionInfo.DomainID
1✔
328
        workflowID := resetWorkflow.ExecutionInfo.WorkflowID
1✔
329

1✔
330
        if err := persistence.ValidateConflictResolveWorkflowModeState(
1✔
331
                request.Mode,
1✔
332
                resetWorkflow,
1✔
333
                newWorkflow,
1✔
334
                currentWorkflow,
1✔
335
        ); err != nil {
1✔
336
                return err
×
337
        }
×
338

339
        var currentWorkflowWriteReq *nosqlplugin.CurrentWorkflowWriteRequest
1✔
340
        var prevRunID string
1✔
341

1✔
342
        switch request.Mode {
1✔
UNCOV
343
        case persistence.ConflictResolveWorkflowModeBypassCurrent:
×
UNCOV
344
                if err := d.assertNotCurrentExecution(
×
UNCOV
345
                        ctx,
×
UNCOV
346
                        domainID,
×
UNCOV
347
                        workflowID,
×
UNCOV
348
                        resetWorkflow.ExecutionInfo.RunID); err != nil {
×
349
                        return err
×
350
                }
×
UNCOV
351
                currentWorkflowWriteReq = &nosqlplugin.CurrentWorkflowWriteRequest{
×
UNCOV
352
                        WriteMode: nosqlplugin.CurrentWorkflowWriteModeNoop,
×
UNCOV
353
                }
×
354
        case persistence.ConflictResolveWorkflowModeUpdateCurrent:
1✔
355
                executionInfo := resetWorkflow.ExecutionInfo
1✔
356
                lastWriteVersion := resetWorkflow.LastWriteVersion
1✔
357
                if newWorkflow != nil {
1✔
358
                        executionInfo = newWorkflow.ExecutionInfo
×
359
                        lastWriteVersion = newWorkflow.LastWriteVersion
×
360
                }
×
361

362
                if currentWorkflow != nil {
1✔
363
                        prevRunID = currentWorkflow.ExecutionInfo.RunID
×
364
                } else {
1✔
365
                        // reset workflow is current
1✔
366
                        prevRunID = resetWorkflow.ExecutionInfo.RunID
1✔
367
                }
1✔
368
                currentWorkflowWriteReq = &nosqlplugin.CurrentWorkflowWriteRequest{
1✔
369
                        WriteMode: nosqlplugin.CurrentWorkflowWriteModeUpdate,
1✔
370
                        Row: nosqlplugin.CurrentWorkflowRow{
1✔
371
                                ShardID:          d.shardID,
1✔
372
                                DomainID:         domainID,
1✔
373
                                WorkflowID:       workflowID,
1✔
374
                                RunID:            executionInfo.RunID,
1✔
375
                                State:            executionInfo.State,
1✔
376
                                CloseStatus:      executionInfo.CloseStatus,
1✔
377
                                CreateRequestID:  executionInfo.CreateRequestID,
1✔
378
                                LastWriteVersion: lastWriteVersion,
1✔
379
                        },
1✔
380
                        Condition: &nosqlplugin.CurrentWorkflowWriteCondition{
1✔
381
                                CurrentRunID: &prevRunID,
1✔
382
                        },
1✔
383
                }
1✔
384

385
        default:
×
386
                return &types.InternalServiceError{
×
387
                        Message: fmt.Sprintf("ConflictResolveWorkflowExecution: unknown mode: %v", request.Mode),
×
388
                }
×
389
        }
390

391
        var mutateExecution, insertExecution, resetExecution *nosqlplugin.WorkflowExecutionRequest
1✔
392
        var nosqlTransferTasks []*nosqlplugin.TransferTask
1✔
393
        var nosqlCrossClusterTasks []*nosqlplugin.CrossClusterTask
1✔
394
        var nosqlReplicationTasks []*nosqlplugin.ReplicationTask
1✔
395
        var nosqlTimerTasks []*nosqlplugin.TimerTask
1✔
396
        var err error
1✔
397

1✔
398
        // 1. current
1✔
399
        if currentWorkflow != nil {
1✔
400
                mutateExecution, err = d.prepareUpdateWorkflowExecutionRequestWithMapsAndEventBuffer(currentWorkflow)
×
401
                if err != nil {
×
402
                        return err
×
403
                }
×
404
                nosqlTransferTasks, nosqlCrossClusterTasks, nosqlReplicationTasks, nosqlTimerTasks, err = d.prepareNoSQLTasksForWorkflowTxn(
×
405
                        domainID, workflowID, currentWorkflow.ExecutionInfo.RunID,
×
406
                        currentWorkflow.TransferTasks, currentWorkflow.CrossClusterTasks, currentWorkflow.ReplicationTasks, currentWorkflow.TimerTasks,
×
407
                        nosqlTransferTasks, nosqlCrossClusterTasks, nosqlReplicationTasks, nosqlTimerTasks,
×
408
                )
×
409
                if err != nil {
×
410
                        return err
×
411
                }
×
412
        }
413

414
        // 2. reset
415
        resetExecution, err = d.prepareResetWorkflowExecutionRequestWithMapsAndEventBuffer(&resetWorkflow)
1✔
416
        if err != nil {
1✔
417
                return err
×
418
        }
×
419
        nosqlTransferTasks, nosqlCrossClusterTasks, nosqlReplicationTasks, nosqlTimerTasks, err = d.prepareNoSQLTasksForWorkflowTxn(
1✔
420
                domainID, workflowID, resetWorkflow.ExecutionInfo.RunID,
1✔
421
                resetWorkflow.TransferTasks, resetWorkflow.CrossClusterTasks, resetWorkflow.ReplicationTasks, resetWorkflow.TimerTasks,
1✔
422
                nosqlTransferTasks, nosqlCrossClusterTasks, nosqlReplicationTasks, nosqlTimerTasks,
1✔
423
        )
1✔
424
        if err != nil {
1✔
425
                return err
×
426
        }
×
427

428
        // 3. new
429
        if newWorkflow != nil {
1✔
430
                insertExecution, err = d.prepareCreateWorkflowExecutionRequestWithMaps(newWorkflow)
×
431
                if err != nil {
×
432
                        return err
×
433
                }
×
434

435
                nosqlTransferTasks, nosqlCrossClusterTasks, nosqlReplicationTasks, nosqlTimerTasks, err = d.prepareNoSQLTasksForWorkflowTxn(
×
436
                        domainID, workflowID, newWorkflow.ExecutionInfo.RunID,
×
437
                        newWorkflow.TransferTasks, newWorkflow.CrossClusterTasks, newWorkflow.ReplicationTasks, newWorkflow.TimerTasks,
×
438
                        nosqlTransferTasks, nosqlCrossClusterTasks, nosqlReplicationTasks, nosqlTimerTasks,
×
439
                )
×
440
                if err != nil {
×
441
                        return err
×
442
                }
×
443
        }
444

445
        shardCondition := &nosqlplugin.ShardCondition{
1✔
446
                ShardID: d.shardID,
1✔
447
                RangeID: request.RangeID,
1✔
448
        }
1✔
449

1✔
450
        err = d.db.UpdateWorkflowExecutionWithTasks(
1✔
451
                ctx, currentWorkflowWriteReq,
1✔
452
                mutateExecution, insertExecution, resetExecution,
1✔
453
                nosqlTransferTasks, nosqlCrossClusterTasks, nosqlReplicationTasks, nosqlTimerTasks,
1✔
454
                shardCondition)
1✔
455
        return d.processUpdateWorkflowResult(err, request.RangeID)
1✔
456
}
457

458
func (d *nosqlExecutionStore) DeleteWorkflowExecution(
459
        ctx context.Context,
460
        request *persistence.DeleteWorkflowExecutionRequest,
461
) error {
18✔
462
        err := d.db.DeleteWorkflowExecution(ctx, d.shardID, request.DomainID, request.WorkflowID, request.RunID)
18✔
463
        if err != nil {
18✔
464
                return convertCommonErrors(d.db, "DeleteWorkflowExecution", err)
×
465
        }
×
466

467
        return nil
18✔
468
}
469

470
func (d *nosqlExecutionStore) DeleteCurrentWorkflowExecution(
471
        ctx context.Context,
472
        request *persistence.DeleteCurrentWorkflowExecutionRequest,
473
) error {
18✔
474
        err := d.db.DeleteCurrentWorkflow(ctx, d.shardID, request.DomainID, request.WorkflowID, request.RunID)
18✔
475
        if err != nil {
18✔
476
                return convertCommonErrors(d.db, "DeleteCurrentWorkflowExecution", err)
×
477
        }
×
478

479
        return nil
18✔
480
}
481

482
func (d *nosqlExecutionStore) GetCurrentExecution(
483
        ctx context.Context,
484
        request *persistence.GetCurrentExecutionRequest,
485
) (*persistence.GetCurrentExecutionResponse,
486
        error) {
61✔
487
        result, err := d.db.SelectCurrentWorkflow(ctx, d.shardID, request.DomainID, request.WorkflowID)
61✔
488

61✔
489
        if err != nil {
65✔
490
                if d.db.IsNotFoundError(err) {
8✔
491
                        return nil, &types.EntityNotExistsError{
4✔
492
                                Message: fmt.Sprintf("Workflow execution not found.  WorkflowId: %v",
4✔
493
                                        request.WorkflowID),
4✔
494
                        }
4✔
495
                }
4✔
496
                return nil, convertCommonErrors(d.db, "GetCurrentExecution", err)
×
497
        }
498

499
        return &persistence.GetCurrentExecutionResponse{
58✔
500
                RunID:            result.RunID,
58✔
501
                StartRequestID:   result.CreateRequestID,
58✔
502
                State:            result.State,
58✔
503
                CloseStatus:      result.CloseStatus,
58✔
504
                LastWriteVersion: result.LastWriteVersion,
58✔
505
        }, nil
58✔
506
}
507

508
func (d *nosqlExecutionStore) ListCurrentExecutions(
509
        ctx context.Context,
510
        request *persistence.ListCurrentExecutionsRequest,
511
) (*persistence.ListCurrentExecutionsResponse, error) {
×
512
        executions, token, err := d.db.SelectAllCurrentWorkflows(ctx, d.shardID, request.PageToken, request.PageSize)
×
513
        if err != nil {
×
514
                return nil, convertCommonErrors(d.db, "ListCurrentExecutions", err)
×
515
        }
×
516
        return &persistence.ListCurrentExecutionsResponse{
×
517
                Executions: executions,
×
518
                PageToken:  token,
×
519
        }, nil
×
520
}
521

522
func (d *nosqlExecutionStore) IsWorkflowExecutionExists(
523
        ctx context.Context,
524
        request *persistence.IsWorkflowExecutionExistsRequest,
525
) (*persistence.IsWorkflowExecutionExistsResponse, error) {
×
526
        exists, err := d.db.IsWorkflowExecutionExists(ctx, d.shardID, request.DomainID, request.WorkflowID, request.RunID)
×
527
        if err != nil {
×
528
                return nil, convertCommonErrors(d.db, "IsWorkflowExecutionExists", err)
×
529
        }
×
530
        return &persistence.IsWorkflowExecutionExistsResponse{
×
531
                Exists: exists,
×
532
        }, nil
×
533
}
534

535
func (d *nosqlExecutionStore) ListConcreteExecutions(
536
        ctx context.Context,
537
        request *persistence.ListConcreteExecutionsRequest,
538
) (*persistence.InternalListConcreteExecutionsResponse, error) {
×
539
        executions, nextPageToken, err := d.db.SelectAllWorkflowExecutions(ctx, d.shardID, request.PageToken, request.PageSize)
×
540
        if err != nil {
×
541
                return nil, convertCommonErrors(d.db, "ListConcreteExecutions", err)
×
542
        }
×
543
        return &persistence.InternalListConcreteExecutionsResponse{
×
544
                Executions:    executions,
×
545
                NextPageToken: nextPageToken,
×
546
        }, nil
×
547
}
548

549
func (d *nosqlExecutionStore) GetTransferTasks(
550
        ctx context.Context,
551
        request *persistence.GetTransferTasksRequest,
552
) (*persistence.GetTransferTasksResponse, error) {
747✔
553

747✔
554
        tasks, nextPageToken, err := d.db.SelectTransferTasksOrderByTaskID(ctx, d.shardID, request.BatchSize, request.NextPageToken, request.ReadLevel, request.MaxReadLevel)
747✔
555
        if err != nil {
747✔
556
                return nil, convertCommonErrors(d.db, "GetTransferTasks", err)
×
557
        }
×
558

559
        return &persistence.GetTransferTasksResponse{
747✔
560
                Tasks:         tasks,
747✔
561
                NextPageToken: nextPageToken,
747✔
562
        }, nil
747✔
563
}
564

565
func (d *nosqlExecutionStore) GetCrossClusterTasks(
566
        ctx context.Context,
567
        request *persistence.GetCrossClusterTasksRequest,
568
) (*persistence.GetCrossClusterTasksResponse, error) {
48✔
569

48✔
570
        cTasks, nextPageToken, err := d.db.SelectCrossClusterTasksOrderByTaskID(ctx, d.shardID, request.BatchSize, request.NextPageToken, request.TargetCluster, request.ReadLevel, request.MaxReadLevel)
48✔
571

48✔
572
        if err != nil {
48✔
573
                return nil, convertCommonErrors(d.db, "GetCrossClusterTasks", err)
×
574
        }
×
575

576
        var tTasks []*persistence.CrossClusterTaskInfo
48✔
577
        for _, t := range cTasks {
48✔
578
                // revive:disable-next-line:range-val-address Appending address of TransferTask, not of t.
×
579
                tTasks = append(tTasks, &t.TransferTask)
×
580
        }
×
581
        return &persistence.GetCrossClusterTasksResponse{
48✔
582
                Tasks:         tTasks,
48✔
583
                NextPageToken: nextPageToken,
48✔
584
        }, nil
48✔
585
}
586

587
func (d *nosqlExecutionStore) GetReplicationTasks(
588
        ctx context.Context,
589
        request *persistence.GetReplicationTasksRequest,
590
) (*persistence.InternalGetReplicationTasksResponse, error) {
29✔
591

29✔
592
        tasks, nextPageToken, err := d.db.SelectReplicationTasksOrderByTaskID(ctx, d.shardID, request.BatchSize, request.NextPageToken, request.ReadLevel, request.MaxReadLevel)
29✔
593
        if err != nil {
29✔
594
                return nil, convertCommonErrors(d.db, "GetReplicationTasks", err)
×
595
        }
×
596
        return &persistence.InternalGetReplicationTasksResponse{
29✔
597
                Tasks:         tasks,
29✔
598
                NextPageToken: nextPageToken,
29✔
599
        }, nil
29✔
600
}
601

602
func (d *nosqlExecutionStore) CompleteTransferTask(
603
        ctx context.Context,
604
        request *persistence.CompleteTransferTaskRequest,
605
) error {
×
606
        err := d.db.DeleteTransferTask(ctx, d.shardID, request.TaskID)
×
607
        if err != nil {
×
608
                return convertCommonErrors(d.db, "CompleteTransferTask", err)
×
609
        }
×
610

611
        return nil
×
612
}
613

614
func (d *nosqlExecutionStore) RangeCompleteTransferTask(
615
        ctx context.Context,
616
        request *persistence.RangeCompleteTransferTaskRequest,
617
) (*persistence.RangeCompleteTransferTaskResponse, error) {
28✔
618
        err := d.db.RangeDeleteTransferTasks(ctx, d.shardID, request.ExclusiveBeginTaskID, request.InclusiveEndTaskID)
28✔
619
        if err != nil {
28✔
620
                return nil, convertCommonErrors(d.db, "RangeCompleteTransferTask", err)
×
621
        }
×
622

623
        return &persistence.RangeCompleteTransferTaskResponse{TasksCompleted: persistence.UnknownNumRowsAffected}, nil
28✔
624
}
625

626
func (d *nosqlExecutionStore) CompleteCrossClusterTask(
627
        ctx context.Context,
628
        request *persistence.CompleteCrossClusterTaskRequest,
629
) error {
×
630

×
631
        err := d.db.DeleteCrossClusterTask(ctx, d.shardID, request.TargetCluster, request.TaskID)
×
632
        if err != nil {
×
633
                return convertCommonErrors(d.db, "CompleteCrossClusterTask", err)
×
634
        }
×
635

636
        return nil
×
637
}
638

639
func (d *nosqlExecutionStore) RangeCompleteCrossClusterTask(
640
        ctx context.Context,
641
        request *persistence.RangeCompleteCrossClusterTaskRequest,
642
) (*persistence.RangeCompleteCrossClusterTaskResponse, error) {
40✔
643

40✔
644
        err := d.db.RangeDeleteCrossClusterTasks(ctx, d.shardID, request.TargetCluster, request.ExclusiveBeginTaskID, request.InclusiveEndTaskID)
40✔
645
        if err != nil {
40✔
646
                return nil, convertCommonErrors(d.db, "RangeCompleteCrossClusterTask", err)
×
647
        }
×
648

649
        return &persistence.RangeCompleteCrossClusterTaskResponse{TasksCompleted: persistence.UnknownNumRowsAffected}, nil
40✔
650
}
651

652
func (d *nosqlExecutionStore) CompleteReplicationTask(
653
        ctx context.Context,
654
        request *persistence.CompleteReplicationTaskRequest,
655
) error {
×
656
        err := d.db.DeleteReplicationTask(ctx, d.shardID, request.TaskID)
×
657
        if err != nil {
×
658
                return convertCommonErrors(d.db, "CompleteReplicationTask", err)
×
659
        }
×
660

661
        return nil
×
662
}
663

664
func (d *nosqlExecutionStore) RangeCompleteReplicationTask(
665
        ctx context.Context,
666
        request *persistence.RangeCompleteReplicationTaskRequest,
667
) (*persistence.RangeCompleteReplicationTaskResponse, error) {
31✔
668

31✔
669
        err := d.db.RangeDeleteReplicationTasks(ctx, d.shardID, request.InclusiveEndTaskID)
31✔
670
        if err != nil {
31✔
671
                return nil, convertCommonErrors(d.db, "RangeCompleteReplicationTask", err)
×
672
        }
×
673

674
        return &persistence.RangeCompleteReplicationTaskResponse{TasksCompleted: persistence.UnknownNumRowsAffected}, nil
31✔
675
}
676

677
func (d *nosqlExecutionStore) CompleteTimerTask(
678
        ctx context.Context,
679
        request *persistence.CompleteTimerTaskRequest,
680
) error {
×
681
        err := d.db.DeleteTimerTask(ctx, d.shardID, request.TaskID, request.VisibilityTimestamp)
×
682
        if err != nil {
×
683
                return convertCommonErrors(d.db, "CompleteTimerTask", err)
×
684
        }
×
685

686
        return nil
×
687
}
688

689
func (d *nosqlExecutionStore) RangeCompleteTimerTask(
690
        ctx context.Context,
691
        request *persistence.RangeCompleteTimerTaskRequest,
692
) (*persistence.RangeCompleteTimerTaskResponse, error) {
9✔
693
        err := d.db.RangeDeleteTimerTasks(ctx, d.shardID, request.InclusiveBeginTimestamp, request.ExclusiveEndTimestamp)
9✔
694
        if err != nil {
9✔
695
                return nil, convertCommonErrors(d.db, "RangeCompleteTimerTask", err)
×
696
        }
×
697

698
        return &persistence.RangeCompleteTimerTaskResponse{TasksCompleted: persistence.UnknownNumRowsAffected}, nil
9✔
699
}
700

701
func (d *nosqlExecutionStore) GetTimerIndexTasks(
702
        ctx context.Context,
703
        request *persistence.GetTimerIndexTasksRequest,
704
) (*persistence.GetTimerIndexTasksResponse, error) {
1,017✔
705

1,017✔
706
        timers, nextPageToken, err := d.db.SelectTimerTasksOrderByVisibilityTime(ctx, d.shardID, request.BatchSize, request.NextPageToken, request.MinTimestamp, request.MaxTimestamp)
1,017✔
707
        if err != nil {
1,017✔
708
                return nil, convertCommonErrors(d.db, "GetTimerTasks", err)
×
709
        }
×
710

711
        return &persistence.GetTimerIndexTasksResponse{
1,017✔
712
                Timers:        timers,
1,017✔
713
                NextPageToken: nextPageToken,
1,017✔
714
        }, nil
1,017✔
715
}
716

717
func (d *nosqlExecutionStore) PutReplicationTaskToDLQ(
718
        ctx context.Context,
719
        request *persistence.InternalPutReplicationTaskToDLQRequest,
720
) error {
1✔
721

1✔
722
        err := d.db.InsertReplicationDLQTask(ctx, d.shardID, request.SourceClusterName, *request.TaskInfo)
1✔
723
        if err != nil {
1✔
724
                return convertCommonErrors(d.db, "PutReplicationTaskToDLQ", err)
×
725
        }
×
726

727
        return nil
1✔
728
}
729

730
func (d *nosqlExecutionStore) GetReplicationTasksFromDLQ(
731
        ctx context.Context,
732
        request *persistence.GetReplicationTasksFromDLQRequest,
733
) (*persistence.InternalGetReplicationTasksFromDLQResponse, error) {
1✔
734
        tasks, nextPageToken, err := d.db.SelectReplicationDLQTasksOrderByTaskID(ctx, d.shardID, request.SourceClusterName, request.BatchSize, request.NextPageToken, request.ReadLevel, request.MaxReadLevel)
1✔
735
        if err != nil {
1✔
736
                return nil, convertCommonErrors(d.db, "GetReplicationTasksFromDLQ", err)
×
737
        }
×
738
        return &persistence.InternalGetReplicationTasksResponse{
1✔
739
                Tasks:         tasks,
1✔
740
                NextPageToken: nextPageToken,
1✔
741
        }, nil
1✔
742
}
743

744
func (d *nosqlExecutionStore) GetReplicationDLQSize(
745
        ctx context.Context,
746
        request *persistence.GetReplicationDLQSizeRequest,
747
) (*persistence.GetReplicationDLQSizeResponse, error) {
4✔
748

4✔
749
        size, err := d.db.SelectReplicationDLQTasksCount(ctx, d.shardID, request.SourceClusterName)
4✔
750
        if err != nil {
4✔
751
                return nil, convertCommonErrors(d.db, "GetReplicationDLQSize", err)
×
752
        }
×
753
        return &persistence.GetReplicationDLQSizeResponse{
4✔
754
                Size: size,
4✔
755
        }, nil
4✔
756
}
757

758
func (d *nosqlExecutionStore) DeleteReplicationTaskFromDLQ(
759
        ctx context.Context,
760
        request *persistence.DeleteReplicationTaskFromDLQRequest,
761
) error {
×
762

×
763
        err := d.db.DeleteReplicationDLQTask(ctx, d.shardID, request.SourceClusterName, request.TaskID)
×
764
        if err != nil {
×
765
                return convertCommonErrors(d.db, "DeleteReplicationTaskFromDLQ", err)
×
766
        }
×
767

768
        return nil
×
769
}
770

771
func (d *nosqlExecutionStore) RangeDeleteReplicationTaskFromDLQ(
772
        ctx context.Context,
773
        request *persistence.RangeDeleteReplicationTaskFromDLQRequest,
774
) (*persistence.RangeDeleteReplicationTaskFromDLQResponse, error) {
×
775

×
776
        err := d.db.RangeDeleteReplicationDLQTasks(ctx, d.shardID, request.SourceClusterName, request.ExclusiveBeginTaskID, request.InclusiveEndTaskID)
×
777
        if err != nil {
×
778
                return nil, convertCommonErrors(d.db, "RangeDeleteReplicationTaskFromDLQ", err)
×
779
        }
×
780

781
        return &persistence.RangeDeleteReplicationTaskFromDLQResponse{TasksCompleted: persistence.UnknownNumRowsAffected}, nil
×
782
}
783

784
func (d *nosqlExecutionStore) CreateFailoverMarkerTasks(
785
        ctx context.Context,
786
        request *persistence.CreateFailoverMarkersRequest,
787
) error {
×
788

×
789
        var nosqlTasks []*nosqlplugin.ReplicationTask
×
790
        for _, task := range request.Markers {
×
791
                ts := []persistence.Task{task}
×
792

×
793
                tasks, err := d.prepareReplicationTasksForWorkflowTxn(task.DomainID, rowTypeReplicationWorkflowID, rowTypeReplicationRunID, ts)
×
794
                if err != nil {
×
795
                        return err
×
796
                }
×
797
                nosqlTasks = append(nosqlTasks, tasks...)
×
798
        }
799

800
        err := d.db.InsertReplicationTask(ctx, nosqlTasks, nosqlplugin.ShardCondition{
×
801
                ShardID: d.shardID,
×
802
                RangeID: request.RangeID,
×
803
        })
×
804

×
805
        if err != nil {
×
806
                conditionFailureErr, isConditionFailedError := err.(*nosqlplugin.ShardOperationConditionFailure)
×
807
                if isConditionFailedError {
×
808
                        return &persistence.ShardOwnershipLostError{
×
809
                                ShardID: d.shardID,
×
810
                                Msg: fmt.Sprintf("Failed to create workflow execution.  Request RangeID: %v, columns: (%v)",
×
811
                                        conditionFailureErr.RangeID, conditionFailureErr.Details),
×
812
                        }
×
813
                }
×
814
        }
815
        return nil
×
816
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc