• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

uber / cadence / 01875e2f-959c-4c4d-87af-1d7805759bcc

08 Apr 2023 12:26AM UTC coverage: 57.178% (+0.1%) from 57.072%
01875e2f-959c-4c4d-87af-1d7805759bcc

Pull #5197

buildkite

Steven L
bad cleanup -> good cleanup
Pull Request #5197: Demonstrate a way to get rid of the cadence-idl repo

85396 of 149351 relevant lines covered (57.18%)

2283.28 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

58.86
/service/history/execution/mutable_state_task_refresher.go
1
// Copyright (c) 2020 Uber Technologies, Inc.
2
//
3
// Permission is hereby granted, free of charge, to any person obtaining a copy
4
// of this software and associated documentation files (the "Software"), to deal
5
// in the Software without restriction, including without limitation the rights
6
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
7
// copies of the Software, and to permit persons to whom the Software is
8
// furnished to do so, subject to the following conditions:
9
//
10
// The above copyright notice and this permission notice shall be included in
11
// all copies or substantial portions of the Software.
12
//
13
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
14
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
18
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
19
// THE SOFTWARE.
20

21
//go:generate mockgen -package $GOPACKAGE -source $GOFILE -destination mutable_state_task_refresher_mock.go -self_package github.com/uber/cadence/service/history/execution
22

23
package execution
24

25
import (
26
        "context"
27
        "time"
28

29
        "github.com/uber/cadence/common"
30
        "github.com/uber/cadence/common/cache"
31
        "github.com/uber/cadence/common/cluster"
32
        "github.com/uber/cadence/common/persistence"
33
        "github.com/uber/cadence/common/types"
34
        "github.com/uber/cadence/service/history/config"
35
        "github.com/uber/cadence/service/history/events"
36
)
37

38
// emptyTasks is a package-level, zero-length (non-nil) slice of persistence tasks.
// NOTE(review): nothing in this part of the file references it; presumably other
// code in the package does — confirm before changing or removing it.
var emptyTasks = []persistence.Task{}
39

40
type (
41
        // MutableStateTaskRefresher refreshes workflow transfer and timer tasks
42
        MutableStateTaskRefresher interface {
43
                RefreshTasks(ctx context.Context, startTime time.Time, mutableState MutableState) error
44
        }
45

46
        mutableStateTaskRefresherImpl struct {
47
                config          *config.Config
48
                clusterMetadata cluster.Metadata
49
                domainCache     cache.DomainCache
50
                eventsCache     events.Cache
51
                shardID         int
52
        }
53
)
54

55
// NewMutableStateTaskRefresher creates a new task refresher for mutable state
56
func NewMutableStateTaskRefresher(
57
        config *config.Config,
58
        clusterMetadata cluster.Metadata,
59
        domainCache cache.DomainCache,
60
        eventsCache events.Cache,
61
        shardID int,
62
) MutableStateTaskRefresher {
23✔
63

23✔
64
        return &mutableStateTaskRefresherImpl{
23✔
65
                config:          config,
23✔
66
                clusterMetadata: clusterMetadata,
23✔
67
                domainCache:     domainCache,
23✔
68
                eventsCache:     eventsCache,
23✔
69
                shardID:         shardID,
23✔
70
        }
23✔
71
}
23✔
72

73
func (r *mutableStateTaskRefresherImpl) RefreshTasks(
74
        ctx context.Context,
75
        startTime time.Time,
76
        mutableState MutableState,
77
) error {
18✔
78

18✔
79
        taskGenerator := NewMutableStateTaskGenerator(
18✔
80
                r.clusterMetadata,
18✔
81
                r.domainCache,
18✔
82
                mutableState,
18✔
83
        )
18✔
84

18✔
85
        if err := r.refreshTasksForWorkflowStart(
18✔
86
                ctx,
18✔
87
                startTime,
18✔
88
                mutableState,
18✔
89
                taskGenerator,
18✔
90
        ); err != nil {
18✔
91
                return err
×
92
        }
×
93

94
        if err := r.refreshTasksForWorkflowClose(
18✔
95
                ctx,
18✔
96
                mutableState,
18✔
97
                taskGenerator,
18✔
98
        ); err != nil {
18✔
99
                return err
×
100
        }
×
101

102
        if err := r.refreshTasksForRecordWorkflowStarted(
18✔
103
                ctx,
18✔
104
                mutableState,
18✔
105
                taskGenerator,
18✔
106
        ); err != nil {
18✔
107
                return err
×
108
        }
×
109

110
        if err := r.refreshTasksForDecision(
18✔
111
                ctx,
18✔
112
                mutableState,
18✔
113
                taskGenerator,
18✔
114
        ); err != nil {
18✔
115
                return err
×
116
        }
×
117

118
        if err := r.refreshTasksForActivity(
18✔
119
                ctx,
18✔
120
                mutableState,
18✔
121
                taskGenerator,
18✔
122
        ); err != nil {
18✔
123
                return err
×
124
        }
×
125

126
        if err := r.refreshTasksForTimer(
18✔
127
                ctx,
18✔
128
                mutableState,
18✔
129
                taskGenerator,
18✔
130
        ); err != nil {
18✔
131
                return err
×
132
        }
×
133

134
        if err := r.refreshTasksForChildWorkflow(
18✔
135
                ctx,
18✔
136
                mutableState,
18✔
137
                taskGenerator,
18✔
138
        ); err != nil {
18✔
139
                return err
×
140
        }
×
141

142
        if err := r.refreshTasksForRequestCancelExternalWorkflow(
18✔
143
                ctx,
18✔
144
                mutableState,
18✔
145
                taskGenerator,
18✔
146
        ); err != nil {
18✔
147
                return err
×
148
        }
×
149

150
        if err := r.refreshTasksForSignalExternalWorkflow(
18✔
151
                ctx,
18✔
152
                mutableState,
18✔
153
                taskGenerator,
18✔
154
        ); err != nil {
18✔
155
                return err
×
156
        }
×
157

158
        if common.IsAdvancedVisibilityWritingEnabled(r.config.AdvancedVisibilityWritingMode(), r.config.IsAdvancedVisConfigExist) {
18✔
159
                if err := r.refreshTasksForWorkflowSearchAttr(
×
160
                        ctx,
×
161
                        mutableState,
×
162
                        taskGenerator,
×
163
                ); err != nil {
×
164
                        return err
×
165
                }
×
166
        }
167

168
        return nil
18✔
169
}
170

171
func (r *mutableStateTaskRefresherImpl) refreshTasksForWorkflowStart(
172
        ctx context.Context,
173
        startTime time.Time,
174
        mutableState MutableState,
175
        taskGenerator MutableStateTaskGenerator,
176
) error {
18✔
177

18✔
178
        startEvent, err := mutableState.GetStartEvent(ctx)
18✔
179
        if err != nil {
18✔
180
                return err
×
181
        }
×
182

183
        if err := taskGenerator.GenerateWorkflowStartTasks(
18✔
184
                startTime,
18✔
185
                startEvent,
18✔
186
        ); err != nil {
18✔
187
                return err
×
188
        }
×
189

190
        startAttr := startEvent.WorkflowExecutionStartedEventAttributes
18✔
191
        if !mutableState.HasProcessedOrPendingDecision() && startAttr.GetFirstDecisionTaskBackoffSeconds() > 0 {
18✔
192
                if err := taskGenerator.GenerateDelayedDecisionTasks(
×
193
                        startEvent,
×
194
                ); err != nil {
×
195
                        return err
×
196
                }
×
197
        }
198

199
        return nil
18✔
200
}
201

202
func (r *mutableStateTaskRefresherImpl) refreshTasksForWorkflowClose(
203
        ctx context.Context,
204
        mutableState MutableState,
205
        taskGenerator MutableStateTaskGenerator,
206
) error {
18✔
207

18✔
208
        executionInfo := mutableState.GetExecutionInfo()
18✔
209
        if executionInfo.CloseStatus != persistence.WorkflowCloseStatusNone {
18✔
210
                closeEvent, err := mutableState.GetCompletionEvent(ctx)
×
211
                if err != nil {
×
212
                        return err
×
213
                }
×
214
                return taskGenerator.GenerateWorkflowCloseTasks(
×
215
                        closeEvent,
×
216
                        r.config.WorkflowDeletionJitterRange(mutableState.GetDomainEntry().GetInfo().Name),
×
217
                )
×
218
        }
219

220
        return nil
18✔
221
}
222

223
func (r *mutableStateTaskRefresherImpl) refreshTasksForRecordWorkflowStarted(
224
        ctx context.Context,
225
        mutableState MutableState,
226
        taskGenerator MutableStateTaskGenerator,
227
) error {
18✔
228

18✔
229
        startEvent, err := mutableState.GetStartEvent(ctx)
18✔
230
        if err != nil {
18✔
231
                return err
×
232
        }
×
233

234
        executionInfo := mutableState.GetExecutionInfo()
18✔
235

18✔
236
        if executionInfo.CloseStatus == persistence.WorkflowCloseStatusNone {
36✔
237
                return taskGenerator.GenerateRecordWorkflowStartedTasks(
18✔
238
                        startEvent,
18✔
239
                )
18✔
240
        }
18✔
241

242
        return nil
×
243
}
244

245
func (r *mutableStateTaskRefresherImpl) refreshTasksForDecision(
246
        ctx context.Context,
247
        mutableState MutableState,
248
        taskGenerator MutableStateTaskGenerator,
249
) error {
18✔
250

18✔
251
        if !mutableState.HasPendingDecision() {
18✔
252
                // no decision task at all
×
253
                return nil
×
254
        }
×
255

256
        decision, ok := mutableState.GetPendingDecision()
18✔
257
        if !ok {
18✔
258
                return &types.InternalServiceError{Message: "it could be a bug, cannot get pending decision"}
×
259
        }
×
260

261
        // decision already started
262
        if decision.StartedID != common.EmptyEventID {
30✔
263
                return taskGenerator.GenerateDecisionStartTasks(
12✔
264
                        decision.ScheduleID,
12✔
265
                )
12✔
266
        }
12✔
267

268
        // decision only scheduled
269
        return taskGenerator.GenerateDecisionScheduleTasks(
9✔
270
                decision.ScheduleID,
9✔
271
        )
9✔
272
}
273

274
func (r *mutableStateTaskRefresherImpl) refreshTasksForActivity(
275
        ctx context.Context,
276
        mutableState MutableState,
277
        taskGenerator MutableStateTaskGenerator,
278
) error {
18✔
279

18✔
280
        executionInfo := mutableState.GetExecutionInfo()
18✔
281
        pendingActivityInfos := mutableState.GetPendingActivityInfos()
18✔
282

18✔
283
        currentBranchToken, err := mutableState.GetCurrentBranchToken()
18✔
284
        if err != nil {
18✔
285
                return err
×
286
        }
×
287

288
Loop:
18✔
289
        for _, activityInfo := range pendingActivityInfos {
39✔
290
                // clear all activity timer task mask for later activity timer task re-generation
21✔
291
                activityInfo.TimerTaskStatus = TimerTaskStatusNone
21✔
292

21✔
293
                // need to update activity timer task mask for which task is generated
21✔
294
                if err := mutableState.UpdateActivity(
21✔
295
                        activityInfo,
21✔
296
                ); err != nil {
21✔
297
                        return err
×
298
                }
×
299

300
                if activityInfo.StartedID != common.EmptyEventID {
24✔
301
                        continue Loop
3✔
302
                }
303

304
                scheduleEvent, err := r.eventsCache.GetEvent(
19✔
305
                        ctx,
19✔
306
                        r.shardID,
19✔
307
                        executionInfo.DomainID,
19✔
308
                        executionInfo.WorkflowID,
19✔
309
                        executionInfo.RunID,
19✔
310
                        activityInfo.ScheduledEventBatchID,
19✔
311
                        activityInfo.ScheduleID,
19✔
312
                        currentBranchToken,
19✔
313
                )
19✔
314
                if err != nil {
19✔
315
                        return err
×
316
                }
×
317

318
                if err := taskGenerator.GenerateActivityTransferTasks(
19✔
319
                        scheduleEvent,
19✔
320
                ); err != nil {
19✔
321
                        return err
×
322
                }
×
323
        }
324

325
        if _, err := NewTimerSequence(
18✔
326
                mutableState,
18✔
327
        ).CreateNextActivityTimer(); err != nil {
18✔
328
                return err
×
329
        }
×
330

331
        return nil
18✔
332
}
333

334
func (r *mutableStateTaskRefresherImpl) refreshTasksForTimer(
335
        ctx context.Context,
336
        mutableState MutableState,
337
        taskGenerator MutableStateTaskGenerator,
338
) error {
18✔
339

18✔
340
        pendingTimerInfos := mutableState.GetPendingTimerInfos()
18✔
341

18✔
342
        for _, timerInfo := range pendingTimerInfos {
19✔
343
                // clear all timer task mask for later timer task re-generation
1✔
344
                timerInfo.TaskStatus = TimerTaskStatusNone
1✔
345

1✔
346
                // need to update user timer task mask for which task is generated
1✔
347
                if err := mutableState.UpdateUserTimer(
1✔
348
                        timerInfo,
1✔
349
                ); err != nil {
1✔
350
                        return err
×
351
                }
×
352
        }
353

354
        if _, err := NewTimerSequence(
18✔
355
                mutableState,
18✔
356
        ).CreateNextUserTimer(); err != nil {
18✔
357
                return err
×
358
        }
×
359

360
        return nil
18✔
361
}
362

363
func (r *mutableStateTaskRefresherImpl) refreshTasksForChildWorkflow(
364
        ctx context.Context,
365
        mutableState MutableState,
366
        taskGenerator MutableStateTaskGenerator,
367
) error {
18✔
368

18✔
369
        executionInfo := mutableState.GetExecutionInfo()
18✔
370
        pendingChildWorkflowInfos := mutableState.GetPendingChildExecutionInfos()
18✔
371

18✔
372
        currentBranchToken, err := mutableState.GetCurrentBranchToken()
18✔
373
        if err != nil {
18✔
374
                return err
×
375
        }
×
376

377
Loop:
18✔
378
        for _, childWorkflowInfo := range pendingChildWorkflowInfos {
18✔
379
                if childWorkflowInfo.StartedID != common.EmptyEventID {
×
380
                        continue Loop
×
381
                }
382

383
                scheduleEvent, err := r.eventsCache.GetEvent(
×
384
                        ctx,
×
385
                        r.shardID,
×
386
                        executionInfo.DomainID,
×
387
                        executionInfo.WorkflowID,
×
388
                        executionInfo.RunID,
×
389
                        childWorkflowInfo.InitiatedEventBatchID,
×
390
                        childWorkflowInfo.InitiatedID,
×
391
                        currentBranchToken,
×
392
                )
×
393
                if err != nil {
×
394
                        return err
×
395
                }
×
396

397
                if err := taskGenerator.GenerateChildWorkflowTasks(
×
398
                        scheduleEvent,
×
399
                ); err != nil {
×
400
                        return err
×
401
                }
×
402
        }
403

404
        return nil
18✔
405
}
406

407
func (r *mutableStateTaskRefresherImpl) refreshTasksForRequestCancelExternalWorkflow(
408
        ctx context.Context,
409
        mutableState MutableState,
410
        taskGenerator MutableStateTaskGenerator,
411
) error {
18✔
412

18✔
413
        executionInfo := mutableState.GetExecutionInfo()
18✔
414
        pendingRequestCancelInfos := mutableState.GetPendingRequestCancelExternalInfos()
18✔
415

18✔
416
        currentBranchToken, err := mutableState.GetCurrentBranchToken()
18✔
417
        if err != nil {
18✔
418
                return err
×
419
        }
×
420

421
        for _, requestCancelInfo := range pendingRequestCancelInfos {
18✔
422
                initiateEvent, err := r.eventsCache.GetEvent(
×
423
                        ctx,
×
424
                        r.shardID,
×
425
                        executionInfo.DomainID,
×
426
                        executionInfo.WorkflowID,
×
427
                        executionInfo.RunID,
×
428
                        requestCancelInfo.InitiatedEventBatchID,
×
429
                        requestCancelInfo.InitiatedID,
×
430
                        currentBranchToken,
×
431
                )
×
432
                if err != nil {
×
433
                        return err
×
434
                }
×
435

436
                if err := taskGenerator.GenerateRequestCancelExternalTasks(
×
437
                        initiateEvent,
×
438
                ); err != nil {
×
439
                        return err
×
440
                }
×
441
        }
442

443
        return nil
18✔
444
}
445

446
func (r *mutableStateTaskRefresherImpl) refreshTasksForSignalExternalWorkflow(
447
        ctx context.Context,
448
        mutableState MutableState,
449
        taskGenerator MutableStateTaskGenerator,
450
) error {
18✔
451

18✔
452
        executionInfo := mutableState.GetExecutionInfo()
18✔
453
        pendingSignalInfos := mutableState.GetPendingSignalExternalInfos()
18✔
454

18✔
455
        currentBranchToken, err := mutableState.GetCurrentBranchToken()
18✔
456
        if err != nil {
18✔
457
                return err
×
458
        }
×
459

460
        for _, signalInfo := range pendingSignalInfos {
18✔
461
                initiateEvent, err := r.eventsCache.GetEvent(
×
462
                        ctx,
×
463
                        r.shardID,
×
464
                        executionInfo.DomainID,
×
465
                        executionInfo.WorkflowID,
×
466
                        executionInfo.RunID,
×
467
                        signalInfo.InitiatedEventBatchID,
×
468
                        signalInfo.InitiatedID,
×
469
                        currentBranchToken,
×
470
                )
×
471
                if err != nil {
×
472
                        return err
×
473
                }
×
474

475
                if err := taskGenerator.GenerateSignalExternalTasks(
×
476
                        initiateEvent,
×
477
                ); err != nil {
×
478
                        return err
×
479
                }
×
480
        }
481

482
        return nil
18✔
483
}
484

485
func (r *mutableStateTaskRefresherImpl) refreshTasksForWorkflowSearchAttr(
486
        ctx context.Context,
487
        mutableState MutableState,
488
        taskGenerator MutableStateTaskGenerator,
489
) error {
×
490

×
491
        return taskGenerator.GenerateWorkflowSearchAttrTasks()
×
492
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc