• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

uber / cadence / 0187fc45-1fa7-4f9b-9c10-2aef2ebb54e0

08 May 2023 05:10PM UTC coverage: 57.153% (-0.07%) from 57.225%
0187fc45-1fa7-4f9b-9c10-2aef2ebb54e0

push

buildkite

GitHub
Update persistence layer to adopt idl update for isolation (#5254)

204 of 204 new or added lines in 15 files covered. (100.0%)

85781 of 150089 relevant lines covered (57.15%)

2419.35 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/common/persistence/persistence-tests/executionManagerTestForEventsV2.go
1
// Copyright (c) 2017 Uber Technologies, Inc.
2
//
3
// Permission is hereby granted, free of charge, to any person obtaining a copy
4
// of this software and associated documentation files (the "Software"), to deal
5
// in the Software without restriction, including without limitation the rights
6
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
7
// copies of the Software, and to permit persons to whom the Software is
8
// furnished to do so, subject to the following conditions:
9
//
10
// The above copyright notice and this permission notice shall be included in
11
// all copies or substantial portions of the Software.
12
//
13
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
14
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
18
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
19
// THE SOFTWARE.
20

21
package persistencetests
22

23
import (
24
        "context"
25
        "os"
26
        "runtime/debug"
27
        "testing"
28
        "time"
29

30
        "github.com/pborman/uuid"
31
        log "github.com/sirupsen/logrus"
32
        "github.com/stretchr/testify/require"
33

34
        "github.com/uber/cadence/common"
35
        "github.com/uber/cadence/common/checksum"
36
        p "github.com/uber/cadence/common/persistence"
37
        "github.com/uber/cadence/common/types"
38
)
39

40
type (
41
        // ExecutionManagerSuiteForEventsV2 contains matching persistence tests
42
        ExecutionManagerSuiteForEventsV2 struct {
43
                TestBase
44
                // override suite.Suite.Assertions with require.Assertions; this means that s.NotNil(nil) will stop the test,
45
                // not merely log an error
46
                *require.Assertions
47
        }
48
)
49

50
// failOnPanic turns a panic in the calling test into a test failure whose
// message contains the panic value and a stack trace.
//
// It must be installed with `defer failOnPanic(t)`. recover only stops a
// panic when it is called directly by the deferred function itself, so the
// call is made here rather than inside a nested deferred closure, where it
// would always return nil and let the panic propagate.
//
// When no panic is in flight (including when called without defer) it does
// nothing.
func failOnPanic(t *testing.T) {
	if r := recover(); r != nil {
		t.Errorf("test panicked: %v %s", r, debug.Stack())
		t.FailNow()
	}
}
59

60
// SetupSuite implementation
61
func (s *ExecutionManagerSuiteForEventsV2) SetupSuite() {
×
62
        defer failOnPanic(s.T())
×
63
        if testing.Verbose() {
×
64
                log.SetOutput(os.Stdout)
×
65
        }
×
66
}
67

68
// TearDownSuite implementation
69
func (s *ExecutionManagerSuiteForEventsV2) TearDownSuite() {
×
70
        defer failOnPanic(s.T())
×
71
        s.TearDownWorkflowStore()
×
72
}
×
73

74
// SetupTest implementation
75
func (s *ExecutionManagerSuiteForEventsV2) SetupTest() {
×
76
        defer failOnPanic(s.T())
×
77
        // Have to define our overridden assertions in the test setup. If we did it earlier, s.T() will return nil
×
78
        s.Assertions = require.New(s.T())
×
79
        s.ClearTasks()
×
80
}
×
81

82
func (s *ExecutionManagerSuiteForEventsV2) newRandomChecksum() checksum.Checksum {
×
83
        return checksum.Checksum{
×
84
                Flavor:  checksum.FlavorIEEECRC32OverThriftBinary,
×
85
                Version: 22,
×
86
                Value:   []byte(uuid.NewRandom()),
×
87
        }
×
88
}
×
89

90
func (s *ExecutionManagerSuiteForEventsV2) assertChecksumsEqual(expected checksum.Checksum, actual checksum.Checksum) {
×
91
        if !actual.Flavor.IsValid() {
×
92
                // not all stores support checksum persistence today
×
93
                // if its not supported, assert that everything is zero'd out
×
94
                expected = checksum.Checksum{}
×
95
        }
×
96
        s.EqualValues(expected, actual)
×
97
}
98

99
// TestWorkflowCreation test
100
func (s *ExecutionManagerSuiteForEventsV2) TestWorkflowCreation() {
×
101
        ctx, cancel := context.WithTimeout(context.Background(), testContextTimeout)
×
102
        defer cancel()
×
103

×
104
        defer failOnPanic(s.T())
×
105
        domainID := uuid.New()
×
106
        workflowExecution := types.WorkflowExecution{
×
107
                WorkflowID: "test-eventsv2-workflow",
×
108
                RunID:      "aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa",
×
109
        }
×
110
        domainName := uuid.New()
×
111
        csum := s.newRandomChecksum()
×
112
        decisionScheduleID := int64(2)
×
113
        versionHistory := p.NewVersionHistory([]byte{}, []*p.VersionHistoryItem{
×
114
                {decisionScheduleID, common.EmptyVersion},
×
115
        })
×
116
        versionHistories := p.NewVersionHistories(versionHistory)
×
117
        _, err0 := s.ExecutionManager.CreateWorkflowExecution(ctx, &p.CreateWorkflowExecutionRequest{
×
118
                NewWorkflowSnapshot: p.WorkflowSnapshot{
×
119
                        ExecutionInfo: &p.WorkflowExecutionInfo{
×
120
                                CreateRequestID:             uuid.New(),
×
121
                                DomainID:                    domainID,
×
122
                                WorkflowID:                  workflowExecution.GetWorkflowID(),
×
123
                                RunID:                       workflowExecution.GetRunID(),
×
124
                                FirstExecutionRunID:         workflowExecution.GetRunID(),
×
125
                                TaskList:                    "taskList",
×
126
                                WorkflowTypeName:            "wType",
×
127
                                WorkflowTimeout:             20,
×
128
                                DecisionStartToCloseTimeout: 13,
×
129
                                ExecutionContext:            nil,
×
130
                                State:                       p.WorkflowStateRunning,
×
131
                                CloseStatus:                 p.WorkflowCloseStatusNone,
×
132
                                NextEventID:                 3,
×
133
                                LastProcessedEvent:          0,
×
134
                                DecisionScheduleID:          decisionScheduleID,
×
135
                                DecisionStartedID:           common.EmptyEventID,
×
136
                                DecisionTimeout:             1,
×
137
                                BranchToken:                 []byte("branchToken1"),
×
138
                        },
×
139
                        ExecutionStats: &p.ExecutionStats{},
×
140
                        TransferTasks: []p.Task{
×
141
                                &p.DecisionTask{
×
142
                                        TaskID:              s.GetNextSequenceNumber(),
×
143
                                        DomainID:            domainID,
×
144
                                        TaskList:            "taskList",
×
145
                                        ScheduleID:          2,
×
146
                                        VisibilityTimestamp: time.Now(),
×
147
                                },
×
148
                        },
×
149
                        TimerTasks:       nil,
×
150
                        Checksum:         csum,
×
151
                        VersionHistories: versionHistories,
×
152
                },
×
153
                RangeID:    s.ShardInfo.RangeID,
×
154
                DomainName: domainName,
×
155
        })
×
156

×
157
        s.NoError(err0)
×
158

×
159
        state0, err1 := s.GetWorkflowExecutionInfo(ctx, domainID, workflowExecution)
×
160
        s.NoError(err1)
×
161
        info0 := state0.ExecutionInfo
×
162
        s.NotNil(info0, "Valid Workflow info expected.")
×
163
        s.Equal([]byte("branchToken1"), info0.BranchToken)
×
164
        s.assertChecksumsEqual(csum, state0.Checksum)
×
165

×
166
        updatedInfo := copyWorkflowExecutionInfo(info0)
×
167
        updatedStats := copyExecutionStats(state0.ExecutionStats)
×
168
        updatedInfo.NextEventID = int64(5)
×
169
        updatedInfo.LastProcessedEvent = int64(2)
×
170
        currentTime := time.Now().UTC()
×
171
        timerID := "id_1"
×
172
        timerInfos := []*p.TimerInfo{{
×
173
                Version:    3345,
×
174
                TimerID:    timerID,
×
175
                ExpiryTime: currentTime,
×
176
                TaskStatus: 2,
×
177
                StartedID:  5,
×
178
        }}
×
179
        updatedInfo.BranchToken = []byte("branchToken2")
×
180

×
181
        err2 := s.UpdateWorkflowExecution(ctx, updatedInfo, updatedStats, versionHistories, []int64{int64(4)}, nil, int64(3), nil, nil, nil, timerInfos, nil)
×
182
        s.NoError(err2)
×
183

×
184
        state, err1 := s.GetWorkflowExecutionInfo(ctx, domainID, workflowExecution)
×
185
        s.NoError(err1)
×
186
        s.NotNil(state, "expected valid state.")
×
187
        s.Equal(1, len(state.TimerInfos))
×
188
        s.Equal(int64(3345), state.TimerInfos[timerID].Version)
×
189
        s.Equal(timerID, state.TimerInfos[timerID].TimerID)
×
190
        s.EqualTimesWithPrecision(currentTime, state.TimerInfos[timerID].ExpiryTime, time.Millisecond*500)
×
191
        s.Equal(int64(2), state.TimerInfos[timerID].TaskStatus)
×
192
        s.Equal(int64(5), state.TimerInfos[timerID].StartedID)
×
193
        s.assertChecksumsEqual(testWorkflowChecksum, state.Checksum)
×
194

×
195
        err2 = s.UpdateWorkflowExecution(ctx, updatedInfo, updatedStats, versionHistories, nil, nil, int64(5), nil, nil, nil, nil, []string{timerID})
×
196
        s.NoError(err2)
×
197

×
198
        state, err2 = s.GetWorkflowExecutionInfo(ctx, domainID, workflowExecution)
×
199
        s.NoError(err2)
×
200
        s.NotNil(state, "expected valid state.")
×
201
        s.Equal(0, len(state.TimerInfos))
×
202
        info1 := state.ExecutionInfo
×
203
        s.Equal([]byte("branchToken2"), info1.BranchToken)
×
204
        s.assertChecksumsEqual(testWorkflowChecksum, state.Checksum)
×
205
}
×
206

207
// TestWorkflowCreationWithVersionHistories test
208
func (s *ExecutionManagerSuiteForEventsV2) TestWorkflowCreationWithVersionHistories() {
×
209
        ctx, cancel := context.WithTimeout(context.Background(), testContextTimeout)
×
210
        defer cancel()
×
211

×
212
        defer failOnPanic(s.T())
×
213
        domainID := uuid.New()
×
214
        domainName := uuid.New()
×
215
        workflowExecution := types.WorkflowExecution{
×
216
                WorkflowID: "test-eventsv2-workflow-version-history",
×
217
                RunID:      "aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa",
×
218
        }
×
219
        versionHistory := p.NewVersionHistory(
×
220
                []byte{1},
×
221
                []*p.VersionHistoryItem{p.NewVersionHistoryItem(1, 0)},
×
222
        )
×
223
        versionHistories := p.NewVersionHistories(versionHistory)
×
224

×
225
        csum := s.newRandomChecksum()
×
226

×
227
        _, err0 := s.ExecutionManager.CreateWorkflowExecution(ctx, &p.CreateWorkflowExecutionRequest{
×
228
                RangeID: s.ShardInfo.RangeID,
×
229
                NewWorkflowSnapshot: p.WorkflowSnapshot{
×
230
                        ExecutionInfo: &p.WorkflowExecutionInfo{
×
231
                                CreateRequestID:             uuid.New(),
×
232
                                DomainID:                    domainID,
×
233
                                WorkflowID:                  workflowExecution.GetWorkflowID(),
×
234
                                RunID:                       workflowExecution.GetRunID(),
×
235
                                FirstExecutionRunID:         workflowExecution.GetRunID(),
×
236
                                TaskList:                    "taskList",
×
237
                                WorkflowTypeName:            "wType",
×
238
                                WorkflowTimeout:             20,
×
239
                                DecisionStartToCloseTimeout: 13,
×
240
                                ExecutionContext:            nil,
×
241
                                State:                       p.WorkflowStateRunning,
×
242
                                CloseStatus:                 p.WorkflowCloseStatusNone,
×
243
                                NextEventID:                 common.EmptyEventID,
×
244
                                LastProcessedEvent:          0,
×
245
                                DecisionScheduleID:          2,
×
246
                                DecisionStartedID:           common.EmptyEventID,
×
247
                                DecisionTimeout:             1,
×
248
                                BranchToken:                 nil,
×
249
                        },
×
250
                        ExecutionStats:   &p.ExecutionStats{},
×
251
                        VersionHistories: versionHistories,
×
252
                        TransferTasks: []p.Task{
×
253
                                &p.DecisionTask{
×
254
                                        TaskID:              s.GetNextSequenceNumber(),
×
255
                                        DomainID:            domainID,
×
256
                                        TaskList:            "taskList",
×
257
                                        ScheduleID:          2,
×
258
                                        VisibilityTimestamp: time.Now(),
×
259
                                },
×
260
                        },
×
261
                        TimerTasks: nil,
×
262
                        Checksum:   csum,
×
263
                },
×
264
                DomainName: domainName,
×
265
        })
×
266

×
267
        s.NoError(err0)
×
268

×
269
        state0, err1 := s.GetWorkflowExecutionInfo(ctx, domainID, workflowExecution)
×
270
        s.NoError(err1)
×
271
        info0 := state0.ExecutionInfo
×
272
        s.NotNil(info0, "Valid Workflow info expected.")
×
273
        s.Equal(versionHistories, state0.VersionHistories)
×
274
        s.assertChecksumsEqual(csum, state0.Checksum)
×
275

×
276
        updatedInfo := copyWorkflowExecutionInfo(info0)
×
277
        updatedStats := copyExecutionStats(state0.ExecutionStats)
×
278
        updatedInfo.LastProcessedEvent = int64(2)
×
279
        currentTime := time.Now().UTC()
×
280
        timerID := "id_1"
×
281
        timerInfos := []*p.TimerInfo{{
×
282
                Version:    3345,
×
283
                TimerID:    timerID,
×
284
                ExpiryTime: currentTime,
×
285
                TaskStatus: 2,
×
286
                StartedID:  5,
×
287
        }}
×
288
        versionHistory, err := versionHistories.GetCurrentVersionHistory()
×
289
        s.NoError(err)
×
290
        err = versionHistory.AddOrUpdateItem(p.NewVersionHistoryItem(2, 0))
×
291
        s.NoError(err)
×
292

×
293
        err2 := s.UpdateWorkflowExecution(ctx, updatedInfo, updatedStats, versionHistories, []int64{int64(4)}, nil, common.EmptyEventID, nil, nil, nil, timerInfos, nil)
×
294
        s.NoError(err2)
×
295

×
296
        state, err1 := s.GetWorkflowExecutionInfo(ctx, domainID, workflowExecution)
×
297
        s.NoError(err1)
×
298
        s.NotNil(state, "expected valid state.")
×
299
        s.Equal(1, len(state.TimerInfos))
×
300
        s.Equal(int64(3345), state.TimerInfos[timerID].Version)
×
301
        s.Equal(timerID, state.TimerInfos[timerID].TimerID)
×
302
        s.EqualTimesWithPrecision(currentTime, state.TimerInfos[timerID].ExpiryTime, time.Millisecond*500)
×
303
        s.Equal(int64(2), state.TimerInfos[timerID].TaskStatus)
×
304
        s.Equal(int64(5), state.TimerInfos[timerID].StartedID)
×
305
        s.Equal(state.VersionHistories, versionHistories)
×
306
        s.assertChecksumsEqual(testWorkflowChecksum, state.Checksum)
×
307
}
×
308

309
// TestContinueAsNew test
310
func (s *ExecutionManagerSuiteForEventsV2) TestContinueAsNew() {
×
311
        ctx, cancel := context.WithTimeout(context.Background(), testContextTimeout)
×
312
        defer cancel()
×
313

×
314
        domainID := uuid.New()
×
315
        workflowExecution := types.WorkflowExecution{
×
316
                WorkflowID: "continue-as-new-workflow-test",
×
317
                RunID:      "551c88d2-d9e6-404f-8131-9eec14f36643",
×
318
        }
×
319
        partitionConfig := map[string]string{
×
320
                "userID": uuid.New(),
×
321
        }
×
322
        decisionScheduleID := int64(2)
×
323
        _, err0 := s.CreateWorkflowExecution(ctx, domainID, workflowExecution, "queue1", "wType", 20, 13, nil, 3, 0, decisionScheduleID, nil, partitionConfig)
×
324
        s.NoError(err0)
×
325

×
326
        state0, err1 := s.GetWorkflowExecutionInfo(ctx, domainID, workflowExecution)
×
327
        s.NoError(err1)
×
328
        info0 := state0.ExecutionInfo
×
329
        updatedInfo := copyWorkflowExecutionInfo(info0)
×
330
        updatedStats := copyExecutionStats(state0.ExecutionStats)
×
331
        updatedInfo.State = p.WorkflowStateCompleted
×
332
        updatedInfo.CloseStatus = p.WorkflowCloseStatusCompleted
×
333
        updatedInfo.NextEventID = int64(5)
×
334
        updatedInfo.LastProcessedEvent = int64(2)
×
335
        versionHistory := p.NewVersionHistory([]byte{}, []*p.VersionHistoryItem{
×
336
                {decisionScheduleID, common.EmptyVersion},
×
337
        })
×
338
        versionHistories := p.NewVersionHistories(versionHistory)
×
339

×
340
        newWorkflowExecution := types.WorkflowExecution{
×
341
                WorkflowID: "continue-as-new-workflow-test",
×
342
                RunID:      "64c7e15a-3fd7-4182-9c6f-6f25a4fa2614",
×
343
        }
×
344

×
345
        newdecisionTask := &p.DecisionTask{
×
346
                TaskID:     s.GetNextSequenceNumber(),
×
347
                DomainID:   updatedInfo.DomainID,
×
348
                TaskList:   updatedInfo.TaskList,
×
349
                ScheduleID: int64(2),
×
350
        }
×
351

×
352
        _, err2 := s.ExecutionManager.UpdateWorkflowExecution(ctx, &p.UpdateWorkflowExecutionRequest{
×
353
                UpdateWorkflowMutation: p.WorkflowMutation{
×
354
                        ExecutionInfo:       updatedInfo,
×
355
                        ExecutionStats:      updatedStats,
×
356
                        TransferTasks:       []p.Task{newdecisionTask},
×
357
                        TimerTasks:          nil,
×
358
                        Condition:           info0.NextEventID,
×
359
                        UpsertActivityInfos: nil,
×
360
                        DeleteActivityInfos: nil,
×
361
                        UpsertTimerInfos:    nil,
×
362
                        DeleteTimerInfos:    nil,
×
363
                        VersionHistories:    versionHistories,
×
364
                },
×
365
                NewWorkflowSnapshot: &p.WorkflowSnapshot{
×
366
                        ExecutionInfo: &p.WorkflowExecutionInfo{
×
367
                                CreateRequestID:             uuid.New(),
×
368
                                DomainID:                    updatedInfo.DomainID,
×
369
                                WorkflowID:                  newWorkflowExecution.GetWorkflowID(),
×
370
                                RunID:                       newWorkflowExecution.GetRunID(),
×
371
                                FirstExecutionRunID:         updatedInfo.FirstExecutionRunID,
×
372
                                TaskList:                    updatedInfo.TaskList,
×
373
                                WorkflowTypeName:            updatedInfo.WorkflowTypeName,
×
374
                                WorkflowTimeout:             updatedInfo.WorkflowTimeout,
×
375
                                DecisionStartToCloseTimeout: updatedInfo.DecisionStartToCloseTimeout,
×
376
                                ExecutionContext:            nil,
×
377
                                State:                       p.WorkflowStateRunning,
×
378
                                CloseStatus:                 p.WorkflowCloseStatusNone,
×
379
                                NextEventID:                 info0.NextEventID,
×
380
                                LastProcessedEvent:          common.EmptyEventID,
×
381
                                DecisionScheduleID:          int64(2),
×
382
                                DecisionStartedID:           common.EmptyEventID,
×
383
                                DecisionTimeout:             1,
×
384
                                BranchToken:                 []byte("branchToken1"),
×
385
                                PartitionConfig:             partitionConfig,
×
386
                        },
×
387
                        ExecutionStats:   &p.ExecutionStats{},
×
388
                        TransferTasks:    nil,
×
389
                        TimerTasks:       nil,
×
390
                        VersionHistories: versionHistories,
×
391
                },
×
392
                RangeID:  s.ShardInfo.RangeID,
×
393
                Encoding: pickRandomEncoding(),
×
394
        })
×
395

×
396
        s.NoError(err2)
×
397

×
398
        prevExecutionState, err3 := s.GetWorkflowExecutionInfo(ctx, domainID, workflowExecution)
×
399
        s.NoError(err3)
×
400
        prevExecutionInfo := prevExecutionState.ExecutionInfo
×
401
        s.Equal("551c88d2-d9e6-404f-8131-9eec14f36643", prevExecutionInfo.FirstExecutionRunID)
×
402
        s.Equal(p.WorkflowStateCompleted, prevExecutionInfo.State)
×
403
        s.Equal(int64(5), prevExecutionInfo.NextEventID)
×
404
        s.Equal(int64(2), prevExecutionInfo.LastProcessedEvent)
×
405
        s.Equal(partitionConfig, prevExecutionInfo.PartitionConfig)
×
406

×
407
        newExecutionState, err4 := s.GetWorkflowExecutionInfo(ctx, domainID, newWorkflowExecution)
×
408
        s.NoError(err4)
×
409
        newExecutionInfo := newExecutionState.ExecutionInfo
×
410
        s.Equal("551c88d2-d9e6-404f-8131-9eec14f36643", newExecutionInfo.FirstExecutionRunID)
×
411
        s.Equal(p.WorkflowStateRunning, newExecutionInfo.State)
×
412
        s.Equal(p.WorkflowCloseStatusNone, newExecutionInfo.CloseStatus)
×
413
        s.Equal(int64(3), newExecutionInfo.NextEventID)
×
414
        s.Equal(common.EmptyEventID, newExecutionInfo.LastProcessedEvent)
×
415
        s.Equal(int64(2), newExecutionInfo.DecisionScheduleID)
×
416
        s.Equal([]byte("branchToken1"), newExecutionInfo.BranchToken)
×
417
        s.Equal(partitionConfig, newExecutionInfo.PartitionConfig)
×
418

×
419
        newRunID, err5 := s.GetCurrentWorkflowRunID(ctx, domainID, workflowExecution.WorkflowID)
×
420
        s.NoError(err5)
×
421
        s.Equal(newWorkflowExecution.RunID, newRunID)
×
422
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc