• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

uber / cadence / 018f9f5c-2963-46a5-8e77-e52888c66415

22 May 2024 08:12AM UTC coverage: 69.202% (-0.03%) from 69.232%
018f9f5c-2963-46a5-8e77-e52888c66415

push

buildkite

web-flow
Add method to list all workflow executions with support for partial match and search params (#6017)

* try

* update

* Update dataVisibilityManagerInterfaces.go

* updated

* Update pinot_visibility_store_test.go

* Update visibility_store_mock.go

* Update es_visibility_store_test.go

* Update pinot_visibility_store.go

77 of 89 new or added lines in 4 files covered. (86.52%)

80 existing lines in 21 files now uncovered.

101937 of 147303 relevant lines covered (69.2%)

2547.53 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

79.33
/service/history/task/timer_active_task_executor.go
1
// Copyright (c) 2020 Uber Technologies, Inc.
2
//
3
// Permission is hereby granted, free of charge, to any person obtaining a copy
4
// of this software and associated documentation files (the "Software"), to deal
5
// in the Software without restriction, including without limitation the rights
6
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
7
// copies of the Software, and to permit persons to whom the Software is
8
// furnished to do so, subject to the following conditions:
9
//
10
// The above copyright notice and this permission notice shall be included in
11
// all copies or substantial portions of the Software.
12
//
13
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
14
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
18
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
19
// THE SOFTWARE.
20

21
package task
22

23
import (
24
        "context"
25
        "fmt"
26
        "time"
27

28
        "github.com/uber/cadence/common"
29
        "github.com/uber/cadence/common/backoff"
30
        "github.com/uber/cadence/common/log"
31
        "github.com/uber/cadence/common/log/tag"
32
        "github.com/uber/cadence/common/metrics"
33
        "github.com/uber/cadence/common/persistence"
34
        "github.com/uber/cadence/common/types"
35
        "github.com/uber/cadence/service/history/config"
36
        "github.com/uber/cadence/service/history/execution"
37
        "github.com/uber/cadence/service/history/shard"
38
        "github.com/uber/cadence/service/worker/archiver"
39
)
40

41
const (
42
        scanWorkflowTimeout = 30 * time.Second
43
)
44

45
var (
46
        normalDecisionTypeTag = metrics.DecisionTypeTag("normal")
47
        stickyDecisionTypeTag = metrics.DecisionTypeTag("sticky")
48
)
49

50
type (
51
        timerActiveTaskExecutor struct {
52
                *timerTaskExecutorBase
53
        }
54
)
55

56
// NewTimerActiveTaskExecutor creates a new task executor for active timer task
57
func NewTimerActiveTaskExecutor(
58
        shard shard.Context,
59
        archiverClient archiver.Client,
60
        executionCache *execution.Cache,
61
        logger log.Logger,
62
        metricsClient metrics.Client,
63
        config *config.Config,
64
) Executor {
98✔
65
        return &timerActiveTaskExecutor{
98✔
66
                timerTaskExecutorBase: newTimerTaskExecutorBase(
98✔
67
                        shard,
98✔
68
                        archiverClient,
98✔
69
                        executionCache,
98✔
70
                        logger,
98✔
71
                        metricsClient,
98✔
72
                        config,
98✔
73
                ),
98✔
74
        }
98✔
75
}
98✔
76

77
func (t *timerActiveTaskExecutor) Execute(
78
        task Task,
79
        shouldProcessTask bool,
80
) error {
2,369✔
81
        timerTask, ok := task.GetInfo().(*persistence.TimerTaskInfo)
2,369✔
82
        if !ok {
2,369✔
83
                return errUnexpectedTask
×
84
        }
×
85

86
        if !shouldProcessTask {
2,372✔
87
                return nil
3✔
88
        }
3✔
89

90
        switch timerTask.TaskType {
2,366✔
91
        case persistence.TaskTypeUserTimer:
27✔
92
                ctx, cancel := context.WithTimeout(t.ctx, taskDefaultTimeout)
27✔
93
                defer cancel()
27✔
94
                return t.executeUserTimerTimeoutTask(ctx, timerTask)
27✔
95
        case persistence.TaskTypeActivityTimeout:
734✔
96
                ctx, cancel := context.WithTimeout(t.ctx, taskDefaultTimeout)
734✔
97
                defer cancel()
734✔
98
                return t.executeActivityTimeoutTask(ctx, timerTask)
734✔
99
        case persistence.TaskTypeDecisionTimeout:
1,117✔
100
                ctx, cancel := context.WithTimeout(t.ctx, taskDefaultTimeout)
1,117✔
101
                defer cancel()
1,117✔
102
                return t.executeDecisionTimeoutTask(ctx, timerTask)
1,117✔
103
        case persistence.TaskTypeWorkflowTimeout:
364✔
104
                ctx, cancel := context.WithTimeout(t.ctx, taskDefaultTimeout)
364✔
105
                defer cancel()
364✔
106
                return t.executeWorkflowTimeoutTask(ctx, timerTask)
364✔
107
        case persistence.TaskTypeActivityRetryTimer:
11✔
108
                ctx, cancel := context.WithTimeout(t.ctx, taskDefaultTimeout)
11✔
109
                defer cancel()
11✔
110
                return t.executeActivityRetryTimerTask(ctx, timerTask)
11✔
111
        case persistence.TaskTypeWorkflowBackoffTimer:
61✔
112
                ctx, cancel := context.WithTimeout(t.ctx, taskDefaultTimeout)
61✔
113
                defer cancel()
61✔
114
                return t.executeWorkflowBackoffTimerTask(ctx, timerTask)
61✔
115
        case persistence.TaskTypeDeleteHistoryEvent:
52✔
116
                // special timeout for delete history event
52✔
117
                deleteHistoryEventContext, deleteHistoryEventCancel := context.WithTimeout(t.ctx, time.Duration(t.config.DeleteHistoryEventContextTimeout())*time.Second)
52✔
118
                defer deleteHistoryEventCancel()
52✔
119
                return t.executeDeleteHistoryEventTask(deleteHistoryEventContext, timerTask)
52✔
120
        default:
×
121
                return errUnknownTimerTask
×
122
        }
123
}
124

125
func (t *timerActiveTaskExecutor) executeUserTimerTimeoutTask(
126
        ctx context.Context,
127
        task *persistence.TimerTaskInfo,
128
) (retError error) {
27✔
129
        t.logger.Debug("Processing user timer",
27✔
130
                tag.WorkflowDomainID(task.DomainID),
27✔
131
                tag.WorkflowID(task.WorkflowID),
27✔
132
                tag.WorkflowRunID(task.RunID),
27✔
133
                tag.TaskID(task.TaskID),
27✔
134
        )
27✔
135
        wfContext, release, err := t.executionCache.GetOrCreateWorkflowExecutionWithTimeout(
27✔
136
                task.DomainID,
27✔
137
                getWorkflowExecution(task),
27✔
138
                taskGetExecutionContextTimeout,
27✔
139
        )
27✔
140
        if err != nil {
27✔
141
                if err == context.DeadlineExceeded {
×
142
                        return errWorkflowBusy
×
143
                }
×
144
                return err
×
145
        }
146
        defer func() { release(retError) }()
54✔
147

148
        mutableState, err := loadMutableStateForTimerTask(ctx, wfContext, task, t.metricsClient, t.logger)
27✔
149
        if err != nil {
27✔
150
                return err
×
151
        }
×
152
        if mutableState == nil || !mutableState.IsWorkflowExecutionRunning() {
36✔
153
                return nil
9✔
154
        }
9✔
155

156
        timerSequence := execution.NewTimerSequence(mutableState)
18✔
157
        referenceTime := t.shard.GetTimeSource().Now()
18✔
158
        resurrectionCheckMinDelay := t.config.ResurrectionCheckMinDelay(mutableState.GetDomainEntry().GetInfo().Name)
18✔
159
        updateMutableState := false
18✔
160
        debugLog := t.logger.Debug
18✔
161
        if t.config.EnableDebugMode && t.config.EnableTimerDebugLogByDomainID(task.DomainID) {
18✔
162
                debugLog = t.logger.Info
×
163
        }
×
164

165
        // initialized when a timer with delay >= resurrectionCheckMinDelay
166
        // is encountered, so that we don't need to scan history multiple times
167
        // where there're multiple timers with high delay
168
        var resurrectedTimer map[string]struct{}
18✔
169
        scanWorkflowCtx, cancel := context.WithTimeout(t.ctx, scanWorkflowTimeout)
18✔
170
        defer cancel()
18✔
171

18✔
172
        sortedUserTimers := timerSequence.LoadAndSortUserTimers()
18✔
173
        debugLog("Sorted user timers",
18✔
174
                tag.WorkflowDomainID(task.DomainID),
18✔
175
                tag.WorkflowID(task.WorkflowID),
18✔
176
                tag.WorkflowRunID(task.RunID),
18✔
177
                tag.Counter(len(sortedUserTimers)),
18✔
178
        )
18✔
179

18✔
180
Loop:
18✔
181
        for _, timerSequenceID := range sortedUserTimers {
36✔
182
                timerInfo, ok := mutableState.GetUserTimerInfoByEventID(timerSequenceID.EventID)
18✔
183
                if !ok {
18✔
184
                        errString := fmt.Sprintf("failed to find in user timer event ID: %v", timerSequenceID.EventID)
×
185
                        t.logger.Error(errString)
×
186
                        return &types.InternalServiceError{Message: errString}
×
187
                }
×
188

189
                delay, expired := timerSequence.IsExpired(referenceTime, timerSequenceID)
18✔
190
                debugLog("Processing user timer sequence id",
18✔
191
                        tag.WorkflowDomainID(task.DomainID),
18✔
192
                        tag.WorkflowID(task.WorkflowID),
18✔
193
                        tag.WorkflowRunID(task.RunID),
18✔
194
                        tag.TaskType(task.TaskType),
18✔
195
                        tag.TaskID(task.TaskID),
18✔
196
                        tag.WorkflowTimerID(timerInfo.TimerID),
18✔
197
                        tag.WorkflowScheduleID(timerInfo.StartedID),
18✔
198
                        tag.Dynamic("timer-sequence-id", timerSequenceID),
18✔
199
                        tag.Dynamic("timer-info", timerInfo),
18✔
200
                        tag.Dynamic("delay", delay),
18✔
201
                        tag.Dynamic("expired", expired),
18✔
202
                )
18✔
203

18✔
204
                if !expired {
18✔
205
                        // timer sequence IDs are sorted, once there is one timer
×
206
                        // sequence ID not expired, all after that wil not expired
×
207
                        break Loop
×
208
                }
209

210
                if delay >= resurrectionCheckMinDelay || resurrectedTimer != nil {
20✔
211
                        if resurrectedTimer == nil {
3✔
212
                                // overwrite the context here as scan history may take a long time to complete
1✔
213
                                // ctx will also be used by other operations like updateWorkflow
1✔
214
                                ctx = scanWorkflowCtx
1✔
215
                                resurrectedTimer, err = execution.GetResurrectedTimers(ctx, t.shard, mutableState)
1✔
216
                                if err != nil {
1✔
217
                                        t.logger.Error("Timer resurrection check failed", tag.Error(err))
×
218
                                        return err
×
219
                                }
×
220
                        }
221

222
                        if _, ok := resurrectedTimer[timerInfo.TimerID]; ok {
3✔
223
                                // found timer resurrection
1✔
224
                                domainName := mutableState.GetDomainEntry().GetInfo().Name
1✔
225
                                t.metricsClient.Scope(metrics.TimerQueueProcessorScope, metrics.DomainTag(domainName)).IncCounter(metrics.TimerResurrectionCounter)
1✔
226
                                t.logger.Warn("Encounter resurrected timer, skip",
1✔
227
                                        tag.WorkflowDomainID(task.DomainID),
1✔
228
                                        tag.WorkflowID(task.WorkflowID),
1✔
229
                                        tag.WorkflowRunID(task.RunID),
1✔
230
                                        tag.TaskType(task.TaskType),
1✔
231
                                        tag.TaskID(task.TaskID),
1✔
232
                                        tag.WorkflowTimerID(timerInfo.TimerID),
1✔
233
                                        tag.WorkflowScheduleID(timerInfo.StartedID), // timerStartedEvent is basically scheduled event
1✔
234
                                )
1✔
235

1✔
236
                                // remove resurrected timer from mutable state
1✔
237
                                if err := mutableState.DeleteUserTimer(timerInfo.TimerID); err != nil {
1✔
238
                                        return err
×
239
                                }
×
240
                                updateMutableState = true
1✔
241
                                continue Loop
1✔
242
                        }
243
                }
244

245
                if _, err := mutableState.AddTimerFiredEvent(timerInfo.TimerID); err != nil {
17✔
246
                        return err
×
247
                }
×
248
                updateMutableState = true
17✔
249

17✔
250
                debugLog("User timer fired",
17✔
251
                        tag.WorkflowDomainID(task.DomainID),
17✔
252
                        tag.WorkflowID(task.WorkflowID),
17✔
253
                        tag.WorkflowRunID(task.RunID),
17✔
254
                        tag.TaskType(task.TaskType),
17✔
255
                        tag.TaskID(task.TaskID),
17✔
256
                        tag.WorkflowTimerID(timerInfo.TimerID),
17✔
257
                        tag.WorkflowScheduleID(timerInfo.StartedID),
17✔
258
                        tag.WorkflowNextEventID(mutableState.GetNextEventID()),
17✔
259
                )
17✔
260

261
        }
262

263
        if !updateMutableState {
19✔
264
                return nil
1✔
265
        }
1✔
266

267
        return t.updateWorkflowExecution(ctx, wfContext, mutableState, updateMutableState)
17✔
268
}
269

270
func (t *timerActiveTaskExecutor) executeActivityTimeoutTask(
271
        ctx context.Context,
272
        task *persistence.TimerTaskInfo,
273
) (retError error) {
734✔
274

734✔
275
        wfContext, release, err := t.executionCache.GetOrCreateWorkflowExecutionWithTimeout(
734✔
276
                task.DomainID,
734✔
277
                getWorkflowExecution(task),
734✔
278
                taskGetExecutionContextTimeout,
734✔
279
        )
734✔
280
        if err != nil {
734✔
UNCOV
281
                if err == context.DeadlineExceeded {
×
UNCOV
282
                        return errWorkflowBusy
×
UNCOV
283
                }
×
284
                return err
×
285
        }
286
        defer func() { release(retError) }()
1,468✔
287

288
        domainName, err := t.shard.GetDomainCache().GetDomainName(task.DomainID)
734✔
289
        if err != nil {
734✔
290
                return fmt.Errorf("unable to find domainID: %v, err: %v", task.DomainID, err)
×
291
        }
×
292

293
        mutableState, err := loadMutableStateForTimerTask(ctx, wfContext, task, t.metricsClient, t.logger)
734✔
294
        if err != nil {
734✔
295
                return err
×
296
        }
×
297
        if mutableState == nil || !mutableState.IsWorkflowExecutionRunning() {
1,236✔
298
                return nil
502✔
299
        }
502✔
300

301
        wfType := mutableState.GetWorkflowType()
232✔
302
        if wfType == nil {
232✔
303
                return fmt.Errorf("unable to find workflow type, task %s", task)
×
304
        }
×
305

306
        timerSequence := execution.NewTimerSequence(mutableState)
232✔
307
        referenceTime := t.shard.GetTimeSource().Now()
232✔
308
        resurrectionCheckMinDelay := t.config.ResurrectionCheckMinDelay(mutableState.GetDomainEntry().GetInfo().Name)
232✔
309
        updateMutableState := false
232✔
310
        scheduleDecision := false
232✔
311

232✔
312
        // initialized when an activity timer with delay >= resurrectionCheckMinDelay
232✔
313
        // is encountered, so that we don't need to scan history multiple times
232✔
314
        // where there're multiple timers with high delay
232✔
315
        var resurrectedActivity map[int64]struct{}
232✔
316
        scanWorkflowCtx, cancel := context.WithTimeout(t.ctx, scanWorkflowTimeout)
232✔
317
        defer cancel()
232✔
318

232✔
319
        // need to clear activity heartbeat timer task mask for new activity timer task creation
232✔
320
        // NOTE: LastHeartbeatTimeoutVisibilityInSeconds is for deduping heartbeat timer creation as it's possible
232✔
321
        // one heartbeat task was persisted multiple times with different taskIDs due to the retry logic
232✔
322
        // for updating workflow execution. In that case, only one new heartbeat timeout task should be
232✔
323
        // created.
232✔
324
        isHeartBeatTask := task.TimeoutType == int(types.TimeoutTypeHeartbeat)
232✔
325
        activityInfo, ok := mutableState.GetActivityInfo(task.EventID)
232✔
326
        if isHeartBeatTask && ok && activityInfo.LastHeartbeatTimeoutVisibilityInSeconds <= task.VisibilityTimestamp.Unix() {
314✔
327
                activityInfo.TimerTaskStatus = activityInfo.TimerTaskStatus &^ execution.TimerTaskStatusCreatedHeartbeat
82✔
328
                if err := mutableState.UpdateActivity(activityInfo); err != nil {
82✔
329
                        return err
×
330
                }
×
331
                updateMutableState = true
82✔
332
        }
333

334
Loop:
232✔
335
        for _, timerSequenceID := range timerSequence.LoadAndSortActivityTimers() {
526✔
336
                activityInfo, ok := mutableState.GetActivityInfo(timerSequenceID.EventID)
294✔
337
                if !ok || timerSequenceID.Attempt < activityInfo.Attempt {
380✔
338
                        // handle 2 cases:
86✔
339
                        // 1. !ok
86✔
340
                        //  this case can happen since each activity can have 4 timers
86✔
341
                        //  and one of those 4 timers may have fired in this loop
86✔
342
                        // 2. timerSequenceID.attempt < activityInfo.Attempt
86✔
343
                        //  retry could update activity attempt, should not timeouts new attempt
86✔
344
                        // 3. it's a resurrected activity and has already been deleted in this loop
86✔
345
                        continue Loop
86✔
346
                }
347

348
                delay, expired := timerSequence.IsExpired(referenceTime, timerSequenceID)
208✔
349
                if !expired {
309✔
350
                        // timer sequence IDs are sorted, once there is one timer
101✔
351
                        // sequence ID not expired, all after that wil not expired
101✔
352
                        break Loop
101✔
353
                }
354

355
                if delay >= resurrectionCheckMinDelay || resurrectedActivity != nil {
109✔
356
                        if resurrectedActivity == nil {
3✔
357
                                // overwrite the context here as scan history may take a long time to complete
1✔
358
                                // ctx will also be used by other operations like updateWorkflow
1✔
359
                                ctx = scanWorkflowCtx
1✔
360
                                resurrectedActivity, err = execution.GetResurrectedActivities(ctx, t.shard, mutableState)
1✔
361
                                if err != nil {
1✔
362
                                        t.logger.Error("Activity resurrection check failed", tag.Error(err))
×
363
                                        return err
×
364
                                }
×
365
                        }
366

367
                        if _, ok := resurrectedActivity[activityInfo.ScheduleID]; ok {
3✔
368
                                // found activity resurrection
1✔
369
                                domainName := mutableState.GetDomainEntry().GetInfo().Name
1✔
370
                                t.metricsClient.Scope(metrics.TimerQueueProcessorScope, metrics.DomainTag(domainName)).IncCounter(metrics.ActivityResurrectionCounter)
1✔
371
                                t.logger.Warn("Encounter resurrected activity, skip",
1✔
372
                                        tag.WorkflowDomainID(task.DomainID),
1✔
373
                                        tag.WorkflowID(task.WorkflowID),
1✔
374
                                        tag.WorkflowRunID(task.RunID),
1✔
375
                                        tag.TaskType(task.TaskType),
1✔
376
                                        tag.TaskID(task.TaskID),
1✔
377
                                        tag.WorkflowActivityID(activityInfo.ActivityID),
1✔
378
                                        tag.WorkflowScheduleID(activityInfo.ScheduleID),
1✔
379
                                )
1✔
380

1✔
381
                                // remove resurrected activity from mutable state
1✔
382
                                if err := mutableState.DeleteActivity(activityInfo.ScheduleID); err != nil {
1✔
383
                                        return err
×
384
                                }
×
385
                                updateMutableState = true
1✔
386
                                continue Loop
1✔
387
                        }
388
                }
389

390
                // check if it's possible that the timeout is due to activity task lost
391
                if timerSequenceID.TimerType == execution.TimerTypeScheduleToStart {
166✔
392
                        domainName, err := t.shard.GetDomainCache().GetDomainName(mutableState.GetExecutionInfo().DomainID)
60✔
393
                        if err == nil && activityInfo.ScheduleToStartTimeout >= int32(t.config.ActivityMaxScheduleToStartTimeoutForRetry(domainName).Seconds()) {
60✔
394
                                // note that we ignore the race condition for the dynamic config value change here as it's only for metric and logging purpose.
×
395
                                // theoratically the check only applies to activities with retry policy
×
396
                                // however for activities without retry policy, we also want to check the potential task lost and emit the metric
×
397
                                // so reuse the same config value as a threshold so that the metric only got emitted if the activity has been started after a long time.
×
398
                                t.metricsClient.Scope(metrics.TimerActiveTaskActivityTimeoutScope, metrics.DomainTag(domainName)).IncCounter(metrics.ActivityLostCounter)
×
399
                                t.logger.Warn("Potentially activity task lost",
×
400
                                        tag.WorkflowDomainName(domainName),
×
401
                                        tag.WorkflowID(task.WorkflowID),
×
402
                                        tag.WorkflowRunID(task.RunID),
×
403
                                        tag.WorkflowScheduleID(activityInfo.ScheduleID),
×
404
                                )
×
405
                        }
×
406
                }
407

408
                if ok, err := mutableState.RetryActivity(
106✔
409
                        activityInfo,
106✔
410
                        execution.TimerTypeToReason(timerSequenceID.TimerType),
106✔
411
                        nil,
106✔
412
                ); err != nil {
106✔
413
                        return err
×
414
                } else if ok {
111✔
415
                        updateMutableState = true
5✔
416
                        continue Loop
5✔
417
                }
418

419
                t.emitTimeoutMetricScopeWithDomainTag(
101✔
420
                        mutableState.GetExecutionInfo().DomainID,
101✔
421
                        metrics.TimerActiveTaskActivityTimeoutScope,
101✔
422
                        timerSequenceID.TimerType,
101✔
423
                        metrics.WorkflowTypeTag(wfType.GetName()),
101✔
424
                )
101✔
425

101✔
426
                t.logger.Info("Activity timed out",
101✔
427
                        tag.WorkflowDomainName(domainName),
101✔
428
                        tag.WorkflowDomainID(task.GetDomainID()),
101✔
429
                        tag.WorkflowID(task.GetWorkflowID()),
101✔
430
                        tag.WorkflowRunID(task.GetRunID()),
101✔
431
                        tag.ScheduleAttempt(task.ScheduleAttempt),
101✔
432
                        tag.FailoverVersion(task.GetVersion()),
101✔
433
                )
101✔
434

101✔
435
                if _, err := mutableState.AddActivityTaskTimedOutEvent(
101✔
436
                        activityInfo.ScheduleID,
101✔
437
                        activityInfo.StartedID,
101✔
438
                        execution.TimerTypeToInternal(timerSequenceID.TimerType),
101✔
439
                        activityInfo.Details,
101✔
440
                ); err != nil {
101✔
441
                        return err
×
442
                }
×
443
                updateMutableState = true
101✔
444
                scheduleDecision = true
101✔
445
        }
446

447
        if !updateMutableState {
316✔
448
                return nil
84✔
449
        }
84✔
450
        return t.updateWorkflowExecution(ctx, wfContext, mutableState, scheduleDecision)
148✔
451
}
452

453
func (t *timerActiveTaskExecutor) executeDecisionTimeoutTask(
454
        ctx context.Context,
455
        task *persistence.TimerTaskInfo,
456
) (retError error) {
1,117✔
457

1,117✔
458
        wfContext, release, err := t.executionCache.GetOrCreateWorkflowExecutionWithTimeout(
1,117✔
459
                task.DomainID,
1,117✔
460
                getWorkflowExecution(task),
1,117✔
461
                taskGetExecutionContextTimeout,
1,117✔
462
        )
1,117✔
463
        if err != nil {
1,117✔
464
                if err == context.DeadlineExceeded {
×
465
                        return errWorkflowBusy
×
466
                }
×
467
                return err
×
468
        }
469
        defer func() { release(retError) }()
2,234✔
470

471
        domainName, err := t.shard.GetDomainCache().GetDomainName(task.DomainID)
1,117✔
472
        if err != nil {
1,117✔
473
                return fmt.Errorf("unable to find domainID: %v, err: %v", task.DomainID, err)
×
474
        }
×
475

476
        mutableState, err := loadMutableStateForTimerTask(ctx, wfContext, task, t.metricsClient, t.logger)
1,117✔
477
        if err != nil {
1,117✔
478
                return err
×
479
        }
×
480
        if mutableState == nil || !mutableState.IsWorkflowExecutionRunning() {
1,890✔
481
                return nil
773✔
482
        }
773✔
483

484
        wfType := mutableState.GetWorkflowType()
344✔
485
        if wfType == nil {
344✔
486
                return fmt.Errorf("unable to find workflow type, task %s", task)
×
487
        }
×
488

489
        scheduleID := task.EventID
344✔
490
        decision, ok := mutableState.GetDecisionInfo(scheduleID)
344✔
491
        if !ok {
633✔
492
                t.logger.Debug("Potentially duplicate", tag.TaskID(task.TaskID), tag.WorkflowScheduleID(scheduleID), tag.TaskType(persistence.TaskTypeDecisionTimeout))
289✔
493
                return nil
289✔
494
        }
289✔
495
        ok, err = verifyTaskVersion(t.shard, t.logger, task.DomainID, decision.Version, task.Version, task)
55✔
496
        if err != nil || !ok {
55✔
497
                return err
×
498
        }
×
499

500
        if decision.Attempt != task.ScheduleAttempt {
55✔
501
                return nil
×
502
        }
×
503

504
        scheduleDecision := false
55✔
505
        isStickyDecision := mutableState.GetExecutionInfo().StickyTaskList != ""
55✔
506
        decisionTypeTag := normalDecisionTypeTag
55✔
507
        if isStickyDecision {
86✔
508
                decisionTypeTag = stickyDecisionTypeTag
31✔
509
        }
31✔
510
        tags := []metrics.Tag{metrics.WorkflowTypeTag(wfType.GetName()), decisionTypeTag}
55✔
511
        switch execution.TimerTypeFromInternal(types.TimeoutType(task.TimeoutType)) {
55✔
512
        case execution.TimerTypeStartToClose:
37✔
513
                t.emitTimeoutMetricScopeWithDomainTag(
37✔
514
                        mutableState.GetExecutionInfo().DomainID,
37✔
515
                        metrics.TimerActiveTaskDecisionTimeoutScope,
37✔
516
                        execution.TimerTypeStartToClose,
37✔
517
                        tags...,
37✔
518
                )
37✔
519
                if _, err := mutableState.AddDecisionTaskTimedOutEvent(
37✔
520
                        decision.ScheduleID,
37✔
521
                        decision.StartedID,
37✔
522
                ); err != nil {
37✔
523
                        return err
×
524
                }
×
525
                scheduleDecision = true
37✔
526

527
        case execution.TimerTypeScheduleToStart:
18✔
528
                if decision.StartedID != common.EmptyEventID {
18✔
529
                        // decision has already started
×
530
                        return nil
×
531
                }
×
532

533
                if !isStickyDecision {
23✔
534
                        t.logger.Warn("Potential lost normal decision task",
5✔
535
                                tag.WorkflowDomainName(domainName),
5✔
536
                                tag.WorkflowDomainID(task.GetDomainID()),
5✔
537
                                tag.WorkflowID(task.GetWorkflowID()),
5✔
538
                                tag.WorkflowRunID(task.GetRunID()),
5✔
539
                                tag.WorkflowScheduleID(scheduleID),
5✔
540
                                tag.ScheduleAttempt(task.ScheduleAttempt),
5✔
541
                                tag.FailoverVersion(task.GetVersion()),
5✔
542
                        )
5✔
543
                }
5✔
544

545
                t.emitTimeoutMetricScopeWithDomainTag(
18✔
546
                        mutableState.GetExecutionInfo().DomainID,
18✔
547
                        metrics.TimerActiveTaskDecisionTimeoutScope,
18✔
548
                        execution.TimerTypeScheduleToStart,
18✔
549
                        tags...,
18✔
550
                )
18✔
551
                _, err := mutableState.AddDecisionTaskScheduleToStartTimeoutEvent(scheduleID)
18✔
552
                if err != nil {
18✔
553
                        return err
×
554
                }
×
555
                scheduleDecision = true
18✔
556
        }
557

558
        return t.updateWorkflowExecution(ctx, wfContext, mutableState, scheduleDecision)
55✔
559
}
560

561
func (t *timerActiveTaskExecutor) executeWorkflowBackoffTimerTask(
562
        ctx context.Context,
563
        task *persistence.TimerTaskInfo,
564
) (retError error) {
61✔
565

61✔
566
        wfContext, release, err := t.executionCache.GetOrCreateWorkflowExecutionWithTimeout(
61✔
567
                task.DomainID,
61✔
568
                getWorkflowExecution(task),
61✔
569
                taskGetExecutionContextTimeout,
61✔
570
        )
61✔
571
        if err != nil {
61✔
572
                if err == context.DeadlineExceeded {
×
573
                        return errWorkflowBusy
×
574
                }
×
575
                return err
×
576
        }
577
        defer func() { release(retError) }()
122✔
578

579
        mutableState, err := loadMutableStateForTimerTask(ctx, wfContext, task, t.metricsClient, t.logger)
61✔
580
        if err != nil {
61✔
581
                return err
×
582
        }
×
583
        if mutableState == nil || !mutableState.IsWorkflowExecutionRunning() {
67✔
584
                return nil
6✔
585
        }
6✔
586

587
        if task.TimeoutType == persistence.WorkflowBackoffTimeoutTypeRetry {
78✔
588
                t.metricsClient.IncCounter(metrics.TimerActiveTaskWorkflowBackoffTimerScope, metrics.WorkflowRetryBackoffTimerCount)
23✔
589
        } else {
55✔
590
                t.metricsClient.IncCounter(metrics.TimerActiveTaskWorkflowBackoffTimerScope, metrics.WorkflowCronBackoffTimerCount)
32✔
591
        }
32✔
592

593
        if mutableState.HasProcessedOrPendingDecision() {
56✔
594
                // already has decision task
1✔
595
                return nil
1✔
596
        }
1✔
597

598
        // schedule first decision task
599
        return t.updateWorkflowExecution(ctx, wfContext, mutableState, true)
54✔
600
}
601

602
func (t *timerActiveTaskExecutor) executeActivityRetryTimerTask(
603
        ctx context.Context,
604
        task *persistence.TimerTaskInfo,
605
) (retError error) {
11✔
606

11✔
607
        wfContext, release, err := t.executionCache.GetOrCreateWorkflowExecutionWithTimeout(
11✔
608
                task.DomainID,
11✔
609
                getWorkflowExecution(task),
11✔
610
                taskGetExecutionContextTimeout,
11✔
611
        )
11✔
612
        if err != nil {
11✔
613
                if err == context.DeadlineExceeded {
×
614
                        return errWorkflowBusy
×
615
                }
×
616
                return err
×
617
        }
618
        defer func() { release(retError) }()
22✔
619

620
        mutableState, err := loadMutableStateForTimerTask(ctx, wfContext, task, t.metricsClient, t.logger)
11✔
621
        if err != nil {
11✔
622
                return err
×
623
        }
×
624
        if mutableState == nil || !mutableState.IsWorkflowExecutionRunning() {
11✔
625
                return nil
×
626
        }
×
627

628
        // generate activity task
629
        scheduledID := task.EventID
11✔
630
        activityInfo, ok := mutableState.GetActivityInfo(scheduledID)
11✔
631
        if !ok || task.ScheduleAttempt < int64(activityInfo.Attempt) || activityInfo.StartedID != common.EmptyEventID {
12✔
632
                if ok {
2✔
633
                        t.logger.Info("Duplicate activity retry timer task",
1✔
634
                                tag.WorkflowID(mutableState.GetExecutionInfo().WorkflowID),
1✔
635
                                tag.WorkflowRunID(mutableState.GetExecutionInfo().RunID),
1✔
636
                                tag.WorkflowDomainID(mutableState.GetExecutionInfo().DomainID),
1✔
637
                                tag.WorkflowScheduleID(activityInfo.ScheduleID),
1✔
638
                                tag.Attempt(activityInfo.Attempt),
1✔
639
                                tag.FailoverVersion(activityInfo.Version),
1✔
640
                                tag.TimerTaskStatus(activityInfo.TimerTaskStatus),
1✔
641
                                tag.ScheduleAttempt(task.ScheduleAttempt))
1✔
642
                }
1✔
643
                return nil
1✔
644
        }
645
        ok, err = verifyTaskVersion(t.shard, t.logger, task.DomainID, activityInfo.Version, task.Version, task)
10✔
646
        if err != nil || !ok {
10✔
647
                return err
×
648
        }
×
649

650
        domainID := task.DomainID
10✔
651
        targetDomainID := domainID
10✔
652
        if activityInfo.DomainID != "" {
20✔
653
                targetDomainID = activityInfo.DomainID
10✔
654
        } else {
10✔
655
                // TODO remove this block after Mar, 1th, 2020
×
656
                //  previously, DomainID in activity info is not used, so need to get
×
657
                //  schedule event from DB checking whether activity to be scheduled
×
658
                //  belongs to this domain
×
659
                scheduledEvent, err := mutableState.GetActivityScheduledEvent(ctx, scheduledID)
×
660
                if err != nil {
×
661
                        return err
×
662
                }
×
663
                if scheduledEvent.ActivityTaskScheduledEventAttributes.GetDomain() != "" {
×
664
                        domainEntry, err := t.shard.GetDomainCache().GetDomain(scheduledEvent.ActivityTaskScheduledEventAttributes.GetDomain())
×
665
                        if err != nil {
×
666
                                return &types.InternalServiceError{Message: "unable to re-schedule activity across domain."}
×
667
                        }
×
668
                        targetDomainID = domainEntry.GetInfo().ID
×
669
                }
670
        }
671

672
        execution := types.WorkflowExecution{
10✔
673
                WorkflowID: task.WorkflowID,
10✔
674
                RunID:      task.RunID}
10✔
675
        taskList := &types.TaskList{
10✔
676
                Name: activityInfo.TaskList,
10✔
677
        }
10✔
678
        scheduleToStartTimeout := activityInfo.ScheduleToStartTimeout
10✔
679

10✔
680
        release(nil) // release earlier as we don't need the lock anymore
10✔
681

10✔
682
        return t.shard.GetService().GetMatchingClient().AddActivityTask(ctx, &types.AddActivityTaskRequest{
10✔
683
                DomainUUID:                    targetDomainID,
10✔
684
                SourceDomainUUID:              domainID,
10✔
685
                Execution:                     &execution,
10✔
686
                TaskList:                      taskList,
10✔
687
                ScheduleID:                    scheduledID,
10✔
688
                ScheduleToStartTimeoutSeconds: common.Int32Ptr(scheduleToStartTimeout),
10✔
689
                PartitionConfig:               mutableState.GetExecutionInfo().PartitionConfig,
10✔
690
        })
10✔
691
}
692

693
func (t *timerActiveTaskExecutor) executeWorkflowTimeoutTask(
694
        ctx context.Context,
695
        task *persistence.TimerTaskInfo,
696
) (retError error) {
364✔
697

364✔
698
        wfContext, release, err := t.executionCache.GetOrCreateWorkflowExecutionWithTimeout(
364✔
699
                task.DomainID,
364✔
700
                getWorkflowExecution(task),
364✔
701
                taskGetExecutionContextTimeout,
364✔
702
        )
364✔
703
        if err != nil {
364✔
704
                if err == context.DeadlineExceeded {
×
705
                        return errWorkflowBusy
×
706
                }
×
707
                return err
×
708
        }
709
        defer func() { release(retError) }()
728✔
710

711
        mutableState, err := loadMutableStateForTimerTask(ctx, wfContext, task, t.metricsClient, t.logger)
364✔
712
        if err != nil {
364✔
713
                return err
×
714
        }
×
715
        if mutableState == nil || !mutableState.IsWorkflowExecutionRunning() {
645✔
716
                return nil
281✔
717
        }
281✔
718

719
        startVersion, err := mutableState.GetStartVersion()
83✔
720
        if err != nil {
83✔
721
                return err
×
722
        }
×
723
        ok, err := verifyTaskVersion(t.shard, t.logger, task.DomainID, startVersion, task.Version, task)
83✔
724
        if err != nil || !ok {
83✔
725
                return err
×
726
        }
×
727

728
        eventBatchFirstEventID := mutableState.GetNextEventID()
83✔
729

83✔
730
        timeoutReason := execution.TimerTypeToReason(execution.TimerTypeStartToClose)
83✔
731
        backoffInterval := mutableState.GetRetryBackoffDuration(timeoutReason)
83✔
732
        continueAsNewInitiator := types.ContinueAsNewInitiatorRetryPolicy
83✔
733
        if backoffInterval == backoff.NoBackoff {
165✔
734
                // check if a cron backoff is needed
82✔
735
                backoffInterval, err = mutableState.GetCronBackoffDuration(ctx)
82✔
736
                if err != nil {
82✔
737
                        return err
×
738
                }
×
739
                continueAsNewInitiator = types.ContinueAsNewInitiatorCronSchedule
82✔
740
        }
741
        // ignore event id
742
        isCanceled, _ := mutableState.IsCancelRequested()
83✔
743
        if isCanceled || backoffInterval == backoff.NoBackoff {
161✔
744
                if err := timeoutWorkflow(mutableState, eventBatchFirstEventID); err != nil {
78✔
745
                        return err
×
746
                }
×
747

748
                // We apply the update to execution using optimistic concurrency.  If it fails due to a conflict than reload
749
                // the history and try the operation again.
750
                return t.updateWorkflowExecution(ctx, wfContext, mutableState, false)
78✔
751
        }
752

753
        // workflow timeout, but a retry or cron is needed, so we do continue as new to retry or cron
754
        startEvent, err := mutableState.GetStartEvent(ctx)
5✔
755
        if err != nil {
5✔
756
                return err
×
757
        }
×
758

759
        startAttributes := startEvent.WorkflowExecutionStartedEventAttributes
5✔
760
        continueAsNewAttributes := &types.ContinueAsNewWorkflowExecutionDecisionAttributes{
5✔
761
                WorkflowType:                        startAttributes.WorkflowType,
5✔
762
                TaskList:                            startAttributes.TaskList,
5✔
763
                Input:                               startAttributes.Input,
5✔
764
                ExecutionStartToCloseTimeoutSeconds: startAttributes.ExecutionStartToCloseTimeoutSeconds,
5✔
765
                TaskStartToCloseTimeoutSeconds:      startAttributes.TaskStartToCloseTimeoutSeconds,
5✔
766
                BackoffStartIntervalInSeconds:       common.Int32Ptr(int32(backoffInterval.Seconds())),
5✔
767
                RetryPolicy:                         startAttributes.RetryPolicy,
5✔
768
                Initiator:                           continueAsNewInitiator.Ptr(),
5✔
769
                FailureReason:                       common.StringPtr(timeoutReason),
5✔
770
                CronSchedule:                        mutableState.GetExecutionInfo().CronSchedule,
5✔
771
                Header:                              startAttributes.Header,
5✔
772
                Memo:                                startAttributes.Memo,
5✔
773
                SearchAttributes:                    startAttributes.SearchAttributes,
5✔
774
                JitterStartSeconds:                  startAttributes.JitterStartSeconds,
5✔
775
        }
5✔
776
        newMutableState, err := retryWorkflow(
5✔
777
                ctx,
5✔
778
                mutableState,
5✔
779
                eventBatchFirstEventID,
5✔
780
                startAttributes.GetParentWorkflowDomain(),
5✔
781
                continueAsNewAttributes,
5✔
782
        )
5✔
783
        if err != nil {
5✔
784
                return err
×
785
        }
×
786

787
        newExecutionInfo := newMutableState.GetExecutionInfo()
5✔
788
        return wfContext.UpdateWorkflowExecutionWithNewAsActive(
5✔
789
                ctx,
5✔
790
                t.shard.GetTimeSource().Now(),
5✔
791
                execution.NewContext(
5✔
792
                        newExecutionInfo.DomainID,
5✔
793
                        types.WorkflowExecution{
5✔
794
                                WorkflowID: newExecutionInfo.WorkflowID,
5✔
795
                                RunID:      newExecutionInfo.RunID,
5✔
796
                        },
5✔
797
                        t.shard,
5✔
798
                        t.shard.GetExecutionManager(),
5✔
799
                        t.logger,
5✔
800
                ),
5✔
801
                newMutableState,
5✔
802
        )
5✔
803
}
804

805
func (t *timerActiveTaskExecutor) updateWorkflowExecution(
806
        ctx context.Context,
807
        wfContext execution.Context,
808
        mutableState execution.MutableState,
809
        scheduleNewDecision bool,
810
) error {
352✔
811

352✔
812
        var err error
352✔
813
        if scheduleNewDecision {
563✔
814
                // Schedule a new decision.
211✔
815
                err = execution.ScheduleDecision(mutableState)
211✔
816
                if err != nil {
211✔
817
                        return err
×
818
                }
×
819
        }
820

821
        now := t.shard.GetTimeSource().Now()
352✔
822
        err = wfContext.UpdateWorkflowExecutionAsActive(ctx, now)
352✔
823
        if err != nil {
352✔
824
                // if is shard ownership error, the shard context will stop the entire history engine
×
825
                // we don't need to explicitly stop the queue processor here
×
826
                return err
×
827
        }
×
828

829
        return nil
352✔
830
}
831

832
func (t *timerActiveTaskExecutor) emitTimeoutMetricScopeWithDomainTag(
833
        domainID string,
834
        scope int,
835
        timerType execution.TimerType,
836
        tags ...metrics.Tag,
837
) {
156✔
838
        domainTag, err := getDomainTagByID(t.shard.GetDomainCache(), domainID)
156✔
839
        if err != nil {
156✔
840
                return
×
841
        }
×
842
        tags = append(tags, domainTag)
156✔
843

156✔
844
        metricsScope := t.metricsClient.Scope(scope, tags...)
156✔
845
        switch timerType {
156✔
846
        case execution.TimerTypeScheduleToStart:
77✔
847
                metricsScope.IncCounter(metrics.ScheduleToStartTimeoutCounter)
77✔
848
        case execution.TimerTypeScheduleToClose:
3✔
849
                metricsScope.IncCounter(metrics.ScheduleToCloseTimeoutCounter)
3✔
850
        case execution.TimerTypeStartToClose:
40✔
851
                metricsScope.IncCounter(metrics.StartToCloseTimeoutCounter)
40✔
852
        case execution.TimerTypeHeartbeat:
36✔
853
                metricsScope.IncCounter(metrics.HeartbeatTimeoutCounter)
36✔
854
        }
855
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc