• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

uber / cadence / 0187fc45-1fa7-4f9b-9c10-2aef2ebb54e0

08 May 2023 05:10PM UTC coverage: 57.153% (-0.07%) from 57.225%
0187fc45-1fa7-4f9b-9c10-2aef2ebb54e0

push

buildkite

GitHub
Update persistence layer to adopt idl update for isolation (#5254)

204 of 204 new or added lines in 15 files covered. (100.0%)

85781 of 150089 relevant lines covered (57.15%)

2419.35 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

8.77
/common/persistence/persistence-tests/persistenceTestBase.go
1
// Copyright (c) 2017-2020 Uber Technologies, Inc.
2
// Portions of the Software are attributed to Copyright (c) 2020 Temporal Technologies Inc.
3
//
4
// Permission is hereby granted, free of charge, to any person obtaining a copy
5
// of this software and associated documentation files (the "Software"), to deal
6
// in the Software without restriction, including without limitation the rights
7
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
8
// copies of the Software, and to permit persons to whom the Software is
9
// furnished to do so, subject to the following conditions:
10
//
11
// The above copyright notice and this permission notice shall be included in
12
// all copies or substantial portions of the Software.
13
//
14
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
15
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
16
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
17
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
18
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
19
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
20
// THE SOFTWARE.
21

22
package persistencetests
23

24
import (
25
        "context"
26
        "fmt"
27
        "math"
28
        "math/rand"
29
        "sync/atomic"
30
        "time"
31

32
        "github.com/pborman/uuid"
33
        "github.com/stretchr/testify/suite"
34
        "github.com/uber-go/tally"
35

36
        "github.com/uber/cadence/common"
37
        "github.com/uber/cadence/common/backoff"
38
        "github.com/uber/cadence/common/cluster"
39
        "github.com/uber/cadence/common/config"
40
        "github.com/uber/cadence/common/dynamicconfig"
41
        "github.com/uber/cadence/common/log"
42
        "github.com/uber/cadence/common/log/loggerimpl"
43
        "github.com/uber/cadence/common/log/tag"
44
        "github.com/uber/cadence/common/metrics"
45
        "github.com/uber/cadence/common/persistence"
46
        "github.com/uber/cadence/common/persistence/client"
47
        "github.com/uber/cadence/common/persistence/nosql"
48
        "github.com/uber/cadence/common/persistence/persistence-tests/testcluster"
49
        "github.com/uber/cadence/common/persistence/sql"
50
        "github.com/uber/cadence/common/service"
51
        "github.com/uber/cadence/common/types"
52
)
53

54
type (
	// TransferTaskIDGenerator generates IDs for transfer tasks written by helper methods
	TransferTaskIDGenerator interface {
		// GenerateTransferTaskID returns a transfer task ID, or an error when
		// one cannot be produced.
		GenerateTransferTaskID() (int64, error)
	}

	// TestBaseOptions options to configure workflow test base.
	//
	// NewTestBaseWithNoSQL and NewTestBaseWithSQL forward the DB* fields to
	// nosql.NewTestCluster / sql.NewTestCluster respectively. The fields
	// tagged `yaml:"-"` are excluded from YAML (un)marshalling.
	TestBaseOptions struct {
		DBPluginName string
		// DBName is replaced with "test_" plus a random suffix by the
		// constructors when it is left empty.
		DBName     string
		DBUsername string
		DBPassword string
		DBHost     string
		DBPort     int `yaml:"-"`
		// StoreType is not read anywhere in the visible part of this file.
		StoreType string `yaml:"-"`
		// SchemaDir is only passed to sql.NewTestCluster.
		SchemaDir string `yaml:"-"`
		// ClusterMetadata is replaced with cluster.GetTestClusterMetadata(false)
		// by the constructors when its current cluster name is empty.
		ClusterMetadata cluster.Metadata `yaml:"-"`
		// ProtoVersion is only passed to nosql.NewTestCluster.
		ProtoVersion int `yaml:"-"`
	}

	// TestBase wraps the base setup needed to create workflows over persistence layer.
	//
	// NewTestBaseFromParams fills in the test clusters, ClusterMetadata,
	// PayloadSerializer, Logger and DynamicConfiguration. The managers,
	// ShardInfo and TaskIDGenerator are populated later by Setup.
	TestBase struct {
		suite.Suite
		ShardMgr            persistence.ShardManager
		ExecutionMgrFactory client.Factory
		ExecutionManager    persistence.ExecutionManager
		TaskMgr             persistence.TaskManager
		HistoryV2Mgr        persistence.HistoryManager
		DomainManager       persistence.DomainManager
		// DomainReplicationQueueMgr is the queue returned by
		// factory.NewDomainReplicationQueueManager in Setup.
		DomainReplicationQueueMgr persistence.QueueManager
		// ShardInfo is the shard record created by Setup; its RangeID is
		// used by the workflow helper methods when issuing requests.
		ShardInfo             *persistence.ShardInfo
		TaskIDGenerator       TransferTaskIDGenerator
		ClusterMetadata       cluster.Metadata
		DefaultTestCluster    testcluster.PersistenceTestCluster
		VisibilityTestCluster testcluster.PersistenceTestCluster
		Logger                log.Logger
		PayloadSerializer     persistence.PayloadSerializer
		// ConfigStoreManager is only set by Setup when the default store
		// type is Cassandra; otherwise it stays nil.
		ConfigStoreManager   persistence.ConfigStoreManager
		DynamicConfiguration persistence.DynamicConfiguration
	}

	// TestBaseParams defines the input of TestBase
	TestBaseParams struct {
		DefaultTestCluster    testcluster.PersistenceTestCluster
		VisibilityTestCluster testcluster.PersistenceTestCluster
		ClusterMetadata       cluster.Metadata
		DynamicConfiguration  persistence.DynamicConfiguration
	}

	// TestTransferTaskIDGenerator helper
	TestTransferTaskIDGenerator struct {
		// seqNum backs the generated IDs. NOTE(review): the method that
		// advances it is outside this view; presumably it is updated via
		// sync/atomic (imported by this file) — confirm.
		seqNum int64
	}
)
108

109
// defaultScheduleToStartTimeout is the default schedule-to-start timeout used
// by helpers in this package.
// NOTE(review): the unit is not visible in this part of the file (presumably
// seconds) — confirm against the call sites.
const defaultScheduleToStartTimeout = 111
112

113
// NewTestBaseFromParams returns a customized test base from given input
114
func NewTestBaseFromParams(params TestBaseParams) TestBase {
15✔
115
        logger, err := loggerimpl.NewDevelopment()
15✔
116
        if err != nil {
15✔
117
                panic(err)
×
118
        }
119

120
        return TestBase{
15✔
121
                DefaultTestCluster:    params.DefaultTestCluster,
15✔
122
                VisibilityTestCluster: params.VisibilityTestCluster,
15✔
123
                ClusterMetadata:       params.ClusterMetadata,
15✔
124
                PayloadSerializer:     persistence.NewPayloadSerializer(),
15✔
125
                Logger:                logger,
15✔
126
                DynamicConfiguration:  params.DynamicConfiguration,
15✔
127
        }
15✔
128
}
129

130
// NewTestBaseWithNoSQL returns a persistence test base backed by nosql datastore
131
func NewTestBaseWithNoSQL(options *TestBaseOptions) TestBase {
×
132
        if options.DBName == "" {
×
133
                options.DBName = "test_" + GenerateRandomDBName(10)
×
134
        }
×
135
        testCluster := nosql.NewTestCluster(options.DBPluginName, options.DBName, options.DBUsername, options.DBPassword, options.DBHost, options.DBPort, options.ProtoVersion, "")
×
136
        metadata := options.ClusterMetadata
×
137
        if metadata.GetCurrentClusterName() == "" {
×
138
                metadata = cluster.GetTestClusterMetadata(false)
×
139
        }
×
140
        dc := persistence.DynamicConfiguration{
×
141
                EnableSQLAsyncTransaction:                dynamicconfig.GetBoolPropertyFn(false),
×
142
                EnableCassandraAllConsistencyLevelDelete: dynamicconfig.GetBoolPropertyFn(true),
×
143
                PersistenceSampleLoggingRate:             dynamicconfig.GetIntPropertyFn(100),
×
144
                EnableShardIDMetrics:                     dynamicconfig.GetBoolPropertyFn(true),
×
145
        }
×
146
        params := TestBaseParams{
×
147
                DefaultTestCluster:    testCluster,
×
148
                VisibilityTestCluster: testCluster,
×
149
                ClusterMetadata:       metadata,
×
150
                DynamicConfiguration:  dc,
×
151
        }
×
152
        return NewTestBaseFromParams(params)
×
153
}
154

155
// NewTestBaseWithSQL returns a new persistence test base backed by SQL
156
func NewTestBaseWithSQL(options *TestBaseOptions) TestBase {
×
157
        if options.DBName == "" {
×
158
                options.DBName = "test_" + GenerateRandomDBName(10)
×
159
        }
×
160
        testCluster := sql.NewTestCluster(options.DBPluginName, options.DBName, options.DBUsername, options.DBPassword, options.DBHost, options.DBPort, options.SchemaDir)
×
161
        metadata := options.ClusterMetadata
×
162
        if metadata.GetCurrentClusterName() == "" {
×
163
                metadata = cluster.GetTestClusterMetadata(false)
×
164
        }
×
165
        dc := persistence.DynamicConfiguration{
×
166
                EnableSQLAsyncTransaction:                dynamicconfig.GetBoolPropertyFn(false),
×
167
                EnableCassandraAllConsistencyLevelDelete: dynamicconfig.GetBoolPropertyFn(true),
×
168
                PersistenceSampleLoggingRate:             dynamicconfig.GetIntPropertyFn(100),
×
169
                EnableShardIDMetrics:                     dynamicconfig.GetBoolPropertyFn(true),
×
170
        }
×
171
        params := TestBaseParams{
×
172
                DefaultTestCluster:    testCluster,
×
173
                VisibilityTestCluster: testCluster,
×
174
                ClusterMetadata:       metadata,
×
175
                DynamicConfiguration:  dc,
×
176
        }
×
177
        return NewTestBaseFromParams(params)
×
178
}
179

180
// Config returns the persistence configuration for this test
181
func (s *TestBase) Config() config.Persistence {
15✔
182
        cfg := s.DefaultTestCluster.Config()
15✔
183
        if s.DefaultTestCluster == s.VisibilityTestCluster {
30✔
184
                return cfg
15✔
185
        }
15✔
186
        vCfg := s.VisibilityTestCluster.Config()
×
187
        cfg.VisibilityStore = "visibility_ " + vCfg.VisibilityStore
×
188
        cfg.DataStores[cfg.VisibilityStore] = vCfg.DataStores[vCfg.VisibilityStore]
×
189
        return cfg
×
190
}
191

192
// Setup sets up the test base, must be called as part of SetupSuite
193
func (s *TestBase) Setup() {
15✔
194
        var err error
15✔
195
        shardID := 10
15✔
196
        clusterName := s.ClusterMetadata.GetCurrentClusterName()
15✔
197

15✔
198
        s.DefaultTestCluster.SetupTestDatabase()
15✔
199

15✔
200
        cfg := s.DefaultTestCluster.Config()
15✔
201
        scope := tally.NewTestScope(service.History, make(map[string]string))
15✔
202
        metricsClient := metrics.NewClient(scope, service.GetMetricsServiceIdx(service.History, s.Logger))
15✔
203
        factory := client.NewFactory(&cfg, nil, clusterName, metricsClient, s.Logger, &s.DynamicConfiguration)
15✔
204

15✔
205
        s.TaskMgr, err = factory.NewTaskManager()
15✔
206
        s.fatalOnError("NewTaskManager", err)
15✔
207

15✔
208
        s.DomainManager, err = factory.NewDomainManager()
15✔
209
        s.fatalOnError("NewDomainManager", err)
15✔
210

15✔
211
        s.HistoryV2Mgr, err = factory.NewHistoryManager()
15✔
212
        s.fatalOnError("NewHistoryManager", err)
15✔
213

15✔
214
        s.ShardMgr, err = factory.NewShardManager()
15✔
215
        s.fatalOnError("NewShardManager", err)
15✔
216

15✔
217
        if cfg.DefaultStoreType() == config.StoreTypeCassandra {
20✔
218
                s.ConfigStoreManager, err = factory.NewConfigStoreManager()
5✔
219
                s.fatalOnError("NewConfigStoreManager", err)
5✔
220
        }
5✔
221

222
        s.ExecutionMgrFactory = factory
15✔
223
        s.ExecutionManager, err = factory.NewExecutionManager(shardID)
15✔
224
        s.fatalOnError("NewExecutionManager", err)
15✔
225

15✔
226
        domainFilter := &types.DomainFilter{
15✔
227
                DomainIDs:    []string{},
15✔
228
                ReverseMatch: true,
15✔
229
        }
15✔
230
        transferPQSMap := map[string][]*types.ProcessingQueueState{
15✔
231
                s.ClusterMetadata.GetCurrentClusterName(): {
15✔
232
                        &types.ProcessingQueueState{
15✔
233
                                Level:        common.Int32Ptr(0),
15✔
234
                                AckLevel:     common.Int64Ptr(0),
15✔
235
                                MaxLevel:     common.Int64Ptr(0),
15✔
236
                                DomainFilter: domainFilter,
15✔
237
                        },
15✔
238
                },
15✔
239
        }
15✔
240
        transferPQS := types.ProcessingQueueStates{StatesByCluster: transferPQSMap}
15✔
241
        timerPQSMap := map[string][]*types.ProcessingQueueState{
15✔
242
                s.ClusterMetadata.GetCurrentClusterName(): {
15✔
243
                        &types.ProcessingQueueState{
15✔
244
                                Level:        common.Int32Ptr(0),
15✔
245
                                AckLevel:     common.Int64Ptr(time.Now().UnixNano()),
15✔
246
                                MaxLevel:     common.Int64Ptr(time.Now().UnixNano()),
15✔
247
                                DomainFilter: domainFilter,
15✔
248
                        },
15✔
249
                },
15✔
250
        }
15✔
251
        timerPQS := types.ProcessingQueueStates{StatesByCluster: timerPQSMap}
15✔
252

15✔
253
        s.ShardInfo = &persistence.ShardInfo{
15✔
254
                ShardID:                       shardID,
15✔
255
                RangeID:                       0,
15✔
256
                TransferAckLevel:              0,
15✔
257
                ReplicationAckLevel:           0,
15✔
258
                TimerAckLevel:                 time.Time{},
15✔
259
                ClusterTimerAckLevel:          map[string]time.Time{clusterName: time.Time{}},
15✔
260
                ClusterTransferAckLevel:       map[string]int64{clusterName: 0},
15✔
261
                TransferProcessingQueueStates: &transferPQS,
15✔
262
                TimerProcessingQueueStates:    &timerPQS,
15✔
263
        }
15✔
264

15✔
265
        s.TaskIDGenerator = &TestTransferTaskIDGenerator{}
15✔
266
        err = s.ShardMgr.CreateShard(context.Background(), &persistence.CreateShardRequest{ShardInfo: s.ShardInfo})
15✔
267
        s.fatalOnError("CreateShard", err)
15✔
268

15✔
269
        queue, err := factory.NewDomainReplicationQueueManager()
15✔
270
        s.fatalOnError("Create DomainReplicationQueue", err)
15✔
271
        s.DomainReplicationQueueMgr = queue
15✔
272
}
273

274
func (s *TestBase) fatalOnError(msg string, err error) {
91✔
275
        if err != nil {
91✔
276
                s.Logger.Fatal(msg, tag.Error(err))
×
277
        }
×
278
}
279

280
// CreateShard is a utility method to create the shard using persistence layer
281
func (s *TestBase) CreateShard(ctx context.Context, shardID int, owner string, rangeID int64) error {
36✔
282
        info := &persistence.ShardInfo{
36✔
283
                ShardID: shardID,
36✔
284
                Owner:   owner,
36✔
285
                RangeID: rangeID,
36✔
286
        }
36✔
287

36✔
288
        return s.ShardMgr.CreateShard(ctx, &persistence.CreateShardRequest{
36✔
289
                ShardInfo: info,
36✔
290
        })
36✔
291
}
36✔
292

293
// GetShard is a utility method to get the shard using persistence layer
294
func (s *TestBase) GetShard(ctx context.Context, shardID int) (*persistence.ShardInfo, error) {
×
295
        response, err := s.ShardMgr.GetShard(ctx, &persistence.GetShardRequest{
×
296
                ShardID: shardID,
×
297
        })
×
298

×
299
        if err != nil {
×
300
                return nil, err
×
301
        }
×
302

303
        return response.ShardInfo, nil
×
304
}
305

306
// UpdateShard is a utility method to update the shard using persistence layer
307
func (s *TestBase) UpdateShard(ctx context.Context, updatedInfo *persistence.ShardInfo, previousRangeID int64) error {
×
308
        return s.ShardMgr.UpdateShard(ctx, &persistence.UpdateShardRequest{
×
309
                ShardInfo:       updatedInfo,
×
310
                PreviousRangeID: previousRangeID,
×
311
        })
×
312
}
×
313

314
// CreateWorkflowExecutionWithBranchToken test util function
315
func (s *TestBase) CreateWorkflowExecutionWithBranchToken(
316
        ctx context.Context,
317
        domainID string,
318
        workflowExecution types.WorkflowExecution,
319
        taskList string,
320
        wType string,
321
        wTimeout int32,
322
        decisionTimeout int32,
323
        executionContext []byte,
324
        nextEventID int64,
325
        lastProcessedEventID int64,
326
        decisionScheduleID int64,
327
        branchToken []byte,
328
        timerTasks []persistence.Task,
329
        partitionConfig map[string]string,
330
) (*persistence.CreateWorkflowExecutionResponse, error) {
×
331

×
332
        now := time.Now()
×
333
        versionHistory := persistence.NewVersionHistory(branchToken, []*persistence.VersionHistoryItem{
×
334
                {decisionScheduleID, common.EmptyVersion},
×
335
        })
×
336
        versionHistories := persistence.NewVersionHistories(versionHistory)
×
337
        response, err := s.ExecutionManager.CreateWorkflowExecution(ctx, &persistence.CreateWorkflowExecutionRequest{
×
338
                NewWorkflowSnapshot: persistence.WorkflowSnapshot{
×
339
                        ExecutionInfo: &persistence.WorkflowExecutionInfo{
×
340
                                CreateRequestID:             uuid.New(),
×
341
                                DomainID:                    domainID,
×
342
                                WorkflowID:                  workflowExecution.GetWorkflowID(),
×
343
                                RunID:                       workflowExecution.GetRunID(),
×
344
                                FirstExecutionRunID:         workflowExecution.GetRunID(),
×
345
                                TaskList:                    taskList,
×
346
                                WorkflowTypeName:            wType,
×
347
                                WorkflowTimeout:             wTimeout,
×
348
                                DecisionStartToCloseTimeout: decisionTimeout,
×
349
                                ExecutionContext:            executionContext,
×
350
                                State:                       persistence.WorkflowStateRunning,
×
351
                                CloseStatus:                 persistence.WorkflowCloseStatusNone,
×
352
                                LastFirstEventID:            common.FirstEventID,
×
353
                                NextEventID:                 nextEventID,
×
354
                                LastProcessedEvent:          lastProcessedEventID,
×
355
                                LastUpdatedTimestamp:        now,
×
356
                                StartTimestamp:              now,
×
357
                                DecisionScheduleID:          decisionScheduleID,
×
358
                                DecisionStartedID:           common.EmptyEventID,
×
359
                                DecisionTimeout:             1,
×
360
                                BranchToken:                 branchToken,
×
361
                                PartitionConfig:             partitionConfig,
×
362
                        },
×
363
                        ExecutionStats: &persistence.ExecutionStats{},
×
364
                        TransferTasks: []persistence.Task{
×
365
                                &persistence.DecisionTask{
×
366
                                        TaskID:              s.GetNextSequenceNumber(),
×
367
                                        DomainID:            domainID,
×
368
                                        TaskList:            taskList,
×
369
                                        ScheduleID:          decisionScheduleID,
×
370
                                        VisibilityTimestamp: time.Now(),
×
371
                                },
×
372
                        },
×
373
                        TimerTasks:       timerTasks,
×
374
                        Checksum:         testWorkflowChecksum,
×
375
                        VersionHistories: versionHistories,
×
376
                },
×
377
                RangeID: s.ShardInfo.RangeID,
×
378
        })
×
379

×
380
        return response, err
×
381
}
×
382

383
// CreateWorkflowExecution is a utility method to create workflow executions
384
func (s *TestBase) CreateWorkflowExecution(
385
        ctx context.Context,
386
        domainID string,
387
        workflowExecution types.WorkflowExecution,
388
        taskList string,
389
        wType string,
390
        wTimeout int32,
391
        decisionTimeout int32,
392
        executionContext []byte,
393
        nextEventID int64,
394
        lastProcessedEventID int64,
395
        decisionScheduleID int64,
396
        timerTasks []persistence.Task,
397
        partitionConfig map[string]string,
398
) (*persistence.CreateWorkflowExecutionResponse, error) {
×
399

×
400
        return s.CreateWorkflowExecutionWithBranchToken(ctx, domainID, workflowExecution, taskList, wType, wTimeout, decisionTimeout,
×
401
                executionContext, nextEventID, lastProcessedEventID, decisionScheduleID, nil, timerTasks, partitionConfig)
×
402
}
×
403

404
// CreateChildWorkflowExecution is a utility method to create child workflow executions
405
func (s *TestBase) CreateChildWorkflowExecution(ctx context.Context, domainID string, workflowExecution types.WorkflowExecution,
406
        parentDomainID string, parentExecution types.WorkflowExecution, initiatedID int64, taskList, wType string,
407
        wTimeout int32, decisionTimeout int32, executionContext []byte, nextEventID int64, lastProcessedEventID int64,
408
        decisionScheduleID int64, timerTasks []persistence.Task, partitionConfig map[string]string) (*persistence.CreateWorkflowExecutionResponse, error) {
×
409
        now := time.Now()
×
410
        versionHistory := persistence.NewVersionHistory([]byte{}, []*persistence.VersionHistoryItem{
×
411
                {decisionScheduleID, common.EmptyVersion},
×
412
        })
×
413
        versionHistories := persistence.NewVersionHistories(versionHistory)
×
414
        response, err := s.ExecutionManager.CreateWorkflowExecution(ctx, &persistence.CreateWorkflowExecutionRequest{
×
415
                NewWorkflowSnapshot: persistence.WorkflowSnapshot{
×
416
                        ExecutionInfo: &persistence.WorkflowExecutionInfo{
×
417
                                CreateRequestID:             uuid.New(),
×
418
                                DomainID:                    domainID,
×
419
                                WorkflowID:                  workflowExecution.GetWorkflowID(),
×
420
                                RunID:                       workflowExecution.GetRunID(),
×
421
                                FirstExecutionRunID:         workflowExecution.GetRunID(),
×
422
                                ParentDomainID:              parentDomainID,
×
423
                                ParentWorkflowID:            parentExecution.GetWorkflowID(),
×
424
                                ParentRunID:                 parentExecution.GetRunID(),
×
425
                                InitiatedID:                 initiatedID,
×
426
                                TaskList:                    taskList,
×
427
                                WorkflowTypeName:            wType,
×
428
                                WorkflowTimeout:             wTimeout,
×
429
                                DecisionStartToCloseTimeout: decisionTimeout,
×
430
                                ExecutionContext:            executionContext,
×
431
                                State:                       persistence.WorkflowStateCreated,
×
432
                                CloseStatus:                 persistence.WorkflowCloseStatusNone,
×
433
                                LastFirstEventID:            common.FirstEventID,
×
434
                                NextEventID:                 nextEventID,
×
435
                                LastProcessedEvent:          lastProcessedEventID,
×
436
                                LastUpdatedTimestamp:        now,
×
437
                                StartTimestamp:              now,
×
438
                                DecisionScheduleID:          decisionScheduleID,
×
439
                                DecisionStartedID:           common.EmptyEventID,
×
440
                                DecisionTimeout:             1,
×
441
                                PartitionConfig:             partitionConfig,
×
442
                        },
×
443
                        ExecutionStats: &persistence.ExecutionStats{},
×
444
                        TransferTasks: []persistence.Task{
×
445
                                &persistence.DecisionTask{
×
446
                                        TaskID:     s.GetNextSequenceNumber(),
×
447
                                        DomainID:   domainID,
×
448
                                        TaskList:   taskList,
×
449
                                        ScheduleID: decisionScheduleID,
×
450
                                },
×
451
                        },
×
452
                        TimerTasks:       timerTasks,
×
453
                        VersionHistories: versionHistories,
×
454
                },
×
455
                RangeID:    s.ShardInfo.RangeID,
×
456
                DomainName: s.DomainManager.GetName(),
×
457
        })
×
458

×
459
        return response, err
×
460
}
×
461

462
// GetWorkflowExecutionInfoWithStats is a utility method to retrieve execution info with size stats
463
func (s *TestBase) GetWorkflowExecutionInfoWithStats(ctx context.Context, domainID string, workflowExecution types.WorkflowExecution) (
464
        *persistence.MutableStateStats, *persistence.WorkflowMutableState, error) {
×
465
        response, err := s.ExecutionManager.GetWorkflowExecution(ctx, &persistence.GetWorkflowExecutionRequest{
×
466
                DomainID:  domainID,
×
467
                Execution: workflowExecution,
×
468
        })
×
469
        if err != nil {
×
470
                return nil, nil, err
×
471
        }
×
472

473
        return response.MutableStateStats, response.State, nil
×
474
}
475

476
// GetWorkflowExecutionInfo is a utility method to retrieve execution info
477
func (s *TestBase) GetWorkflowExecutionInfo(ctx context.Context, domainID string, workflowExecution types.WorkflowExecution) (
478
        *persistence.WorkflowMutableState, error) {
×
479
        response, err := s.ExecutionManager.GetWorkflowExecution(ctx, &persistence.GetWorkflowExecutionRequest{
×
480
                DomainID:  domainID,
×
481
                Execution: workflowExecution,
×
482
        })
×
483
        if err != nil {
×
484
                return nil, err
×
485
        }
×
486
        return response.State, nil
×
487
}
488

489
// GetCurrentWorkflowRunID returns the workflow run ID for the given params
490
func (s *TestBase) GetCurrentWorkflowRunID(ctx context.Context, domainID, workflowID string) (string, error) {
×
491
        response, err := s.ExecutionManager.GetCurrentExecution(ctx, &persistence.GetCurrentExecutionRequest{
×
492
                DomainID:   domainID,
×
493
                WorkflowID: workflowID,
×
494
        })
×
495

×
496
        if err != nil {
×
497
                return "", err
×
498
        }
×
499

500
        return response.RunID, nil
×
501
}
502

503
// ContinueAsNewExecution is a utility method to create workflow executions
504
func (s *TestBase) ContinueAsNewExecution(
505
        ctx context.Context,
506
        updatedInfo *persistence.WorkflowExecutionInfo,
507
        updatedStats *persistence.ExecutionStats,
508
        condition int64,
509
        newExecution types.WorkflowExecution,
510
        nextEventID, decisionScheduleID int64,
511
        prevResetPoints *types.ResetPoints,
512
) error {
×
513

×
514
        now := time.Now()
×
515
        newdecisionTask := &persistence.DecisionTask{
×
516
                TaskID:     s.GetNextSequenceNumber(),
×
517
                DomainID:   updatedInfo.DomainID,
×
518
                TaskList:   updatedInfo.TaskList,
×
519
                ScheduleID: int64(decisionScheduleID),
×
520
        }
×
521
        versionHistory := persistence.NewVersionHistory([]byte{}, []*persistence.VersionHistoryItem{
×
522
                {decisionScheduleID, common.EmptyVersion},
×
523
        })
×
524
        versionHistories := persistence.NewVersionHistories(versionHistory)
×
525

×
526
        req := &persistence.UpdateWorkflowExecutionRequest{
×
527
                UpdateWorkflowMutation: persistence.WorkflowMutation{
×
528
                        ExecutionInfo:       updatedInfo,
×
529
                        ExecutionStats:      updatedStats,
×
530
                        TransferTasks:       []persistence.Task{newdecisionTask},
×
531
                        TimerTasks:          nil,
×
532
                        Condition:           condition,
×
533
                        UpsertActivityInfos: nil,
×
534
                        DeleteActivityInfos: nil,
×
535
                        UpsertTimerInfos:    nil,
×
536
                        DeleteTimerInfos:    nil,
×
537
                        VersionHistories:    versionHistories,
×
538
                },
×
539
                NewWorkflowSnapshot: &persistence.WorkflowSnapshot{
×
540
                        ExecutionInfo: &persistence.WorkflowExecutionInfo{
×
541
                                CreateRequestID:             uuid.New(),
×
542
                                DomainID:                    updatedInfo.DomainID,
×
543
                                WorkflowID:                  newExecution.GetWorkflowID(),
×
544
                                RunID:                       newExecution.GetRunID(),
×
545
                                FirstExecutionRunID:         updatedInfo.FirstExecutionRunID,
×
546
                                TaskList:                    updatedInfo.TaskList,
×
547
                                WorkflowTypeName:            updatedInfo.WorkflowTypeName,
×
548
                                WorkflowTimeout:             updatedInfo.WorkflowTimeout,
×
549
                                DecisionStartToCloseTimeout: updatedInfo.DecisionStartToCloseTimeout,
×
550
                                ExecutionContext:            nil,
×
551
                                State:                       updatedInfo.State,
×
552
                                CloseStatus:                 updatedInfo.CloseStatus,
×
553
                                LastFirstEventID:            common.FirstEventID,
×
554
                                NextEventID:                 nextEventID,
×
555
                                LastProcessedEvent:          common.EmptyEventID,
×
556
                                LastUpdatedTimestamp:        now,
×
557
                                StartTimestamp:              now,
×
558
                                DecisionScheduleID:          decisionScheduleID,
×
559
                                DecisionStartedID:           common.EmptyEventID,
×
560
                                DecisionTimeout:             1,
×
561
                                AutoResetPoints:             prevResetPoints,
×
562
                                PartitionConfig:             updatedInfo.PartitionConfig,
×
563
                        },
×
564
                        ExecutionStats:   updatedStats,
×
565
                        TransferTasks:    nil,
×
566
                        TimerTasks:       nil,
×
567
                        VersionHistories: versionHistories,
×
568
                },
×
569
                RangeID:  s.ShardInfo.RangeID,
×
570
                Encoding: pickRandomEncoding(),
×
571
                //To DO: next PR for UpdateWorkflowExecution
×
572
                //DomainName: s.DomainManager.GetName(),
×
573
        }
×
574
        req.UpdateWorkflowMutation.ExecutionInfo.State = persistence.WorkflowStateCompleted
×
575
        req.UpdateWorkflowMutation.ExecutionInfo.CloseStatus = persistence.WorkflowCloseStatusContinuedAsNew
×
576
        _, err := s.ExecutionManager.UpdateWorkflowExecution(ctx, req)
×
577
        return err
×
578
}
×
579

580
// UpdateWorkflowExecution is a utility method to update workflow execution
581
func (s *TestBase) UpdateWorkflowExecution(
582
        ctx context.Context,
583
        updatedInfo *persistence.WorkflowExecutionInfo,
584
        updatedStats *persistence.ExecutionStats,
585
        updatedVersionHistories *persistence.VersionHistories,
586
        decisionScheduleIDs []int64,
587
        activityScheduleIDs []int64,
588
        condition int64,
589
        timerTasks []persistence.Task,
590
        upsertActivityInfos []*persistence.ActivityInfo,
591
        deleteActivityInfos []int64,
592
        upsertTimerInfos []*persistence.TimerInfo,
593
        deleteTimerInfos []string,
594
) error {
×
595
        return s.UpdateWorkflowExecutionWithRangeID(
×
596
                ctx,
×
597
                updatedInfo,
×
598
                updatedStats,
×
599
                updatedVersionHistories,
×
600
                decisionScheduleIDs,
×
601
                activityScheduleIDs,
×
602
                s.ShardInfo.RangeID,
×
603
                condition,
×
604
                timerTasks,
×
605
                upsertActivityInfos,
×
606
                deleteActivityInfos,
×
607
                upsertTimerInfos,
×
608
                deleteTimerInfos,
×
609
                nil,
×
610
                nil,
×
611
                nil,
×
612
                nil,
×
613
                nil,
×
614
                nil,
×
615
                nil,
×
616
                nil,
×
617
        )
×
618
}
×
619

620
// UpdateWorkflowExecutionAndFinish is a utility method to update workflow execution
621
func (s *TestBase) UpdateWorkflowExecutionAndFinish(
622
        ctx context.Context,
623
        updatedInfo *persistence.WorkflowExecutionInfo,
624
        updatedStats *persistence.ExecutionStats,
625
        condition int64,
626
        versionHistories *persistence.VersionHistories,
627
) error {
×
628
        transferTasks := []persistence.Task{}
×
629
        transferTasks = append(transferTasks, &persistence.CloseExecutionTask{TaskID: s.GetNextSequenceNumber()})
×
630
        _, err := s.ExecutionManager.UpdateWorkflowExecution(ctx, &persistence.UpdateWorkflowExecutionRequest{
×
631
                RangeID: s.ShardInfo.RangeID,
×
632
                UpdateWorkflowMutation: persistence.WorkflowMutation{
×
633
                        ExecutionInfo:       updatedInfo,
×
634
                        ExecutionStats:      updatedStats,
×
635
                        TransferTasks:       transferTasks,
×
636
                        TimerTasks:          nil,
×
637
                        Condition:           condition,
×
638
                        UpsertActivityInfos: nil,
×
639
                        DeleteActivityInfos: nil,
×
640
                        UpsertTimerInfos:    nil,
×
641
                        DeleteTimerInfos:    nil,
×
642
                        VersionHistories:    versionHistories,
×
643
                },
×
644
                Encoding: pickRandomEncoding(),
×
645
                //To DO: next PR for UpdateWorkflowExecution
×
646
                //DomainName: s.DomainManager.GetName(),
×
647
        })
×
648
        return err
×
649
}
×
650

651
// UpsertChildExecutionsState is a utility method to update mutable state of workflow execution
652
func (s *TestBase) UpsertChildExecutionsState(
653
        ctx context.Context,
654
        updatedInfo *persistence.WorkflowExecutionInfo,
655
        updatedStats *persistence.ExecutionStats,
656
        updatedVersionHistories *persistence.VersionHistories,
657
        condition int64,
658
        upsertChildInfos []*persistence.ChildExecutionInfo,
659
) error {
×
660

×
661
        return s.UpdateWorkflowExecutionWithRangeID(
×
662
                ctx,
×
663
                updatedInfo,
×
664
                updatedStats,
×
665
                updatedVersionHistories,
×
666
                nil,
×
667
                nil,
×
668
                s.ShardInfo.RangeID,
×
669
                condition,
×
670
                nil,
×
671
                nil,
×
672
                nil,
×
673
                nil,
×
674
                nil,
×
675
                upsertChildInfos,
×
676
                nil,
×
677
                nil,
×
678
                nil,
×
679
                nil,
×
680
                nil,
×
681
                nil,
×
682
                nil,
×
683
        )
×
684
}
×
685

686
// UpsertRequestCancelState is a utility method to update mutable state of workflow execution
687
func (s *TestBase) UpsertRequestCancelState(
688
        ctx context.Context,
689
        updatedInfo *persistence.WorkflowExecutionInfo,
690
        updatedStats *persistence.ExecutionStats,
691
        updatedVersionHistories *persistence.VersionHistories,
692
        condition int64,
693
        upsertCancelInfos []*persistence.RequestCancelInfo,
694
) error {
×
695

×
696
        return s.UpdateWorkflowExecutionWithRangeID(
×
697
                ctx,
×
698
                updatedInfo,
×
699
                updatedStats,
×
700
                updatedVersionHistories,
×
701
                nil,
×
702
                nil,
×
703
                s.ShardInfo.RangeID,
×
704
                condition,
×
705
                nil,
×
706
                nil,
×
707
                nil,
×
708
                nil,
×
709
                nil,
×
710
                nil,
×
711
                nil,
×
712
                upsertCancelInfos,
×
713
                nil,
×
714
                nil,
×
715
                nil,
×
716
                nil,
×
717
                nil,
×
718
        )
×
719
}
×
720

721
// UpsertSignalInfoState is a utility method to update mutable state of workflow execution
722
func (s *TestBase) UpsertSignalInfoState(
723
        ctx context.Context,
724
        updatedInfo *persistence.WorkflowExecutionInfo,
725
        updatedStats *persistence.ExecutionStats,
726
        updatedVersionHistories *persistence.VersionHistories,
727
        condition int64,
728
        upsertSignalInfos []*persistence.SignalInfo,
729
) error {
×
730

×
731
        return s.UpdateWorkflowExecutionWithRangeID(
×
732
                ctx,
×
733
                updatedInfo,
×
734
                updatedStats,
×
735
                updatedVersionHistories,
×
736
                nil,
×
737
                nil,
×
738
                s.ShardInfo.RangeID,
×
739
                condition,
×
740
                nil,
×
741
                nil,
×
742
                nil,
×
743
                nil,
×
744
                nil,
×
745
                nil,
×
746
                nil,
×
747
                nil,
×
748
                nil,
×
749
                upsertSignalInfos,
×
750
                nil,
×
751
                nil,
×
752
                nil,
×
753
        )
×
754
}
×
755

756
// UpsertSignalsRequestedState is a utility method to update mutable state of workflow execution
757
func (s *TestBase) UpsertSignalsRequestedState(
758
        ctx context.Context,
759
        updatedInfo *persistence.WorkflowExecutionInfo,
760
        updatedStats *persistence.ExecutionStats,
761
        updatedVersionHistories *persistence.VersionHistories,
762
        condition int64,
763
        upsertSignalsRequested []string,
764
) error {
×
765
        return s.UpdateWorkflowExecutionWithRangeID(
×
766
                ctx,
×
767
                updatedInfo,
×
768
                updatedStats,
×
769
                updatedVersionHistories,
×
770
                nil,
×
771
                nil,
×
772
                s.ShardInfo.RangeID,
×
773
                condition,
×
774
                nil,
×
775
                nil,
×
776
                nil,
×
777
                nil,
×
778
                nil,
×
779
                nil,
×
780
                nil,
×
781
                nil,
×
782
                nil,
×
783
                nil,
×
784
                nil,
×
785
                upsertSignalsRequested,
×
786
                nil,
×
787
        )
×
788
}
×
789

790
// DeleteChildExecutionsState is a utility method to delete child execution from mutable state
791
func (s *TestBase) DeleteChildExecutionsState(
792
        ctx context.Context,
793
        updatedInfo *persistence.WorkflowExecutionInfo,
794
        updatedStats *persistence.ExecutionStats,
795
        updatedVersionHistories *persistence.VersionHistories,
796
        condition int64,
797
        deleteChildInfo int64,
798
) error {
×
799
        return s.UpdateWorkflowExecutionWithRangeID(
×
800
                ctx,
×
801
                updatedInfo,
×
802
                updatedStats,
×
803
                updatedVersionHistories,
×
804
                nil,
×
805
                nil,
×
806
                s.ShardInfo.RangeID,
×
807
                condition,
×
808
                nil,
×
809
                nil,
×
810
                nil,
×
811
                nil,
×
812
                nil,
×
813
                nil,
×
814
                []int64{deleteChildInfo},
×
815
                nil,
×
816
                nil,
×
817
                nil,
×
818
                nil,
×
819
                nil,
×
820
                nil,
×
821
        )
×
822
}
×
823

824
// DeleteCancelState is a utility method to delete request cancel state from mutable state
825
func (s *TestBase) DeleteCancelState(
826
        ctx context.Context,
827
        updatedInfo *persistence.WorkflowExecutionInfo,
828
        updatedStats *persistence.ExecutionStats,
829
        updatedVersionHistories *persistence.VersionHistories,
830
        condition int64,
831
        deleteCancelInfo int64,
832
) error {
×
833
        return s.UpdateWorkflowExecutionWithRangeID(
×
834
                ctx,
×
835
                updatedInfo,
×
836
                updatedStats,
×
837
                updatedVersionHistories,
×
838
                nil,
×
839
                nil,
×
840
                s.ShardInfo.RangeID,
×
841
                condition,
×
842
                nil,
×
843
                nil,
×
844
                nil,
×
845
                nil,
×
846
                nil,
×
847
                nil,
×
848
                nil,
×
849
                nil,
×
850
                []int64{deleteCancelInfo},
×
851
                nil,
×
852
                nil,
×
853
                nil,
×
854
                nil,
×
855
        )
×
856
}
×
857

858
// DeleteSignalState is a utility method to delete request cancel state from mutable state
859
func (s *TestBase) DeleteSignalState(
860
        ctx context.Context,
861
        updatedInfo *persistence.WorkflowExecutionInfo,
862
        updatedStats *persistence.ExecutionStats,
863
        updatedVersionHistories *persistence.VersionHistories,
864
        condition int64,
865
        deleteSignalInfo int64,
866
) error {
×
867
        return s.UpdateWorkflowExecutionWithRangeID(
×
868
                ctx,
×
869
                updatedInfo,
×
870
                updatedStats,
×
871
                updatedVersionHistories,
×
872
                nil,
×
873
                nil,
×
874
                s.ShardInfo.RangeID,
×
875
                condition,
×
876
                nil,
×
877
                nil,
×
878
                nil,
×
879
                nil,
×
880
                nil,
×
881
                nil,
×
882
                nil,
×
883
                nil,
×
884
                nil,
×
885
                nil,
×
886
                []int64{deleteSignalInfo},
×
887
                nil,
×
888
                nil,
×
889
        )
×
890
}
×
891

892
// DeleteSignalsRequestedState is a utility method to delete mutable state of workflow execution
893
func (s *TestBase) DeleteSignalsRequestedState(
894
        ctx context.Context,
895
        updatedInfo *persistence.WorkflowExecutionInfo,
896
        updatedStats *persistence.ExecutionStats,
897
        updatedVersionHistories *persistence.VersionHistories,
898
        condition int64,
899
        deleteSignalsRequestedIDs []string,
900
) error {
×
901
        return s.UpdateWorkflowExecutionWithRangeID(
×
902
                ctx,
×
903
                updatedInfo,
×
904
                updatedStats,
×
905
                updatedVersionHistories,
×
906
                nil,
×
907
                nil,
×
908
                s.ShardInfo.RangeID,
×
909
                condition,
×
910
                nil,
×
911
                nil,
×
912
                nil,
×
913
                nil,
×
914
                nil,
×
915
                nil,
×
916
                nil,
×
917
                nil,
×
918
                nil,
×
919
                nil,
×
920
                nil,
×
921
                nil,
×
922
                deleteSignalsRequestedIDs,
×
923
        )
×
924
}
×
925

926
// UpdateWorklowStateAndReplication is a utility method to update workflow execution
927
func (s *TestBase) UpdateWorklowStateAndReplication(
928
        ctx context.Context,
929
        updatedInfo *persistence.WorkflowExecutionInfo,
930
        updatedStats *persistence.ExecutionStats,
931
        updatedVersionHistories *persistence.VersionHistories,
932
        condition int64,
933
        txTasks []persistence.Task,
934
) error {
×
935

×
936
        return s.UpdateWorkflowExecutionWithReplication(
×
937
                ctx,
×
938
                updatedInfo,
×
939
                updatedStats,
×
940
                updatedVersionHistories,
×
941
                nil,
×
942
                nil,
×
943
                s.ShardInfo.RangeID,
×
944
                condition,
×
945
                nil,
×
946
                txTasks,
×
947
                nil,
×
948
                nil,
×
949
                nil,
×
950
                nil,
×
951
                nil,
×
952
                nil,
×
953
                nil,
×
954
                nil,
×
955
                nil,
×
956
                nil,
×
957
                nil,
×
958
                nil,
×
959
        )
×
960
}
×
961

962
// UpdateWorkflowExecutionWithRangeID is a utility method to update workflow execution
963
func (s *TestBase) UpdateWorkflowExecutionWithRangeID(
964
        ctx context.Context,
965
        updatedInfo *persistence.WorkflowExecutionInfo,
966
        updatedStats *persistence.ExecutionStats,
967
        updatedVersionHistories *persistence.VersionHistories,
968
        decisionScheduleIDs []int64,
969
        activityScheduleIDs []int64,
970
        rangeID int64,
971
        condition int64,
972
        timerTasks []persistence.Task,
973
        upsertActivityInfos []*persistence.ActivityInfo,
974
        deleteActivityInfos []int64,
975
        upsertTimerInfos []*persistence.TimerInfo,
976
        deleteTimerInfos []string,
977
        upsertChildInfos []*persistence.ChildExecutionInfo,
978
        deleteChildInfos []int64,
979
        upsertCancelInfos []*persistence.RequestCancelInfo,
980
        deleteCancelInfos []int64,
981
        upsertSignalInfos []*persistence.SignalInfo,
982
        deleteSignalInfos []int64,
983
        upsertSignalRequestedIDs []string,
984
        deleteSignalRequestedIDs []string,
985
) error {
×
986
        return s.UpdateWorkflowExecutionWithReplication(
×
987
                ctx,
×
988
                updatedInfo,
×
989
                updatedStats,
×
990
                updatedVersionHistories,
×
991
                decisionScheduleIDs,
×
992
                activityScheduleIDs,
×
993
                rangeID,
×
994
                condition,
×
995
                timerTasks,
×
996
                []persistence.Task{},
×
997
                upsertActivityInfos,
×
998
                deleteActivityInfos,
×
999
                upsertTimerInfos,
×
1000
                deleteTimerInfos,
×
1001
                upsertChildInfos,
×
1002
                deleteChildInfos,
×
1003
                upsertCancelInfos,
×
1004
                deleteCancelInfos,
×
1005
                upsertSignalInfos,
×
1006
                deleteSignalInfos,
×
1007
                upsertSignalRequestedIDs,
×
1008
                deleteSignalRequestedIDs,
×
1009
        )
×
1010
}
×
1011

1012
// UpdateWorkflowExecutionWithReplication is a utility method to update workflow execution
1013
func (s *TestBase) UpdateWorkflowExecutionWithReplication(
1014
        ctx context.Context,
1015
        updatedInfo *persistence.WorkflowExecutionInfo,
1016
        updatedStats *persistence.ExecutionStats,
1017
        updatedVersionHistories *persistence.VersionHistories,
1018
        decisionScheduleIDs []int64,
1019
        activityScheduleIDs []int64,
1020
        rangeID int64,
1021
        condition int64,
1022
        timerTasks []persistence.Task,
1023
        txTasks []persistence.Task,
1024
        upsertActivityInfos []*persistence.ActivityInfo,
1025
        deleteActivityInfos []int64,
1026
        upsertTimerInfos []*persistence.TimerInfo,
1027
        deleteTimerInfos []string,
1028
        upsertChildInfos []*persistence.ChildExecutionInfo,
1029
        deleteChildInfos []int64,
1030
        upsertCancelInfos []*persistence.RequestCancelInfo,
1031
        deleteCancelInfos []int64,
1032
        upsertSignalInfos []*persistence.SignalInfo,
1033
        deleteSignalInfos []int64,
1034
        upsertSignalRequestedIDs []string,
1035
        deleteSignalRequestedIDs []string,
1036
) error {
×
1037

×
1038
        // TODO: use separate fields for those three task types
×
1039
        var transferTasks []persistence.Task
×
1040
        var crossClusterTasks []persistence.Task
×
1041
        var replicationTasks []persistence.Task
×
1042
        for _, task := range txTasks {
×
1043
                switch t := task.(type) {
×
1044
                case *persistence.DecisionTask,
1045
                        *persistence.ActivityTask,
1046
                        *persistence.CloseExecutionTask,
1047
                        *persistence.RecordWorkflowClosedTask,
1048
                        *persistence.RecordChildExecutionCompletedTask,
1049
                        *persistence.ApplyParentClosePolicyTask,
1050
                        *persistence.CancelExecutionTask,
1051
                        *persistence.StartChildExecutionTask,
1052
                        *persistence.SignalExecutionTask,
1053
                        *persistence.RecordWorkflowStartedTask,
1054
                        *persistence.ResetWorkflowTask,
1055
                        *persistence.UpsertWorkflowSearchAttributesTask:
×
1056
                        transferTasks = append(transferTasks, t)
×
1057
                case *persistence.CrossClusterStartChildExecutionTask,
1058
                        *persistence.CrossClusterCancelExecutionTask,
1059
                        *persistence.CrossClusterSignalExecutionTask,
1060
                        *persistence.CrossClusterRecordChildExecutionCompletedTask,
1061
                        *persistence.CrossClusterApplyParentClosePolicyTask:
×
1062
                        crossClusterTasks = append(crossClusterTasks, t)
×
1063
                case *persistence.HistoryReplicationTask, *persistence.SyncActivityTask:
×
1064
                        replicationTasks = append(replicationTasks, t)
×
1065
                default:
×
1066
                        panic(fmt.Sprintf("Unknown transfer task type. %v", t))
×
1067
                }
1068
        }
1069
        for _, decisionScheduleID := range decisionScheduleIDs {
×
1070
                transferTasks = append(transferTasks, &persistence.DecisionTask{
×
1071
                        TaskID:     s.GetNextSequenceNumber(),
×
1072
                        DomainID:   updatedInfo.DomainID,
×
1073
                        TaskList:   updatedInfo.TaskList,
×
1074
                        ScheduleID: int64(decisionScheduleID)})
×
1075
        }
×
1076

1077
        for _, activityScheduleID := range activityScheduleIDs {
×
1078
                transferTasks = append(transferTasks, &persistence.ActivityTask{
×
1079
                        TaskID:     s.GetNextSequenceNumber(),
×
1080
                        DomainID:   updatedInfo.DomainID,
×
1081
                        TaskList:   updatedInfo.TaskList,
×
1082
                        ScheduleID: int64(activityScheduleID)})
×
1083
        }
×
1084
        _, err := s.ExecutionManager.UpdateWorkflowExecution(ctx, &persistence.UpdateWorkflowExecutionRequest{
×
1085
                RangeID: rangeID,
×
1086
                UpdateWorkflowMutation: persistence.WorkflowMutation{
×
1087
                        ExecutionInfo:    updatedInfo,
×
1088
                        ExecutionStats:   updatedStats,
×
1089
                        VersionHistories: updatedVersionHistories,
×
1090

×
1091
                        UpsertActivityInfos:       upsertActivityInfos,
×
1092
                        DeleteActivityInfos:       deleteActivityInfos,
×
1093
                        UpsertTimerInfos:          upsertTimerInfos,
×
1094
                        DeleteTimerInfos:          deleteTimerInfos,
×
1095
                        UpsertChildExecutionInfos: upsertChildInfos,
×
1096
                        DeleteChildExecutionInfos: deleteChildInfos,
×
1097
                        UpsertRequestCancelInfos:  upsertCancelInfos,
×
1098
                        DeleteRequestCancelInfos:  deleteCancelInfos,
×
1099
                        UpsertSignalInfos:         upsertSignalInfos,
×
1100
                        DeleteSignalInfos:         deleteSignalInfos,
×
1101
                        UpsertSignalRequestedIDs:  upsertSignalRequestedIDs,
×
1102
                        DeleteSignalRequestedIDs:  deleteSignalRequestedIDs,
×
1103

×
1104
                        TransferTasks:     transferTasks,
×
1105
                        CrossClusterTasks: crossClusterTasks,
×
1106
                        ReplicationTasks:  replicationTasks,
×
1107
                        TimerTasks:        timerTasks,
×
1108

×
1109
                        Condition: condition,
×
1110
                        Checksum:  testWorkflowChecksum,
×
1111
                },
×
1112
                Encoding: pickRandomEncoding(),
×
1113
        })
×
1114
        return err
×
1115
}
1116

1117
// UpdateWorkflowExecutionTasks is a utility method to update workflow tasks
1118
// with IgnoreCurrent update mode.
1119
func (s *TestBase) UpdateWorkflowExecutionTasks(
1120
        ctx context.Context,
1121
        updatedInfo *persistence.WorkflowExecutionInfo,
1122
        updatedStats *persistence.ExecutionStats,
1123
        condition int64,
1124
        transferTasks []persistence.Task,
1125
        timerTasks []persistence.Task,
1126
        crossClusterTasks []persistence.Task,
1127
) error {
×
1128
        _, err := s.ExecutionManager.UpdateWorkflowExecution(ctx, &persistence.UpdateWorkflowExecutionRequest{
×
1129
                Mode: persistence.UpdateWorkflowModeIgnoreCurrent,
×
1130
                UpdateWorkflowMutation: persistence.WorkflowMutation{
×
1131
                        ExecutionInfo:     updatedInfo,
×
1132
                        ExecutionStats:    updatedStats,
×
1133
                        TransferTasks:     transferTasks,
×
1134
                        TimerTasks:        timerTasks,
×
1135
                        CrossClusterTasks: crossClusterTasks,
×
1136
                        Condition:         condition,
×
1137
                },
×
1138
                RangeID:  s.ShardInfo.RangeID,
×
1139
                Encoding: pickRandomEncoding(),
×
1140
        })
×
1141
        return err
×
1142
}
×
1143

1144
// UpdateWorkflowExecutionWithTransferTasks is a utility method to update workflow execution
1145
func (s *TestBase) UpdateWorkflowExecutionWithTransferTasks(
1146
        ctx context.Context,
1147
        updatedInfo *persistence.WorkflowExecutionInfo,
1148
        updatedStats *persistence.ExecutionStats,
1149
        condition int64,
1150
        transferTasks []persistence.Task,
1151
        upsertActivityInfo []*persistence.ActivityInfo,
1152
        versionHistories *persistence.VersionHistories,
1153
) error {
×
1154

×
1155
        _, err := s.ExecutionManager.UpdateWorkflowExecution(ctx, &persistence.UpdateWorkflowExecutionRequest{
×
1156
                UpdateWorkflowMutation: persistence.WorkflowMutation{
×
1157
                        ExecutionInfo:       updatedInfo,
×
1158
                        ExecutionStats:      updatedStats,
×
1159
                        TransferTasks:       transferTasks,
×
1160
                        Condition:           condition,
×
1161
                        UpsertActivityInfos: upsertActivityInfo,
×
1162
                        VersionHistories:    versionHistories,
×
1163
                },
×
1164
                RangeID:  s.ShardInfo.RangeID,
×
1165
                Encoding: pickRandomEncoding(),
×
1166
        })
×
1167
        return err
×
1168
}
×
1169

1170
// UpdateWorkflowExecutionForChildExecutionsInitiated is a utility method to update workflow execution
1171
func (s *TestBase) UpdateWorkflowExecutionForChildExecutionsInitiated(
1172
        ctx context.Context,
1173
        updatedInfo *persistence.WorkflowExecutionInfo, updatedStats *persistence.ExecutionStats, condition int64, transferTasks []persistence.Task, childInfos []*persistence.ChildExecutionInfo) error {
×
1174
        _, err := s.ExecutionManager.UpdateWorkflowExecution(ctx, &persistence.UpdateWorkflowExecutionRequest{
×
1175
                UpdateWorkflowMutation: persistence.WorkflowMutation{
×
1176
                        ExecutionInfo:             updatedInfo,
×
1177
                        ExecutionStats:            updatedStats,
×
1178
                        TransferTasks:             transferTasks,
×
1179
                        Condition:                 condition,
×
1180
                        UpsertChildExecutionInfos: childInfos,
×
1181
                },
×
1182
                RangeID:  s.ShardInfo.RangeID,
×
1183
                Encoding: pickRandomEncoding(),
×
1184
        })
×
1185
        return err
×
1186
}
×
1187

1188
// UpdateWorkflowExecutionForRequestCancel is a utility method to update workflow execution
1189
func (s *TestBase) UpdateWorkflowExecutionForRequestCancel(
1190
        ctx context.Context,
1191
        updatedInfo *persistence.WorkflowExecutionInfo, updatedStats *persistence.ExecutionStats, condition int64, transferTasks []persistence.Task,
1192
        upsertRequestCancelInfo []*persistence.RequestCancelInfo) error {
×
1193
        _, err := s.ExecutionManager.UpdateWorkflowExecution(ctx, &persistence.UpdateWorkflowExecutionRequest{
×
1194
                UpdateWorkflowMutation: persistence.WorkflowMutation{
×
1195
                        ExecutionInfo:            updatedInfo,
×
1196
                        ExecutionStats:           updatedStats,
×
1197
                        TransferTasks:            transferTasks,
×
1198
                        Condition:                condition,
×
1199
                        UpsertRequestCancelInfos: upsertRequestCancelInfo,
×
1200
                },
×
1201
                RangeID:  s.ShardInfo.RangeID,
×
1202
                Encoding: pickRandomEncoding(),
×
1203
        })
×
1204
        return err
×
1205
}
×
1206

1207
// UpdateWorkflowExecutionForSignal is a utility method to update workflow execution
1208
func (s *TestBase) UpdateWorkflowExecutionForSignal(
1209
        ctx context.Context,
1210
        updatedInfo *persistence.WorkflowExecutionInfo, updatedStats *persistence.ExecutionStats, condition int64, transferTasks []persistence.Task,
1211
        upsertSignalInfos []*persistence.SignalInfo) error {
×
1212
        _, err := s.ExecutionManager.UpdateWorkflowExecution(ctx, &persistence.UpdateWorkflowExecutionRequest{
×
1213
                UpdateWorkflowMutation: persistence.WorkflowMutation{
×
1214
                        ExecutionInfo:     updatedInfo,
×
1215
                        ExecutionStats:    updatedStats,
×
1216
                        TransferTasks:     transferTasks,
×
1217
                        Condition:         condition,
×
1218
                        UpsertSignalInfos: upsertSignalInfos,
×
1219
                },
×
1220
                RangeID:  s.ShardInfo.RangeID,
×
1221
                Encoding: pickRandomEncoding(),
×
1222
        })
×
1223
        return err
×
1224
}
×
1225

1226
// UpdateWorkflowExecutionForBufferEvents is a utility method to update workflow execution
1227
func (s *TestBase) UpdateWorkflowExecutionForBufferEvents(
1228
        ctx context.Context,
1229
        updatedInfo *persistence.WorkflowExecutionInfo,
1230
        updatedStats *persistence.ExecutionStats,
1231
        condition int64,
1232
        bufferEvents []*types.HistoryEvent,
1233
        clearBufferedEvents bool,
1234
        versionHistories *persistence.VersionHistories,
1235
) error {
×
1236

×
1237
        _, err := s.ExecutionManager.UpdateWorkflowExecution(ctx, &persistence.UpdateWorkflowExecutionRequest{
×
1238
                UpdateWorkflowMutation: persistence.WorkflowMutation{
×
1239
                        ExecutionInfo:       updatedInfo,
×
1240
                        ExecutionStats:      updatedStats,
×
1241
                        NewBufferedEvents:   bufferEvents,
×
1242
                        Condition:           condition,
×
1243
                        ClearBufferedEvents: clearBufferedEvents,
×
1244
                        VersionHistories:    versionHistories,
×
1245
                },
×
1246
                RangeID:  s.ShardInfo.RangeID,
×
1247
                Encoding: pickRandomEncoding(),
×
1248
        })
×
1249
        return err
×
1250
}
×
1251

1252
// UpdateAllMutableState is a utility method to update workflow execution
1253
func (s *TestBase) UpdateAllMutableState(ctx context.Context, updatedMutableState *persistence.WorkflowMutableState, condition int64) error {
×
1254
        var aInfos []*persistence.ActivityInfo
×
1255
        for _, ai := range updatedMutableState.ActivityInfos {
×
1256
                aInfos = append(aInfos, ai)
×
1257
        }
×
1258

1259
        var tInfos []*persistence.TimerInfo
×
1260
        for _, ti := range updatedMutableState.TimerInfos {
×
1261
                tInfos = append(tInfos, ti)
×
1262
        }
×
1263

1264
        var cInfos []*persistence.ChildExecutionInfo
×
1265
        for _, ci := range updatedMutableState.ChildExecutionInfos {
×
1266
                cInfos = append(cInfos, ci)
×
1267
        }
×
1268

1269
        var rcInfos []*persistence.RequestCancelInfo
×
1270
        for _, rci := range updatedMutableState.RequestCancelInfos {
×
1271
                rcInfos = append(rcInfos, rci)
×
1272
        }
×
1273

1274
        var sInfos []*persistence.SignalInfo
×
1275
        for _, si := range updatedMutableState.SignalInfos {
×
1276
                sInfos = append(sInfos, si)
×
1277
        }
×
1278

1279
        var srIDs []string
×
1280
        for id := range updatedMutableState.SignalRequestedIDs {
×
1281
                srIDs = append(srIDs, id)
×
1282
        }
×
1283
        _, err := s.ExecutionManager.UpdateWorkflowExecution(ctx, &persistence.UpdateWorkflowExecutionRequest{
×
1284
                RangeID: s.ShardInfo.RangeID,
×
1285
                UpdateWorkflowMutation: persistence.WorkflowMutation{
×
1286
                        ExecutionInfo:             updatedMutableState.ExecutionInfo,
×
1287
                        ExecutionStats:            updatedMutableState.ExecutionStats,
×
1288
                        Condition:                 condition,
×
1289
                        UpsertActivityInfos:       aInfos,
×
1290
                        UpsertTimerInfos:          tInfos,
×
1291
                        UpsertChildExecutionInfos: cInfos,
×
1292
                        UpsertRequestCancelInfos:  rcInfos,
×
1293
                        UpsertSignalInfos:         sInfos,
×
1294
                        UpsertSignalRequestedIDs:  srIDs,
×
1295
                        VersionHistories:          updatedMutableState.VersionHistories,
×
1296
                },
×
1297
                Encoding: pickRandomEncoding(),
×
1298
        })
×
1299
        return err
×
1300
}
1301

1302
// ConflictResolveWorkflowExecution is  utility method to reset mutable state
1303
func (s *TestBase) ConflictResolveWorkflowExecution(
1304
        ctx context.Context,
1305
        info *persistence.WorkflowExecutionInfo,
1306
        stats *persistence.ExecutionStats,
1307
        nextEventID int64,
1308
        activityInfos []*persistence.ActivityInfo,
1309
        timerInfos []*persistence.TimerInfo,
1310
        childExecutionInfos []*persistence.ChildExecutionInfo,
1311
        requestCancelInfos []*persistence.RequestCancelInfo,
1312
        signalInfos []*persistence.SignalInfo,
1313
        ids []string,
1314
        versionHistories *persistence.VersionHistories,
1315
) error {
×
1316

×
1317
        _, err := s.ExecutionManager.ConflictResolveWorkflowExecution(ctx, &persistence.ConflictResolveWorkflowExecutionRequest{
×
1318
                RangeID: s.ShardInfo.RangeID,
×
1319
                ResetWorkflowSnapshot: persistence.WorkflowSnapshot{
×
1320
                        ExecutionInfo:       info,
×
1321
                        ExecutionStats:      stats,
×
1322
                        Condition:           nextEventID,
×
1323
                        ActivityInfos:       activityInfos,
×
1324
                        TimerInfos:          timerInfos,
×
1325
                        ChildExecutionInfos: childExecutionInfos,
×
1326
                        RequestCancelInfos:  requestCancelInfos,
×
1327
                        SignalInfos:         signalInfos,
×
1328
                        SignalRequestedIDs:  ids,
×
1329
                        Checksum:            testWorkflowChecksum,
×
1330
                        VersionHistories:    versionHistories,
×
1331
                },
×
1332
                Encoding: pickRandomEncoding(),
×
1333
        })
×
1334
        return err
×
1335
}
×
1336

1337
// DeleteWorkflowExecution is a utility method to delete a workflow execution
1338
func (s *TestBase) DeleteWorkflowExecution(ctx context.Context, info *persistence.WorkflowExecutionInfo) error {
×
1339
        return s.ExecutionManager.DeleteWorkflowExecution(ctx, &persistence.DeleteWorkflowExecutionRequest{
×
1340
                DomainID:   info.DomainID,
×
1341
                WorkflowID: info.WorkflowID,
×
1342
                RunID:      info.RunID,
×
1343
        })
×
1344
}
×
1345

1346
// DeleteCurrentWorkflowExecution is a utility method to delete the workflow current execution
1347
func (s *TestBase) DeleteCurrentWorkflowExecution(ctx context.Context, info *persistence.WorkflowExecutionInfo) error {
×
1348
        return s.ExecutionManager.DeleteCurrentWorkflowExecution(ctx, &persistence.DeleteCurrentWorkflowExecutionRequest{
×
1349
                DomainID:   info.DomainID,
×
1350
                WorkflowID: info.WorkflowID,
×
1351
                RunID:      info.RunID,
×
1352
        })
×
1353
}
×
1354

1355
// GetTransferTasks is a utility method to get tasks from transfer task queue
1356
func (s *TestBase) GetTransferTasks(ctx context.Context, batchSize int, getAll bool) ([]*persistence.TransferTaskInfo, error) {
×
1357
        result := []*persistence.TransferTaskInfo{}
×
1358
        var token []byte
×
1359

×
1360
Loop:
×
1361
        for {
×
1362
                response, err := s.ExecutionManager.GetTransferTasks(ctx, &persistence.GetTransferTasksRequest{
×
1363
                        ReadLevel:     0,
×
1364
                        MaxReadLevel:  math.MaxInt64,
×
1365
                        BatchSize:     batchSize,
×
1366
                        NextPageToken: token,
×
1367
                })
×
1368
                if err != nil {
×
1369
                        return nil, err
×
1370
                }
×
1371

1372
                token = response.NextPageToken
×
1373
                result = append(result, response.Tasks...)
×
1374
                if len(token) == 0 || !getAll {
×
1375
                        break Loop
×
1376
                }
1377
        }
1378

1379
        return result, nil
×
1380
}
1381

1382
// GetCrossClusterTasks is a utility method to get tasks from transfer task queue
1383
func (s *TestBase) GetCrossClusterTasks(ctx context.Context, targetCluster string, readLevel int64, batchSize int, getAll bool) ([]*persistence.CrossClusterTaskInfo, error) {
×
1384
        result := []*persistence.CrossClusterTaskInfo{}
×
1385
        var token []byte
×
1386

×
1387
        for {
×
1388
                response, err := s.ExecutionManager.GetCrossClusterTasks(ctx, &persistence.GetCrossClusterTasksRequest{
×
1389
                        TargetCluster: targetCluster,
×
1390
                        ReadLevel:     readLevel,
×
1391
                        MaxReadLevel:  int64(math.MaxInt64),
×
1392
                        BatchSize:     batchSize,
×
1393
                        NextPageToken: token,
×
1394
                })
×
1395
                if err != nil {
×
1396
                        return nil, err
×
1397
                }
×
1398

1399
                token = response.NextPageToken
×
1400
                result = append(result, response.Tasks...)
×
1401
                if len(response.NextPageToken) == 0 || !getAll {
×
1402
                        break
×
1403
                }
1404
        }
1405

1406
        return result, nil
×
1407
}
1408

1409
// GetReplicationTasks is a utility method to get tasks from replication task queue
1410
func (s *TestBase) GetReplicationTasks(ctx context.Context, batchSize int, getAll bool) ([]*persistence.ReplicationTaskInfo, error) {
×
1411
        result := []*persistence.ReplicationTaskInfo{}
×
1412
        var token []byte
×
1413

×
1414
Loop:
×
1415
        for {
×
1416
                response, err := s.ExecutionManager.GetReplicationTasks(ctx, &persistence.GetReplicationTasksRequest{
×
1417
                        ReadLevel:     0,
×
1418
                        MaxReadLevel:  math.MaxInt64,
×
1419
                        BatchSize:     batchSize,
×
1420
                        NextPageToken: token,
×
1421
                })
×
1422
                if err != nil {
×
1423
                        return nil, err
×
1424
                }
×
1425

1426
                token = response.NextPageToken
×
1427
                result = append(result, response.Tasks...)
×
1428
                if len(token) == 0 || !getAll {
×
1429
                        break Loop
×
1430
                }
1431
        }
1432

1433
        return result, nil
×
1434
}
1435

1436
// RangeCompleteReplicationTask is a utility method to complete a range of replication tasks
1437
func (s *TestBase) RangeCompleteReplicationTask(ctx context.Context, inclusiveEndTaskID int64) error {
×
1438
        for {
×
1439
                resp, err := s.ExecutionManager.RangeCompleteReplicationTask(ctx, &persistence.RangeCompleteReplicationTaskRequest{
×
1440
                        InclusiveEndTaskID: inclusiveEndTaskID,
×
1441
                        PageSize:           1,
×
1442
                })
×
1443
                if err != nil {
×
1444
                        return err
×
1445
                }
×
1446
                if !persistence.HasMoreRowsToDelete(resp.TasksCompleted, 1) {
×
1447
                        break
×
1448
                }
1449
        }
1450
        return nil
×
1451
}
1452

1453
// PutReplicationTaskToDLQ is a utility method to insert a replication task info
1454
func (s *TestBase) PutReplicationTaskToDLQ(
1455
        ctx context.Context,
1456
        sourceCluster string,
1457
        taskInfo *persistence.ReplicationTaskInfo,
1458
) error {
×
1459

×
1460
        return s.ExecutionManager.PutReplicationTaskToDLQ(ctx, &persistence.PutReplicationTaskToDLQRequest{
×
1461
                SourceClusterName: sourceCluster,
×
1462
                TaskInfo:          taskInfo,
×
1463
        })
×
1464
}
×
1465

1466
// GetReplicationTasksFromDLQ is a utility method to read replication task info
1467
func (s *TestBase) GetReplicationTasksFromDLQ(
1468
        ctx context.Context,
1469
        sourceCluster string,
1470
        readLevel int64,
1471
        maxReadLevel int64,
1472
        pageSize int,
1473
        pageToken []byte,
1474
) (*persistence.GetReplicationTasksFromDLQResponse, error) {
×
1475

×
1476
        return s.ExecutionManager.GetReplicationTasksFromDLQ(ctx, &persistence.GetReplicationTasksFromDLQRequest{
×
1477
                SourceClusterName: sourceCluster,
×
1478
                GetReplicationTasksRequest: persistence.GetReplicationTasksRequest{
×
1479
                        ReadLevel:     readLevel,
×
1480
                        MaxReadLevel:  maxReadLevel,
×
1481
                        BatchSize:     pageSize,
×
1482
                        NextPageToken: pageToken,
×
1483
                },
×
1484
        })
×
1485
}
×
1486

1487
// GetReplicationDLQSize is a utility method to read replication dlq size
1488
func (s *TestBase) GetReplicationDLQSize(
1489
        ctx context.Context,
1490
        sourceCluster string,
1491
) (*persistence.GetReplicationDLQSizeResponse, error) {
×
1492

×
1493
        return s.ExecutionManager.GetReplicationDLQSize(ctx, &persistence.GetReplicationDLQSizeRequest{
×
1494
                SourceClusterName: sourceCluster,
×
1495
        })
×
1496
}
×
1497

1498
// DeleteReplicationTaskFromDLQ is a utility method to delete a replication task info
1499
func (s *TestBase) DeleteReplicationTaskFromDLQ(
1500
        ctx context.Context,
1501
        sourceCluster string,
1502
        taskID int64,
1503
) error {
×
1504

×
1505
        return s.ExecutionManager.DeleteReplicationTaskFromDLQ(ctx, &persistence.DeleteReplicationTaskFromDLQRequest{
×
1506
                SourceClusterName: sourceCluster,
×
1507
                TaskID:            taskID,
×
1508
        })
×
1509
}
×
1510

1511
// RangeDeleteReplicationTaskFromDLQ is a utility method to delete  replication task info
1512
func (s *TestBase) RangeDeleteReplicationTaskFromDLQ(
1513
        ctx context.Context,
1514
        sourceCluster string,
1515
        beginTaskID int64,
1516
        endTaskID int64,
1517
) error {
×
1518

×
1519
        _, err := s.ExecutionManager.RangeDeleteReplicationTaskFromDLQ(ctx, &persistence.RangeDeleteReplicationTaskFromDLQRequest{
×
1520
                SourceClusterName:    sourceCluster,
×
1521
                ExclusiveBeginTaskID: beginTaskID,
×
1522
                InclusiveEndTaskID:   endTaskID,
×
1523
        })
×
1524
        return err
×
1525
}
×
1526

1527
// CreateFailoverMarkers is a utility method to create failover markers
1528
func (s *TestBase) CreateFailoverMarkers(
1529
        ctx context.Context,
1530
        markers []*persistence.FailoverMarkerTask,
1531
) error {
×
1532

×
1533
        return s.ExecutionManager.CreateFailoverMarkerTasks(ctx, &persistence.CreateFailoverMarkersRequest{
×
1534
                RangeID: s.ShardInfo.RangeID,
×
1535
                Markers: markers,
×
1536
        })
×
1537
}
×
1538

1539
// CompleteTransferTask is a utility method to complete a transfer task
1540
func (s *TestBase) CompleteTransferTask(ctx context.Context, taskID int64) error {
×
1541

×
1542
        return s.ExecutionManager.CompleteTransferTask(ctx, &persistence.CompleteTransferTaskRequest{
×
1543
                TaskID: taskID,
×
1544
        })
×
1545
}
×
1546

1547
// RangeCompleteTransferTask is a utility method to complete a range of transfer tasks
1548
func (s *TestBase) RangeCompleteTransferTask(ctx context.Context, exclusiveBeginTaskID int64, inclusiveEndTaskID int64) error {
×
1549
        for {
×
1550
                resp, err := s.ExecutionManager.RangeCompleteTransferTask(ctx, &persistence.RangeCompleteTransferTaskRequest{
×
1551
                        ExclusiveBeginTaskID: exclusiveBeginTaskID,
×
1552
                        InclusiveEndTaskID:   inclusiveEndTaskID,
×
1553
                        PageSize:             1,
×
1554
                })
×
1555
                if err != nil {
×
1556
                        return err
×
1557
                }
×
1558
                if !persistence.HasMoreRowsToDelete(resp.TasksCompleted, 1) {
×
1559
                        break
×
1560
                }
1561
        }
1562
        return nil
×
1563
}
1564

1565
// CompleteCrossClusterTask is a utility method to complete a cross-cluster task
1566
func (s *TestBase) CompleteCrossClusterTask(ctx context.Context, targetCluster string, taskID int64) error {
×
1567
        return s.ExecutionManager.CompleteCrossClusterTask(ctx, &persistence.CompleteCrossClusterTaskRequest{
×
1568
                TargetCluster: targetCluster,
×
1569
                TaskID:        taskID,
×
1570
        })
×
1571
}
×
1572

1573
// RangeCompleteCrossClusterTask is a utility method to complete a range of cross-cluster tasks
1574
func (s *TestBase) RangeCompleteCrossClusterTask(ctx context.Context, targetCluster string, exclusiveBeginTaskID int64, inclusiveEndTaskID int64) error {
×
1575
        for {
×
1576
                resp, err := s.ExecutionManager.RangeCompleteCrossClusterTask(ctx, &persistence.RangeCompleteCrossClusterTaskRequest{
×
1577
                        TargetCluster:        targetCluster,
×
1578
                        ExclusiveBeginTaskID: exclusiveBeginTaskID,
×
1579
                        InclusiveEndTaskID:   inclusiveEndTaskID,
×
1580
                        PageSize:             1,
×
1581
                })
×
1582
                if err != nil {
×
1583
                        return err
×
1584
                }
×
1585
                if !persistence.HasMoreRowsToDelete(resp.TasksCompleted, 1) {
×
1586
                        break
×
1587
                }
1588
        }
1589
        return nil
×
1590
}
1591

1592
// CompleteReplicationTask is a utility method to complete a replication task
1593
func (s *TestBase) CompleteReplicationTask(ctx context.Context, taskID int64) error {
×
1594

×
1595
        return s.ExecutionManager.CompleteReplicationTask(ctx, &persistence.CompleteReplicationTaskRequest{
×
1596
                TaskID: taskID,
×
1597
        })
×
1598
}
×
1599

1600
// GetTimerIndexTasks is a utility method to get tasks from transfer task queue
1601
func (s *TestBase) GetTimerIndexTasks(ctx context.Context, batchSize int, getAll bool) ([]*persistence.TimerTaskInfo, error) {
×
1602
        result := []*persistence.TimerTaskInfo{}
×
1603
        var token []byte
×
1604

×
1605
Loop:
×
1606
        for {
×
1607
                response, err := s.ExecutionManager.GetTimerIndexTasks(ctx, &persistence.GetTimerIndexTasksRequest{
×
1608
                        MinTimestamp:  time.Time{},
×
1609
                        MaxTimestamp:  time.Unix(0, math.MaxInt64),
×
1610
                        BatchSize:     batchSize,
×
1611
                        NextPageToken: token,
×
1612
                })
×
1613
                if err != nil {
×
1614
                        return nil, err
×
1615
                }
×
1616

1617
                token = response.NextPageToken
×
1618
                result = append(result, response.Timers...)
×
1619
                if len(token) == 0 || !getAll {
×
1620
                        break Loop
×
1621
                }
1622
        }
1623

1624
        return result, nil
×
1625
}
1626

1627
// CompleteTimerTask is a utility method to complete a timer task
1628
func (s *TestBase) CompleteTimerTask(ctx context.Context, ts time.Time, taskID int64) error {
×
1629
        return s.ExecutionManager.CompleteTimerTask(ctx, &persistence.CompleteTimerTaskRequest{
×
1630
                VisibilityTimestamp: ts,
×
1631
                TaskID:              taskID,
×
1632
        })
×
1633
}
×
1634

1635
// RangeCompleteTimerTask is a utility method to complete a range of timer tasks
1636
func (s *TestBase) RangeCompleteTimerTask(ctx context.Context, inclusiveBeginTimestamp time.Time, exclusiveEndTimestamp time.Time) error {
×
1637
        for {
×
1638
                resp, err := s.ExecutionManager.RangeCompleteTimerTask(ctx, &persistence.RangeCompleteTimerTaskRequest{
×
1639
                        InclusiveBeginTimestamp: inclusiveBeginTimestamp,
×
1640
                        ExclusiveEndTimestamp:   exclusiveEndTimestamp,
×
1641
                        PageSize:                1,
×
1642
                })
×
1643
                if err != nil {
×
1644
                        return err
×
1645
                }
×
1646
                if !persistence.HasMoreRowsToDelete(resp.TasksCompleted, 1) {
×
1647
                        break
×
1648
                }
1649
        }
1650
        return nil
×
1651
}
1652

1653
// CreateDecisionTask is a utility method to create a task
1654
func (s *TestBase) CreateDecisionTask(ctx context.Context, domainID string, workflowExecution types.WorkflowExecution, taskList string,
1655
        decisionScheduleID int64, partitionConfig map[string]string) (int64, error) {
×
1656
        leaseResponse, err := s.TaskMgr.LeaseTaskList(ctx, &persistence.LeaseTaskListRequest{
×
1657
                DomainID: domainID,
×
1658
                TaskList: taskList,
×
1659
                TaskType: persistence.TaskListTypeDecision,
×
1660
        })
×
1661
        if err != nil {
×
1662
                return 0, err
×
1663
        }
×
1664

1665
        // clearing this field since when creating task in matching we don't have the LastUpdate information
1666
        leaseResponse.TaskListInfo.LastUpdated = time.Time{}
×
1667

×
1668
        taskID := s.GetNextSequenceNumber()
×
1669
        tasks := []*persistence.CreateTaskInfo{
×
1670
                {
×
1671
                        TaskID:    taskID,
×
1672
                        Execution: workflowExecution,
×
1673
                        Data: &persistence.TaskInfo{
×
1674
                                DomainID:        domainID,
×
1675
                                WorkflowID:      workflowExecution.WorkflowID,
×
1676
                                RunID:           workflowExecution.RunID,
×
1677
                                TaskID:          taskID,
×
1678
                                ScheduleID:      decisionScheduleID,
×
1679
                                PartitionConfig: partitionConfig,
×
1680
                        },
×
1681
                },
×
1682
        }
×
1683

×
1684
        _, err = s.TaskMgr.CreateTasks(ctx, &persistence.CreateTasksRequest{
×
1685
                TaskListInfo: leaseResponse.TaskListInfo,
×
1686
                Tasks:        tasks,
×
1687
        })
×
1688

×
1689
        if err != nil {
×
1690
                return 0, err
×
1691
        }
×
1692

1693
        return taskID, err
×
1694
}
1695

1696
// CreateActivityTasks is a utility method to create tasks
1697
func (s *TestBase) CreateActivityTasks(ctx context.Context, domainID string, workflowExecution types.WorkflowExecution,
1698
        activities map[int64]string, partitionConfig map[string]string) ([]int64, error) {
×
1699

×
1700
        taskLists := make(map[string]*persistence.TaskListInfo)
×
1701
        for _, tl := range activities {
×
1702
                _, ok := taskLists[tl]
×
1703
                if !ok {
×
1704
                        resp, err := s.TaskMgr.LeaseTaskList(
×
1705
                                ctx,
×
1706
                                &persistence.LeaseTaskListRequest{DomainID: domainID, TaskList: tl, TaskType: persistence.TaskListTypeActivity})
×
1707
                        if err != nil {
×
1708
                                return []int64{}, err
×
1709
                        }
×
1710
                        taskLists[tl] = resp.TaskListInfo
×
1711
                        // clearing this field since when creating task in matching we don't have the LastUpdate information
×
1712
                        taskLists[tl].LastUpdated = time.Time{}
×
1713
                }
1714
        }
1715

1716
        var taskIDs []int64
×
1717
        for activityScheduleID, taskList := range activities {
×
1718
                taskID := s.GetNextSequenceNumber()
×
1719
                tasks := []*persistence.CreateTaskInfo{
×
1720
                        {
×
1721
                                TaskID:    taskID,
×
1722
                                Execution: workflowExecution,
×
1723
                                Data: &persistence.TaskInfo{
×
1724
                                        DomainID:               domainID,
×
1725
                                        WorkflowID:             workflowExecution.WorkflowID,
×
1726
                                        RunID:                  workflowExecution.RunID,
×
1727
                                        TaskID:                 taskID,
×
1728
                                        ScheduleID:             activityScheduleID,
×
1729
                                        ScheduleToStartTimeout: defaultScheduleToStartTimeout,
×
1730
                                        PartitionConfig:        partitionConfig,
×
1731
                                },
×
1732
                        },
×
1733
                }
×
1734
                _, err := s.TaskMgr.CreateTasks(ctx, &persistence.CreateTasksRequest{
×
1735
                        TaskListInfo: taskLists[taskList],
×
1736
                        Tasks:        tasks,
×
1737
                })
×
1738
                if err != nil {
×
1739
                        return nil, err
×
1740
                }
×
1741
                taskIDs = append(taskIDs, taskID)
×
1742
        }
1743

1744
        return taskIDs, nil
×
1745
}
1746

1747
// GetTasks is a utility method to get tasks from persistence
1748
func (s *TestBase) GetTasks(ctx context.Context, domainID, taskList string, taskType int, batchSize int) (*persistence.GetTasksResponse, error) {
×
1749
        response, err := s.TaskMgr.GetTasks(ctx, &persistence.GetTasksRequest{
×
1750
                DomainID:     domainID,
×
1751
                TaskList:     taskList,
×
1752
                TaskType:     taskType,
×
1753
                BatchSize:    batchSize,
×
1754
                MaxReadLevel: common.Int64Ptr(math.MaxInt64),
×
1755
        })
×
1756

×
1757
        if err != nil {
×
1758
                return nil, err
×
1759
        }
×
1760

1761
        return &persistence.GetTasksResponse{Tasks: response.Tasks}, nil
×
1762
}
1763

1764
// CompleteTask is a utility method to complete a task
1765
func (s *TestBase) CompleteTask(ctx context.Context, domainID, taskList string, taskType int, taskID int64, ackLevel int64) error {
×
1766
        return s.TaskMgr.CompleteTask(ctx, &persistence.CompleteTaskRequest{
×
1767
                TaskList: &persistence.TaskListInfo{
×
1768
                        DomainID: domainID,
×
1769
                        AckLevel: ackLevel,
×
1770
                        TaskType: taskType,
×
1771
                        Name:     taskList,
×
1772
                },
×
1773
                TaskID: taskID,
×
1774
        })
×
1775
}
×
1776

1777
// TearDownWorkflowStore to cleanup
1778
func (s *TestBase) TearDownWorkflowStore() {
15✔
1779
        s.ExecutionMgrFactory.Close()
15✔
1780

15✔
1781
        s.DefaultTestCluster.TearDownTestDatabase()
15✔
1782
}
15✔
1783

1784
// GetNextSequenceNumber generates a unique sequence number for can be used for transfer queue taskId
1785
func (s *TestBase) GetNextSequenceNumber() int64 {
×
1786
        taskID, _ := s.TaskIDGenerator.GenerateTransferTaskID()
×
1787
        return taskID
×
1788
}
×
1789

1790
// ClearTasks completes all transfer tasks and replication tasks
1791
func (s *TestBase) ClearTasks() {
×
1792
        s.ClearTransferQueue()
×
1793
        s.ClearReplicationQueue()
×
1794
}
×
1795

1796
// ClearTransferQueue completes all tasks in transfer queue
1797
func (s *TestBase) ClearTransferQueue() {
×
1798
        s.Logger.Info("Clearing transfer tasks", tag.ShardRangeID(s.ShardInfo.RangeID))
×
1799
        tasks, err := s.GetTransferTasks(context.Background(), 100, true)
×
1800
        if err != nil {
×
1801
                s.Logger.Fatal("Error during cleanup", tag.Error(err))
×
1802
        }
×
1803

1804
        counter := 0
×
1805
        for _, t := range tasks {
×
1806
                s.Logger.Info("Deleting transfer task with ID", tag.TaskID(t.TaskID))
×
1807
                s.NoError(s.CompleteTransferTask(context.Background(), t.TaskID))
×
1808
                counter++
×
1809
        }
×
1810

1811
        s.Logger.Info("Deleted transfer tasks.", tag.Counter(counter))
×
1812
}
1813

1814
// ClearReplicationQueue completes all tasks in replication queue
1815
func (s *TestBase) ClearReplicationQueue() {
×
1816
        s.Logger.Info("Clearing replication tasks", tag.ShardRangeID(s.ShardInfo.RangeID))
×
1817
        tasks, err := s.GetReplicationTasks(context.Background(), 100, true)
×
1818
        if err != nil {
×
1819
                s.Logger.Fatal("Error during cleanup", tag.Error(err))
×
1820
        }
×
1821

1822
        counter := 0
×
1823
        for _, t := range tasks {
×
1824
                s.Logger.Info("Deleting replication task with ID", tag.TaskID(t.TaskID))
×
1825
                s.NoError(s.CompleteReplicationTask(context.Background(), t.TaskID))
×
1826
                counter++
×
1827
        }
×
1828

1829
        s.Logger.Info("Deleted replication tasks.", tag.Counter(counter))
×
1830
}
1831

1832
// EqualTimesWithPrecision assertion that two times are equal within precision
1833
func (s *TestBase) EqualTimesWithPrecision(t1, t2 time.Time, precision time.Duration) {
×
1834
        s.True(timeComparator(t1, t2, precision),
×
1835
                "Not equal: \n"+
×
1836
                        "expected: %s\n"+
×
1837
                        "actual  : %s%s", t1, t2,
×
1838
        )
×
1839
}
×
1840

1841
// EqualTimes assertion that two times are equal within two millisecond precision
1842
func (s *TestBase) EqualTimes(t1, t2 time.Time) {
×
1843
        s.EqualTimesWithPrecision(t1, t2, TimePrecision)
×
1844
}
×
1845

1846
func (s *TestBase) validateTimeRange(t time.Time, expectedDuration time.Duration) bool {
×
1847
        currentTime := time.Now()
×
1848
        diff := time.Duration(currentTime.UnixNano() - t.UnixNano())
×
1849
        if diff > expectedDuration {
×
1850
                s.Logger.Info("Check Current time, Application time, Difference", tag.Timestamp(t), tag.CursorTimestamp(currentTime), tag.Number(int64(diff)))
×
1851
                return false
×
1852
        }
×
1853
        return true
×
1854
}
1855

1856
// GenerateTransferTaskID helper
1857
func (g *TestTransferTaskIDGenerator) GenerateTransferTaskID() (int64, error) {
×
1858
        return atomic.AddInt64(&g.seqNum, 1), nil
×
1859
}
×
1860

1861
// Publish is a utility method to add messages to the queue
1862
func (s *TestBase) Publish(
1863
        ctx context.Context,
1864
        messagePayload []byte,
1865
) error {
×
1866

×
1867
        retryPolicy := backoff.NewExponentialRetryPolicy(100 * time.Millisecond)
×
1868
        retryPolicy.SetBackoffCoefficient(1.5)
×
1869
        retryPolicy.SetMaximumAttempts(5)
×
1870

×
1871
        throttleRetry := backoff.NewThrottleRetry(
×
1872
                backoff.WithRetryPolicy(retryPolicy),
×
1873
                backoff.WithRetryableError(func(e error) bool {
×
1874
                        return persistence.IsTransientError(e) || isMessageIDConflictError(e)
×
1875
                }),
×
1876
        )
1877
        return throttleRetry.Do(ctx, func() error {
×
1878
                return s.DomainReplicationQueueMgr.EnqueueMessage(ctx, messagePayload)
×
1879
        })
×
1880
}
1881

1882
func isMessageIDConflictError(err error) bool {
×
1883
        _, ok := err.(*persistence.ConditionFailedError)
×
1884
        return ok
×
1885
}
×
1886

1887
// GetReplicationMessages is a utility method to get messages from the queue
1888
func (s *TestBase) GetReplicationMessages(
1889
        ctx context.Context,
1890
        lastMessageID int64,
1891
        maxCount int,
1892
) ([]*persistence.QueueMessage, error) {
×
1893

×
1894
        return s.DomainReplicationQueueMgr.ReadMessages(ctx, lastMessageID, maxCount)
×
1895
}
×
1896

1897
// UpdateAckLevel updates replication queue ack level
1898
func (s *TestBase) UpdateAckLevel(
1899
        ctx context.Context,
1900
        lastProcessedMessageID int64,
1901
        clusterName string,
1902
) error {
×
1903

×
1904
        return s.DomainReplicationQueueMgr.UpdateAckLevel(ctx, lastProcessedMessageID, clusterName)
×
1905
}
×
1906

1907
// GetAckLevels returns replication queue ack levels
1908
func (s *TestBase) GetAckLevels(
1909
        ctx context.Context,
1910
) (map[string]int64, error) {
×
1911
        return s.DomainReplicationQueueMgr.GetAckLevels(ctx)
×
1912
}
×
1913

1914
// PublishToDomainDLQ is a utility method to add messages to the domain DLQ
1915
func (s *TestBase) PublishToDomainDLQ(
1916
        ctx context.Context,
1917
        messagePayload []byte,
1918
) error {
×
1919

×
1920
        retryPolicy := backoff.NewExponentialRetryPolicy(100 * time.Millisecond)
×
1921
        retryPolicy.SetBackoffCoefficient(1.5)
×
1922
        retryPolicy.SetMaximumAttempts(5)
×
1923

×
1924
        throttleRetry := backoff.NewThrottleRetry(
×
1925
                backoff.WithRetryPolicy(retryPolicy),
×
1926
                backoff.WithRetryableError(func(e error) bool {
×
1927
                        return persistence.IsTransientError(e) || isMessageIDConflictError(e)
×
1928
                }),
×
1929
        )
1930
        return throttleRetry.Do(ctx, func() error {
×
1931
                return s.DomainReplicationQueueMgr.EnqueueMessageToDLQ(ctx, messagePayload)
×
1932
        })
×
1933
}
1934

1935
// GetMessagesFromDomainDLQ is a utility method to get messages from the domain DLQ
1936
func (s *TestBase) GetMessagesFromDomainDLQ(
1937
        ctx context.Context,
1938
        firstMessageID int64,
1939
        lastMessageID int64,
1940
        pageSize int,
1941
        pageToken []byte,
1942
) ([]*persistence.QueueMessage, []byte, error) {
×
1943

×
1944
        return s.DomainReplicationQueueMgr.ReadMessagesFromDLQ(
×
1945
                ctx,
×
1946
                firstMessageID,
×
1947
                lastMessageID,
×
1948
                pageSize,
×
1949
                pageToken,
×
1950
        )
×
1951
}
×
1952

1953
// UpdateDomainDLQAckLevel updates domain dlq ack level
1954
func (s *TestBase) UpdateDomainDLQAckLevel(
1955
        ctx context.Context,
1956
        lastProcessedMessageID int64,
1957
        clusterName string,
1958
) error {
×
1959

×
1960
        return s.DomainReplicationQueueMgr.UpdateDLQAckLevel(ctx, lastProcessedMessageID, clusterName)
×
1961
}
×
1962

1963
// GetDomainDLQAckLevel returns domain dlq ack level
1964
func (s *TestBase) GetDomainDLQAckLevel(
1965
        ctx context.Context,
1966
) (map[string]int64, error) {
×
1967
        return s.DomainReplicationQueueMgr.GetDLQAckLevels(ctx)
×
1968
}
×
1969

1970
// GetDomainDLQSize returns domain dlq size
1971
func (s *TestBase) GetDomainDLQSize(
1972
        ctx context.Context,
1973
) (int64, error) {
×
1974
        return s.DomainReplicationQueueMgr.GetDLQSize(ctx)
×
1975
}
×
1976

1977
// DeleteMessageFromDomainDLQ deletes one message from domain DLQ
1978
func (s *TestBase) DeleteMessageFromDomainDLQ(
1979
        ctx context.Context,
1980
        messageID int64,
1981
) error {
×
1982

×
1983
        return s.DomainReplicationQueueMgr.DeleteMessageFromDLQ(ctx, messageID)
×
1984
}
×
1985

1986
// RangeDeleteMessagesFromDomainDLQ deletes messages from domain DLQ
1987
func (s *TestBase) RangeDeleteMessagesFromDomainDLQ(
1988
        ctx context.Context,
1989
        firstMessageID int64,
1990
        lastMessageID int64,
1991
) error {
×
1992

×
1993
        return s.DomainReplicationQueueMgr.RangeDeleteMessagesFromDLQ(ctx, firstMessageID, lastMessageID)
×
1994
}
×
1995

1996
// GenerateTransferTaskIDs helper
1997
func (g *TestTransferTaskIDGenerator) GenerateTransferTaskIDs(number int) ([]int64, error) {
×
1998
        result := []int64{}
×
1999
        for i := 0; i < number; i++ {
×
2000
                id, err := g.GenerateTransferTaskID()
×
2001
                if err != nil {
×
2002
                        return nil, err
×
2003
                }
×
2004
                result = append(result, id)
×
2005
        }
2006
        return result, nil
×
2007
}
2008

2009
// GenerateRandomDBName returns a database name of the form "<unix-seconds>_<suffix>",
// where the suffix is n runes drawn at random from the letters of "workflow".
// The global math/rand source is re-seeded from the current time on every call.
func GenerateRandomDBName(n int) string {
	rand.Seed(time.Now().UnixNano())

	alphabet := []rune("workflow")
	suffix := make([]rune, n)
	for pos := 0; pos < n; pos++ {
		suffix[pos] = alphabet[rand.Intn(len(alphabet))]
	}

	// The timestamp prefix keeps names from different runs apart.
	return fmt.Sprintf("%v_%v", time.Now().Unix(), string(suffix))
}
2020

2021
func pickRandomEncoding() common.EncodingType {
×
2022
        // randomly pick json/thriftrw/empty as encoding type
×
2023
        var encoding common.EncodingType
×
2024
        i := rand.Intn(3)
×
2025
        switch i {
×
2026
        case 0:
×
2027
                encoding = common.EncodingTypeJSON
×
2028
        case 1:
×
2029
                encoding = common.EncodingTypeThriftRW
×
2030
        case 2:
×
2031
                encoding = common.EncodingType("")
×
2032
        }
2033
        return encoding
×
2034
}
2035

2036
// int64Ptr returns a pointer to a fresh int64 holding the value i.
// Each call yields a distinct pointer.
func int64Ptr(i int64) *int64 {
	p := new(int64)
	*p = i
	return p
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc