• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

uber / cadence / 018fa65b-f10a-4dc6-8f5b-50bc0a76f08d

23 May 2024 04:49PM UTC coverage: 69.355% (+0.005%) from 69.35%
018fa65b-f10a-4dc6-8f5b-50bc0a76f08d

push

buildkite

web-flow
Move RetryActivity to the corresponding file (#6038)

40 of 44 new or added lines in 1 file covered. (90.91%)

33 existing lines in 10 files now uncovered.

102185 of 147336 relevant lines covered (69.36%)

2585.03 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

79.33
/service/history/task/timer_active_task_executor.go
1
// Copyright (c) 2020 Uber Technologies, Inc.
2
//
3
// Permission is hereby granted, free of charge, to any person obtaining a copy
4
// of this software and associated documentation files (the "Software"), to deal
5
// in the Software without restriction, including without limitation the rights
6
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
7
// copies of the Software, and to permit persons to whom the Software is
8
// furnished to do so, subject to the following conditions:
9
//
10
// The above copyright notice and this permission notice shall be included in
11
// all copies or substantial portions of the Software.
12
//
13
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
14
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
18
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
19
// THE SOFTWARE.
20

21
package task
22

23
import (
24
        "context"
25
        "fmt"
26
        "time"
27

28
        "github.com/uber/cadence/common"
29
        "github.com/uber/cadence/common/backoff"
30
        "github.com/uber/cadence/common/log"
31
        "github.com/uber/cadence/common/log/tag"
32
        "github.com/uber/cadence/common/metrics"
33
        "github.com/uber/cadence/common/persistence"
34
        "github.com/uber/cadence/common/types"
35
        "github.com/uber/cadence/service/history/config"
36
        "github.com/uber/cadence/service/history/execution"
37
        "github.com/uber/cadence/service/history/shard"
38
        "github.com/uber/cadence/service/worker/archiver"
39
)
40

41
// scanWorkflowTimeout bounds the context used when a timer task must scan the
// workflow for resurrected timers or activities; that scan may take much longer
// than the regular per-task timeout allows.
const scanWorkflowTimeout = 30 * time.Second
44

45
var (
46
        normalDecisionTypeTag = metrics.DecisionTypeTag("normal")
47
        stickyDecisionTypeTag = metrics.DecisionTypeTag("sticky")
48
)
49

50
type (
51
        timerActiveTaskExecutor struct {
52
                *timerTaskExecutorBase
53
        }
54
)
55

56
// NewTimerActiveTaskExecutor creates a new task executor for active timer task
57
func NewTimerActiveTaskExecutor(
58
        shard shard.Context,
59
        archiverClient archiver.Client,
60
        executionCache *execution.Cache,
61
        logger log.Logger,
62
        metricsClient metrics.Client,
63
        config *config.Config,
64
) Executor {
98✔
65
        return &timerActiveTaskExecutor{
98✔
66
                timerTaskExecutorBase: newTimerTaskExecutorBase(
98✔
67
                        shard,
98✔
68
                        archiverClient,
98✔
69
                        executionCache,
98✔
70
                        logger,
98✔
71
                        metricsClient,
98✔
72
                        config,
98✔
73
                ),
98✔
74
        }
98✔
75
}
98✔
76

77
func (t *timerActiveTaskExecutor) Execute(
78
        task Task,
79
        shouldProcessTask bool,
80
) error {
2,358✔
81
        timerTask, ok := task.GetInfo().(*persistence.TimerTaskInfo)
2,358✔
82
        if !ok {
2,358✔
83
                return errUnexpectedTask
×
84
        }
×
85

86
        if !shouldProcessTask {
2,361✔
87
                return nil
3✔
88
        }
3✔
89

90
        switch timerTask.TaskType {
2,355✔
91
        case persistence.TaskTypeUserTimer:
27✔
92
                ctx, cancel := context.WithTimeout(t.ctx, taskDefaultTimeout)
27✔
93
                defer cancel()
27✔
94
                return t.executeUserTimerTimeoutTask(ctx, timerTask)
27✔
95
        case persistence.TaskTypeActivityTimeout:
737✔
96
                ctx, cancel := context.WithTimeout(t.ctx, taskDefaultTimeout)
737✔
97
                defer cancel()
737✔
98
                return t.executeActivityTimeoutTask(ctx, timerTask)
737✔
99
        case persistence.TaskTypeDecisionTimeout:
1,115✔
100
                ctx, cancel := context.WithTimeout(t.ctx, taskDefaultTimeout)
1,115✔
101
                defer cancel()
1,115✔
102
                return t.executeDecisionTimeoutTask(ctx, timerTask)
1,115✔
103
        case persistence.TaskTypeWorkflowTimeout:
354✔
104
                ctx, cancel := context.WithTimeout(t.ctx, taskDefaultTimeout)
354✔
105
                defer cancel()
354✔
106
                return t.executeWorkflowTimeoutTask(ctx, timerTask)
354✔
107
        case persistence.TaskTypeActivityRetryTimer:
11✔
108
                ctx, cancel := context.WithTimeout(t.ctx, taskDefaultTimeout)
11✔
109
                defer cancel()
11✔
110
                return t.executeActivityRetryTimerTask(ctx, timerTask)
11✔
111
        case persistence.TaskTypeWorkflowBackoffTimer:
59✔
112
                ctx, cancel := context.WithTimeout(t.ctx, taskDefaultTimeout)
59✔
113
                defer cancel()
59✔
114
                return t.executeWorkflowBackoffTimerTask(ctx, timerTask)
59✔
115
        case persistence.TaskTypeDeleteHistoryEvent:
52✔
116
                // special timeout for delete history event
52✔
117
                deleteHistoryEventContext, deleteHistoryEventCancel := context.WithTimeout(t.ctx, time.Duration(t.config.DeleteHistoryEventContextTimeout())*time.Second)
52✔
118
                defer deleteHistoryEventCancel()
52✔
119
                return t.executeDeleteHistoryEventTask(deleteHistoryEventContext, timerTask)
52✔
120
        default:
×
121
                return errUnknownTimerTask
×
122
        }
123
}
124

125
func (t *timerActiveTaskExecutor) executeUserTimerTimeoutTask(
126
        ctx context.Context,
127
        task *persistence.TimerTaskInfo,
128
) (retError error) {
27✔
129
        t.logger.Debug("Processing user timer",
27✔
130
                tag.WorkflowDomainID(task.DomainID),
27✔
131
                tag.WorkflowID(task.WorkflowID),
27✔
132
                tag.WorkflowRunID(task.RunID),
27✔
133
                tag.TaskID(task.TaskID),
27✔
134
        )
27✔
135
        wfContext, release, err := t.executionCache.GetOrCreateWorkflowExecutionWithTimeout(
27✔
136
                task.DomainID,
27✔
137
                getWorkflowExecution(task),
27✔
138
                taskGetExecutionContextTimeout,
27✔
139
        )
27✔
140
        if err != nil {
27✔
141
                if err == context.DeadlineExceeded {
×
142
                        return errWorkflowBusy
×
143
                }
×
144
                return err
×
145
        }
146
        defer func() { release(retError) }()
54✔
147

148
        mutableState, err := loadMutableStateForTimerTask(ctx, wfContext, task, t.metricsClient, t.logger)
27✔
149
        if err != nil {
27✔
150
                return err
×
151
        }
×
152
        if mutableState == nil || !mutableState.IsWorkflowExecutionRunning() {
36✔
153
                return nil
9✔
154
        }
9✔
155

156
        timerSequence := execution.NewTimerSequence(mutableState)
18✔
157
        referenceTime := t.shard.GetTimeSource().Now()
18✔
158
        resurrectionCheckMinDelay := t.config.ResurrectionCheckMinDelay(mutableState.GetDomainEntry().GetInfo().Name)
18✔
159
        updateMutableState := false
18✔
160
        debugLog := t.logger.Debug
18✔
161
        if t.config.EnableDebugMode && t.config.EnableTimerDebugLogByDomainID(task.DomainID) {
18✔
162
                debugLog = t.logger.Info
×
163
        }
×
164

165
        // initialized when a timer with delay >= resurrectionCheckMinDelay
166
        // is encountered, so that we don't need to scan history multiple times
167
        // where there're multiple timers with high delay
168
        var resurrectedTimer map[string]struct{}
18✔
169
        scanWorkflowCtx, cancel := context.WithTimeout(t.ctx, scanWorkflowTimeout)
18✔
170
        defer cancel()
18✔
171

18✔
172
        sortedUserTimers := timerSequence.LoadAndSortUserTimers()
18✔
173
        debugLog("Sorted user timers",
18✔
174
                tag.WorkflowDomainID(task.DomainID),
18✔
175
                tag.WorkflowID(task.WorkflowID),
18✔
176
                tag.WorkflowRunID(task.RunID),
18✔
177
                tag.Counter(len(sortedUserTimers)),
18✔
178
        )
18✔
179

18✔
180
Loop:
18✔
181
        for _, timerSequenceID := range sortedUserTimers {
36✔
182
                timerInfo, ok := mutableState.GetUserTimerInfoByEventID(timerSequenceID.EventID)
18✔
183
                if !ok {
18✔
184
                        errString := fmt.Sprintf("failed to find in user timer event ID: %v", timerSequenceID.EventID)
×
185
                        t.logger.Error(errString)
×
186
                        return &types.InternalServiceError{Message: errString}
×
187
                }
×
188

189
                delay, expired := timerSequence.IsExpired(referenceTime, timerSequenceID)
18✔
190
                debugLog("Processing user timer sequence id",
18✔
191
                        tag.WorkflowDomainID(task.DomainID),
18✔
192
                        tag.WorkflowID(task.WorkflowID),
18✔
193
                        tag.WorkflowRunID(task.RunID),
18✔
194
                        tag.TaskType(task.TaskType),
18✔
195
                        tag.TaskID(task.TaskID),
18✔
196
                        tag.WorkflowTimerID(timerInfo.TimerID),
18✔
197
                        tag.WorkflowScheduleID(timerInfo.StartedID),
18✔
198
                        tag.Dynamic("timer-sequence-id", timerSequenceID),
18✔
199
                        tag.Dynamic("timer-info", timerInfo),
18✔
200
                        tag.Dynamic("delay", delay),
18✔
201
                        tag.Dynamic("expired", expired),
18✔
202
                )
18✔
203

18✔
204
                if !expired {
18✔
205
                        // timer sequence IDs are sorted, once there is one timer
×
206
                        // sequence ID not expired, all after that wil not expired
×
207
                        break Loop
×
208
                }
209

210
                if delay >= resurrectionCheckMinDelay || resurrectedTimer != nil {
20✔
211
                        if resurrectedTimer == nil {
3✔
212
                                // overwrite the context here as scan history may take a long time to complete
1✔
213
                                // ctx will also be used by other operations like updateWorkflow
1✔
214
                                ctx = scanWorkflowCtx
1✔
215
                                resurrectedTimer, err = execution.GetResurrectedTimers(ctx, t.shard, mutableState)
1✔
216
                                if err != nil {
1✔
217
                                        t.logger.Error("Timer resurrection check failed", tag.Error(err))
×
218
                                        return err
×
219
                                }
×
220
                        }
221

222
                        if _, ok := resurrectedTimer[timerInfo.TimerID]; ok {
3✔
223
                                // found timer resurrection
1✔
224
                                domainName := mutableState.GetDomainEntry().GetInfo().Name
1✔
225
                                t.metricsClient.Scope(metrics.TimerQueueProcessorScope, metrics.DomainTag(domainName)).IncCounter(metrics.TimerResurrectionCounter)
1✔
226
                                t.logger.Warn("Encounter resurrected timer, skip",
1✔
227
                                        tag.WorkflowDomainID(task.DomainID),
1✔
228
                                        tag.WorkflowID(task.WorkflowID),
1✔
229
                                        tag.WorkflowRunID(task.RunID),
1✔
230
                                        tag.TaskType(task.TaskType),
1✔
231
                                        tag.TaskID(task.TaskID),
1✔
232
                                        tag.WorkflowTimerID(timerInfo.TimerID),
1✔
233
                                        tag.WorkflowScheduleID(timerInfo.StartedID), // timerStartedEvent is basically scheduled event
1✔
234
                                )
1✔
235

1✔
236
                                // remove resurrected timer from mutable state
1✔
237
                                if err := mutableState.DeleteUserTimer(timerInfo.TimerID); err != nil {
1✔
238
                                        return err
×
239
                                }
×
240
                                updateMutableState = true
1✔
241
                                continue Loop
1✔
242
                        }
243
                }
244

245
                if _, err := mutableState.AddTimerFiredEvent(timerInfo.TimerID); err != nil {
17✔
246
                        return err
×
247
                }
×
248
                updateMutableState = true
17✔
249

17✔
250
                debugLog("User timer fired",
17✔
251
                        tag.WorkflowDomainID(task.DomainID),
17✔
252
                        tag.WorkflowID(task.WorkflowID),
17✔
253
                        tag.WorkflowRunID(task.RunID),
17✔
254
                        tag.TaskType(task.TaskType),
17✔
255
                        tag.TaskID(task.TaskID),
17✔
256
                        tag.WorkflowTimerID(timerInfo.TimerID),
17✔
257
                        tag.WorkflowScheduleID(timerInfo.StartedID),
17✔
258
                        tag.WorkflowNextEventID(mutableState.GetNextEventID()),
17✔
259
                )
17✔
260

261
        }
262

263
        if !updateMutableState {
19✔
264
                return nil
1✔
265
        }
1✔
266

267
        return t.updateWorkflowExecution(ctx, wfContext, mutableState, updateMutableState)
17✔
268
}
269

270
func (t *timerActiveTaskExecutor) executeActivityTimeoutTask(
271
        ctx context.Context,
272
        task *persistence.TimerTaskInfo,
273
) (retError error) {
737✔
274

737✔
275
        wfContext, release, err := t.executionCache.GetOrCreateWorkflowExecutionWithTimeout(
737✔
276
                task.DomainID,
737✔
277
                getWorkflowExecution(task),
737✔
278
                taskGetExecutionContextTimeout,
737✔
279
        )
737✔
280
        if err != nil {
737✔
UNCOV
281
                if err == context.DeadlineExceeded {
×
UNCOV
282
                        return errWorkflowBusy
×
UNCOV
283
                }
×
284
                return err
×
285
        }
286
        defer func() { release(retError) }()
1,474✔
287

288
        domainName, err := t.shard.GetDomainCache().GetDomainName(task.DomainID)
737✔
289
        if err != nil {
737✔
290
                return fmt.Errorf("unable to find domainID: %v, err: %v", task.DomainID, err)
×
291
        }
×
292

293
        mutableState, err := loadMutableStateForTimerTask(ctx, wfContext, task, t.metricsClient, t.logger)
737✔
294
        if err != nil {
737✔
295
                return err
×
296
        }
×
297
        if mutableState == nil || !mutableState.IsWorkflowExecutionRunning() {
1,241✔
298
                return nil
504✔
299
        }
504✔
300

301
        wfType := mutableState.GetWorkflowType()
233✔
302
        if wfType == nil {
233✔
303
                return fmt.Errorf("unable to find workflow type, task %s", task)
×
304
        }
×
305

306
        timerSequence := execution.NewTimerSequence(mutableState)
233✔
307
        referenceTime := t.shard.GetTimeSource().Now()
233✔
308
        resurrectionCheckMinDelay := t.config.ResurrectionCheckMinDelay(mutableState.GetDomainEntry().GetInfo().Name)
233✔
309
        updateMutableState := false
233✔
310
        scheduleDecision := false
233✔
311

233✔
312
        // initialized when an activity timer with delay >= resurrectionCheckMinDelay
233✔
313
        // is encountered, so that we don't need to scan history multiple times
233✔
314
        // where there're multiple timers with high delay
233✔
315
        var resurrectedActivity map[int64]struct{}
233✔
316
        scanWorkflowCtx, cancel := context.WithTimeout(t.ctx, scanWorkflowTimeout)
233✔
317
        defer cancel()
233✔
318

233✔
319
        // need to clear activity heartbeat timer task mask for new activity timer task creation
233✔
320
        // NOTE: LastHeartbeatTimeoutVisibilityInSeconds is for deduping heartbeat timer creation as it's possible
233✔
321
        // one heartbeat task was persisted multiple times with different taskIDs due to the retry logic
233✔
322
        // for updating workflow execution. In that case, only one new heartbeat timeout task should be
233✔
323
        // created.
233✔
324
        isHeartBeatTask := task.TimeoutType == int(types.TimeoutTypeHeartbeat)
233✔
325
        activityInfo, ok := mutableState.GetActivityInfo(task.EventID)
233✔
326
        if isHeartBeatTask && ok && activityInfo.LastHeartbeatTimeoutVisibilityInSeconds <= task.VisibilityTimestamp.Unix() {
320✔
327
                activityInfo.TimerTaskStatus = activityInfo.TimerTaskStatus &^ execution.TimerTaskStatusCreatedHeartbeat
87✔
328
                if err := mutableState.UpdateActivity(activityInfo); err != nil {
87✔
329
                        return err
×
330
                }
×
331
                updateMutableState = true
87✔
332
        }
333

334
Loop:
233✔
335
        for _, timerSequenceID := range timerSequence.LoadAndSortActivityTimers() {
533✔
336
                activityInfo, ok := mutableState.GetActivityInfo(timerSequenceID.EventID)
300✔
337
                if !ok || timerSequenceID.Attempt < activityInfo.Attempt {
392✔
338
                        // handle 2 cases:
92✔
339
                        // 1. !ok
92✔
340
                        //  this case can happen since each activity can have 4 timers
92✔
341
                        //  and one of those 4 timers may have fired in this loop
92✔
342
                        // 2. timerSequenceID.attempt < activityInfo.Attempt
92✔
343
                        //  retry could update activity attempt, should not timeouts new attempt
92✔
344
                        // 3. it's a resurrected activity and has already been deleted in this loop
92✔
345
                        continue Loop
92✔
346
                }
347

348
                delay, expired := timerSequence.IsExpired(referenceTime, timerSequenceID)
208✔
349
                if !expired {
309✔
350
                        // timer sequence IDs are sorted, once there is one timer
101✔
351
                        // sequence ID not expired, all after that wil not expired
101✔
352
                        break Loop
101✔
353
                }
354

355
                if delay >= resurrectionCheckMinDelay || resurrectedActivity != nil {
109✔
356
                        if resurrectedActivity == nil {
3✔
357
                                // overwrite the context here as scan history may take a long time to complete
1✔
358
                                // ctx will also be used by other operations like updateWorkflow
1✔
359
                                ctx = scanWorkflowCtx
1✔
360
                                resurrectedActivity, err = execution.GetResurrectedActivities(ctx, t.shard, mutableState)
1✔
361
                                if err != nil {
1✔
362
                                        t.logger.Error("Activity resurrection check failed", tag.Error(err))
×
363
                                        return err
×
364
                                }
×
365
                        }
366

367
                        if _, ok := resurrectedActivity[activityInfo.ScheduleID]; ok {
3✔
368
                                // found activity resurrection
1✔
369
                                domainName := mutableState.GetDomainEntry().GetInfo().Name
1✔
370
                                t.metricsClient.Scope(metrics.TimerQueueProcessorScope, metrics.DomainTag(domainName)).IncCounter(metrics.ActivityResurrectionCounter)
1✔
371
                                t.logger.Warn("Encounter resurrected activity, skip",
1✔
372
                                        tag.WorkflowDomainID(task.DomainID),
1✔
373
                                        tag.WorkflowID(task.WorkflowID),
1✔
374
                                        tag.WorkflowRunID(task.RunID),
1✔
375
                                        tag.TaskType(task.TaskType),
1✔
376
                                        tag.TaskID(task.TaskID),
1✔
377
                                        tag.WorkflowActivityID(activityInfo.ActivityID),
1✔
378
                                        tag.WorkflowScheduleID(activityInfo.ScheduleID),
1✔
379
                                )
1✔
380

1✔
381
                                // remove resurrected activity from mutable state
1✔
382
                                if err := mutableState.DeleteActivity(activityInfo.ScheduleID); err != nil {
1✔
383
                                        return err
×
384
                                }
×
385
                                updateMutableState = true
1✔
386
                                continue Loop
1✔
387
                        }
388
                }
389

390
                // check if it's possible that the timeout is due to activity task lost
391
                if timerSequenceID.TimerType == execution.TimerTypeScheduleToStart {
166✔
392
                        domainName, err := t.shard.GetDomainCache().GetDomainName(mutableState.GetExecutionInfo().DomainID)
60✔
393
                        if err == nil && activityInfo.ScheduleToStartTimeout >= int32(t.config.ActivityMaxScheduleToStartTimeoutForRetry(domainName).Seconds()) {
60✔
394
                                // note that we ignore the race condition for the dynamic config value change here as it's only for metric and logging purpose.
×
395
                                // theoratically the check only applies to activities with retry policy
×
396
                                // however for activities without retry policy, we also want to check the potential task lost and emit the metric
×
397
                                // so reuse the same config value as a threshold so that the metric only got emitted if the activity has been started after a long time.
×
398
                                t.metricsClient.Scope(metrics.TimerActiveTaskActivityTimeoutScope, metrics.DomainTag(domainName)).IncCounter(metrics.ActivityLostCounter)
×
399
                                t.logger.Warn("Potentially activity task lost",
×
400
                                        tag.WorkflowDomainName(domainName),
×
401
                                        tag.WorkflowID(task.WorkflowID),
×
402
                                        tag.WorkflowRunID(task.RunID),
×
403
                                        tag.WorkflowScheduleID(activityInfo.ScheduleID),
×
404
                                )
×
405
                        }
×
406
                }
407

408
                if ok, err := mutableState.RetryActivity(
106✔
409
                        activityInfo,
106✔
410
                        execution.TimerTypeToReason(timerSequenceID.TimerType),
106✔
411
                        nil,
106✔
412
                ); err != nil {
106✔
413
                        return err
×
414
                } else if ok {
111✔
415
                        updateMutableState = true
5✔
416
                        continue Loop
5✔
417
                }
418

419
                t.emitTimeoutMetricScopeWithDomainTag(
101✔
420
                        mutableState.GetExecutionInfo().DomainID,
101✔
421
                        metrics.TimerActiveTaskActivityTimeoutScope,
101✔
422
                        timerSequenceID.TimerType,
101✔
423
                        metrics.WorkflowTypeTag(wfType.GetName()),
101✔
424
                )
101✔
425

101✔
426
                t.logger.Info("Activity timed out",
101✔
427
                        tag.WorkflowDomainName(domainName),
101✔
428
                        tag.WorkflowDomainID(task.GetDomainID()),
101✔
429
                        tag.WorkflowID(task.GetWorkflowID()),
101✔
430
                        tag.WorkflowRunID(task.GetRunID()),
101✔
431
                        tag.ScheduleAttempt(task.ScheduleAttempt),
101✔
432
                        tag.FailoverVersion(task.GetVersion()),
101✔
433
                )
101✔
434

101✔
435
                if _, err := mutableState.AddActivityTaskTimedOutEvent(
101✔
436
                        activityInfo.ScheduleID,
101✔
437
                        activityInfo.StartedID,
101✔
438
                        execution.TimerTypeToInternal(timerSequenceID.TimerType),
101✔
439
                        activityInfo.Details,
101✔
440
                ); err != nil {
101✔
441
                        return err
×
442
                }
×
443
                updateMutableState = true
101✔
444
                scheduleDecision = true
101✔
445
        }
446

447
        if !updateMutableState {
312✔
448
                return nil
79✔
449
        }
79✔
450
        return t.updateWorkflowExecution(ctx, wfContext, mutableState, scheduleDecision)
154✔
451
}
452

453
func (t *timerActiveTaskExecutor) executeDecisionTimeoutTask(
454
        ctx context.Context,
455
        task *persistence.TimerTaskInfo,
456
) (retError error) {
1,115✔
457

1,115✔
458
        wfContext, release, err := t.executionCache.GetOrCreateWorkflowExecutionWithTimeout(
1,115✔
459
                task.DomainID,
1,115✔
460
                getWorkflowExecution(task),
1,115✔
461
                taskGetExecutionContextTimeout,
1,115✔
462
        )
1,115✔
463
        if err != nil {
1,115✔
464
                if err == context.DeadlineExceeded {
×
465
                        return errWorkflowBusy
×
466
                }
×
467
                return err
×
468
        }
469
        defer func() { release(retError) }()
2,230✔
470

471
        domainName, err := t.shard.GetDomainCache().GetDomainName(task.DomainID)
1,115✔
472
        if err != nil {
1,115✔
473
                return fmt.Errorf("unable to find domainID: %v, err: %v", task.DomainID, err)
×
474
        }
×
475

476
        mutableState, err := loadMutableStateForTimerTask(ctx, wfContext, task, t.metricsClient, t.logger)
1,115✔
477
        if err != nil {
1,115✔
478
                return err
×
479
        }
×
480
        if mutableState == nil || !mutableState.IsWorkflowExecutionRunning() {
1,884✔
481
                return nil
769✔
482
        }
769✔
483

484
        wfType := mutableState.GetWorkflowType()
346✔
485
        if wfType == nil {
346✔
486
                return fmt.Errorf("unable to find workflow type, task %s", task)
×
487
        }
×
488

489
        scheduleID := task.EventID
346✔
490
        decision, ok := mutableState.GetDecisionInfo(scheduleID)
346✔
491
        if !ok {
637✔
492
                t.logger.Debug("Potentially duplicate", tag.TaskID(task.TaskID), tag.WorkflowScheduleID(scheduleID), tag.TaskType(persistence.TaskTypeDecisionTimeout))
291✔
493
                return nil
291✔
494
        }
291✔
495
        ok, err = verifyTaskVersion(t.shard, t.logger, task.DomainID, decision.Version, task.Version, task)
55✔
496
        if err != nil || !ok {
55✔
497
                return err
×
498
        }
×
499

500
        if decision.Attempt != task.ScheduleAttempt {
55✔
501
                return nil
×
502
        }
×
503

504
        scheduleDecision := false
55✔
505
        isStickyDecision := mutableState.GetExecutionInfo().StickyTaskList != ""
55✔
506
        decisionTypeTag := normalDecisionTypeTag
55✔
507
        if isStickyDecision {
86✔
508
                decisionTypeTag = stickyDecisionTypeTag
31✔
509
        }
31✔
510
        tags := []metrics.Tag{metrics.WorkflowTypeTag(wfType.GetName()), decisionTypeTag}
55✔
511
        switch execution.TimerTypeFromInternal(types.TimeoutType(task.TimeoutType)) {
55✔
512
        case execution.TimerTypeStartToClose:
37✔
513
                t.emitTimeoutMetricScopeWithDomainTag(
37✔
514
                        mutableState.GetExecutionInfo().DomainID,
37✔
515
                        metrics.TimerActiveTaskDecisionTimeoutScope,
37✔
516
                        execution.TimerTypeStartToClose,
37✔
517
                        tags...,
37✔
518
                )
37✔
519
                if _, err := mutableState.AddDecisionTaskTimedOutEvent(
37✔
520
                        decision.ScheduleID,
37✔
521
                        decision.StartedID,
37✔
522
                ); err != nil {
37✔
523
                        return err
×
524
                }
×
525
                scheduleDecision = true
37✔
526

527
        case execution.TimerTypeScheduleToStart:
18✔
528
                if decision.StartedID != common.EmptyEventID {
18✔
529
                        // decision has already started
×
530
                        return nil
×
531
                }
×
532

533
                if !isStickyDecision {
23✔
534
                        t.logger.Warn("Potential lost normal decision task",
5✔
535
                                tag.WorkflowDomainName(domainName),
5✔
536
                                tag.WorkflowDomainID(task.GetDomainID()),
5✔
537
                                tag.WorkflowID(task.GetWorkflowID()),
5✔
538
                                tag.WorkflowRunID(task.GetRunID()),
5✔
539
                                tag.WorkflowScheduleID(scheduleID),
5✔
540
                                tag.ScheduleAttempt(task.ScheduleAttempt),
5✔
541
                                tag.FailoverVersion(task.GetVersion()),
5✔
542
                        )
5✔
543
                }
5✔
544

545
                t.emitTimeoutMetricScopeWithDomainTag(
18✔
546
                        mutableState.GetExecutionInfo().DomainID,
18✔
547
                        metrics.TimerActiveTaskDecisionTimeoutScope,
18✔
548
                        execution.TimerTypeScheduleToStart,
18✔
549
                        tags...,
18✔
550
                )
18✔
551
                _, err := mutableState.AddDecisionTaskScheduleToStartTimeoutEvent(scheduleID)
18✔
552
                if err != nil {
18✔
553
                        return err
×
554
                }
×
555
                scheduleDecision = true
18✔
556
        }
557

558
        return t.updateWorkflowExecution(ctx, wfContext, mutableState, scheduleDecision)
55✔
559
}
560

561
func (t *timerActiveTaskExecutor) executeWorkflowBackoffTimerTask(
562
        ctx context.Context,
563
        task *persistence.TimerTaskInfo,
564
) (retError error) {
59✔
565

59✔
566
        wfContext, release, err := t.executionCache.GetOrCreateWorkflowExecutionWithTimeout(
59✔
567
                task.DomainID,
59✔
568
                getWorkflowExecution(task),
59✔
569
                taskGetExecutionContextTimeout,
59✔
570
        )
59✔
571
        if err != nil {
59✔
572
                if err == context.DeadlineExceeded {
×
573
                        return errWorkflowBusy
×
574
                }
×
575
                return err
×
576
        }
577
        defer func() { release(retError) }()
118✔
578

579
        mutableState, err := loadMutableStateForTimerTask(ctx, wfContext, task, t.metricsClient, t.logger)
59✔
580
        if err != nil {
59✔
581
                return err
×
582
        }
×
583
        if mutableState == nil || !mutableState.IsWorkflowExecutionRunning() {
65✔
584
                return nil
6✔
585
        }
6✔
586

587
        if task.TimeoutType == persistence.WorkflowBackoffTimeoutTypeRetry {
76✔
588
                t.metricsClient.IncCounter(metrics.TimerActiveTaskWorkflowBackoffTimerScope, metrics.WorkflowRetryBackoffTimerCount)
23✔
589
        } else {
53✔
590
                t.metricsClient.IncCounter(metrics.TimerActiveTaskWorkflowBackoffTimerScope, metrics.WorkflowCronBackoffTimerCount)
30✔
591
        }
30✔
592

593
        if mutableState.HasProcessedOrPendingDecision() {
54✔
594
                // already has decision task
1✔
595
                return nil
1✔
596
        }
1✔
597

598
        // schedule first decision task
599
        return t.updateWorkflowExecution(ctx, wfContext, mutableState, true)
52✔
600
}
601

602
func (t *timerActiveTaskExecutor) executeActivityRetryTimerTask(
603
        ctx context.Context,
604
        task *persistence.TimerTaskInfo,
605
) (retError error) {
11✔
606

11✔
607
        wfContext, release, err := t.executionCache.GetOrCreateWorkflowExecutionWithTimeout(
11✔
608
                task.DomainID,
11✔
609
                getWorkflowExecution(task),
11✔
610
                taskGetExecutionContextTimeout,
11✔
611
        )
11✔
612
        if err != nil {
11✔
613
                if err == context.DeadlineExceeded {
×
614
                        return errWorkflowBusy
×
615
                }
×
616
                return err
×
617
        }
618
        defer func() { release(retError) }()
22✔
619

620
        mutableState, err := loadMutableStateForTimerTask(ctx, wfContext, task, t.metricsClient, t.logger)
11✔
621
        if err != nil {
11✔
622
                return err
×
623
        }
×
624
        if mutableState == nil || !mutableState.IsWorkflowExecutionRunning() {
11✔
625
                return nil
×
626
        }
×
627

628
        // generate activity task
629
        scheduledID := task.EventID
11✔
630
        activityInfo, ok := mutableState.GetActivityInfo(scheduledID)
11✔
631
        if !ok || task.ScheduleAttempt < int64(activityInfo.Attempt) || activityInfo.StartedID != common.EmptyEventID {
12✔
632
                if ok {
2✔
633
                        t.logger.Info("Duplicate activity retry timer task",
1✔
634
                                tag.WorkflowID(mutableState.GetExecutionInfo().WorkflowID),
1✔
635
                                tag.WorkflowRunID(mutableState.GetExecutionInfo().RunID),
1✔
636
                                tag.WorkflowDomainID(mutableState.GetExecutionInfo().DomainID),
1✔
637
                                tag.WorkflowScheduleID(activityInfo.ScheduleID),
1✔
638
                                tag.Attempt(activityInfo.Attempt),
1✔
639
                                tag.FailoverVersion(activityInfo.Version),
1✔
640
                                tag.TimerTaskStatus(activityInfo.TimerTaskStatus),
1✔
641
                                tag.ScheduleAttempt(task.ScheduleAttempt))
1✔
642
                }
1✔
643
                return nil
1✔
644
        }
645
        ok, err = verifyTaskVersion(t.shard, t.logger, task.DomainID, activityInfo.Version, task.Version, task)
10✔
646
        if err != nil || !ok {
10✔
647
                return err
×
648
        }
×
649

650
        domainID := task.DomainID
10✔
651
        targetDomainID := domainID
10✔
652
        if activityInfo.DomainID != "" {
20✔
653
                targetDomainID = activityInfo.DomainID
10✔
654
        } else {
10✔
655
                // TODO remove this block after Mar, 1th, 2020
×
656
                //  previously, DomainID in activity info is not used, so need to get
×
657
                //  schedule event from DB checking whether activity to be scheduled
×
658
                //  belongs to this domain
×
659
                scheduledEvent, err := mutableState.GetActivityScheduledEvent(ctx, scheduledID)
×
660
                if err != nil {
×
661
                        return err
×
662
                }
×
663
                if scheduledEvent.ActivityTaskScheduledEventAttributes.GetDomain() != "" {
×
664
                        domainEntry, err := t.shard.GetDomainCache().GetDomain(scheduledEvent.ActivityTaskScheduledEventAttributes.GetDomain())
×
665
                        if err != nil {
×
666
                                return &types.InternalServiceError{Message: "unable to re-schedule activity across domain."}
×
667
                        }
×
668
                        targetDomainID = domainEntry.GetInfo().ID
×
669
                }
670
        }
671

672
        execution := types.WorkflowExecution{
10✔
673
                WorkflowID: task.WorkflowID,
10✔
674
                RunID:      task.RunID}
10✔
675
        taskList := &types.TaskList{
10✔
676
                Name: activityInfo.TaskList,
10✔
677
        }
10✔
678
        scheduleToStartTimeout := activityInfo.ScheduleToStartTimeout
10✔
679

10✔
680
        release(nil) // release earlier as we don't need the lock anymore
10✔
681

10✔
682
        return t.shard.GetService().GetMatchingClient().AddActivityTask(ctx, &types.AddActivityTaskRequest{
10✔
683
                DomainUUID:                    targetDomainID,
10✔
684
                SourceDomainUUID:              domainID,
10✔
685
                Execution:                     &execution,
10✔
686
                TaskList:                      taskList,
10✔
687
                ScheduleID:                    scheduledID,
10✔
688
                ScheduleToStartTimeoutSeconds: common.Int32Ptr(scheduleToStartTimeout),
10✔
689
                PartitionConfig:               mutableState.GetExecutionInfo().PartitionConfig,
10✔
690
        })
10✔
691
}
692

693
func (t *timerActiveTaskExecutor) executeWorkflowTimeoutTask(
694
        ctx context.Context,
695
        task *persistence.TimerTaskInfo,
696
) (retError error) {
354✔
697

354✔
698
        wfContext, release, err := t.executionCache.GetOrCreateWorkflowExecutionWithTimeout(
354✔
699
                task.DomainID,
354✔
700
                getWorkflowExecution(task),
354✔
701
                taskGetExecutionContextTimeout,
354✔
702
        )
354✔
703
        if err != nil {
354✔
704
                if err == context.DeadlineExceeded {
×
705
                        return errWorkflowBusy
×
706
                }
×
707
                return err
×
708
        }
709
        defer func() { release(retError) }()
708✔
710

711
        mutableState, err := loadMutableStateForTimerTask(ctx, wfContext, task, t.metricsClient, t.logger)
354✔
712
        if err != nil {
354✔
713
                return err
×
714
        }
×
715
        if mutableState == nil || !mutableState.IsWorkflowExecutionRunning() {
627✔
716
                return nil
273✔
717
        }
273✔
718

719
        startVersion, err := mutableState.GetStartVersion()
81✔
720
        if err != nil {
81✔
721
                return err
×
722
        }
×
723
        ok, err := verifyTaskVersion(t.shard, t.logger, task.DomainID, startVersion, task.Version, task)
81✔
724
        if err != nil || !ok {
81✔
725
                return err
×
726
        }
×
727

728
        eventBatchFirstEventID := mutableState.GetNextEventID()
81✔
729

81✔
730
        timeoutReason := execution.TimerTypeToReason(execution.TimerTypeStartToClose)
81✔
731
        backoffInterval := mutableState.GetRetryBackoffDuration(timeoutReason)
81✔
732
        continueAsNewInitiator := types.ContinueAsNewInitiatorRetryPolicy
81✔
733
        if backoffInterval == backoff.NoBackoff {
161✔
734
                // check if a cron backoff is needed
80✔
735
                backoffInterval, err = mutableState.GetCronBackoffDuration(ctx)
80✔
736
                if err != nil {
80✔
737
                        return err
×
738
                }
×
739
                continueAsNewInitiator = types.ContinueAsNewInitiatorCronSchedule
80✔
740
        }
741
        // ignore event id
742
        isCanceled, _ := mutableState.IsCancelRequested()
81✔
743
        if isCanceled || backoffInterval == backoff.NoBackoff {
157✔
744
                if err := timeoutWorkflow(mutableState, eventBatchFirstEventID); err != nil {
76✔
745
                        return err
×
746
                }
×
747

748
                // We apply the update to execution using optimistic concurrency.  If it fails due to a conflict than reload
749
                // the history and try the operation again.
750
                return t.updateWorkflowExecution(ctx, wfContext, mutableState, false)
76✔
751
        }
752

753
        // workflow timeout, but a retry or cron is needed, so we do continue as new to retry or cron
754
        startEvent, err := mutableState.GetStartEvent(ctx)
5✔
755
        if err != nil {
5✔
756
                return err
×
757
        }
×
758

759
        startAttributes := startEvent.WorkflowExecutionStartedEventAttributes
5✔
760
        continueAsNewAttributes := &types.ContinueAsNewWorkflowExecutionDecisionAttributes{
5✔
761
                WorkflowType:                        startAttributes.WorkflowType,
5✔
762
                TaskList:                            startAttributes.TaskList,
5✔
763
                Input:                               startAttributes.Input,
5✔
764
                ExecutionStartToCloseTimeoutSeconds: startAttributes.ExecutionStartToCloseTimeoutSeconds,
5✔
765
                TaskStartToCloseTimeoutSeconds:      startAttributes.TaskStartToCloseTimeoutSeconds,
5✔
766
                BackoffStartIntervalInSeconds:       common.Int32Ptr(int32(backoffInterval.Seconds())),
5✔
767
                RetryPolicy:                         startAttributes.RetryPolicy,
5✔
768
                Initiator:                           continueAsNewInitiator.Ptr(),
5✔
769
                FailureReason:                       common.StringPtr(timeoutReason),
5✔
770
                CronSchedule:                        mutableState.GetExecutionInfo().CronSchedule,
5✔
771
                Header:                              startAttributes.Header,
5✔
772
                Memo:                                startAttributes.Memo,
5✔
773
                SearchAttributes:                    startAttributes.SearchAttributes,
5✔
774
                JitterStartSeconds:                  startAttributes.JitterStartSeconds,
5✔
775
        }
5✔
776
        newMutableState, err := retryWorkflow(
5✔
777
                ctx,
5✔
778
                mutableState,
5✔
779
                eventBatchFirstEventID,
5✔
780
                startAttributes.GetParentWorkflowDomain(),
5✔
781
                continueAsNewAttributes,
5✔
782
        )
5✔
783
        if err != nil {
5✔
784
                return err
×
785
        }
×
786

787
        newExecutionInfo := newMutableState.GetExecutionInfo()
5✔
788
        return wfContext.UpdateWorkflowExecutionWithNewAsActive(
5✔
789
                ctx,
5✔
790
                t.shard.GetTimeSource().Now(),
5✔
791
                execution.NewContext(
5✔
792
                        newExecutionInfo.DomainID,
5✔
793
                        types.WorkflowExecution{
5✔
794
                                WorkflowID: newExecutionInfo.WorkflowID,
5✔
795
                                RunID:      newExecutionInfo.RunID,
5✔
796
                        },
5✔
797
                        t.shard,
5✔
798
                        t.shard.GetExecutionManager(),
5✔
799
                        t.logger,
5✔
800
                ),
5✔
801
                newMutableState,
5✔
802
        )
5✔
803
}
804

805
func (t *timerActiveTaskExecutor) updateWorkflowExecution(
806
        ctx context.Context,
807
        wfContext execution.Context,
808
        mutableState execution.MutableState,
809
        scheduleNewDecision bool,
810
) error {
354✔
811

354✔
812
        var err error
354✔
813
        if scheduleNewDecision {
564✔
814
                // Schedule a new decision.
210✔
815
                err = execution.ScheduleDecision(mutableState)
210✔
816
                if err != nil {
210✔
817
                        return err
×
818
                }
×
819
        }
820

821
        now := t.shard.GetTimeSource().Now()
354✔
822
        err = wfContext.UpdateWorkflowExecutionAsActive(ctx, now)
354✔
823
        if err != nil {
354✔
824
                // if is shard ownership error, the shard context will stop the entire history engine
×
825
                // we don't need to explicitly stop the queue processor here
×
826
                return err
×
827
        }
×
828

829
        return nil
354✔
830
}
831

832
func (t *timerActiveTaskExecutor) emitTimeoutMetricScopeWithDomainTag(
833
        domainID string,
834
        scope int,
835
        timerType execution.TimerType,
836
        tags ...metrics.Tag,
837
) {
156✔
838
        domainTag, err := getDomainTagByID(t.shard.GetDomainCache(), domainID)
156✔
839
        if err != nil {
156✔
840
                return
×
841
        }
×
842
        tags = append(tags, domainTag)
156✔
843

156✔
844
        metricsScope := t.metricsClient.Scope(scope, tags...)
156✔
845
        switch timerType {
156✔
846
        case execution.TimerTypeScheduleToStart:
77✔
847
                metricsScope.IncCounter(metrics.ScheduleToStartTimeoutCounter)
77✔
848
        case execution.TimerTypeScheduleToClose:
3✔
849
                metricsScope.IncCounter(metrics.ScheduleToCloseTimeoutCounter)
3✔
850
        case execution.TimerTypeStartToClose:
40✔
851
                metricsScope.IncCounter(metrics.StartToCloseTimeoutCounter)
40✔
852
        case execution.TimerTypeHeartbeat:
36✔
853
                metricsScope.IncCounter(metrics.HeartbeatTimeoutCounter)
36✔
854
        }
855
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc