• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

uber / cadence / 01907299-3fb0-4ecc-b0ad-df71b17f5ddc

02 Jul 2024 08:39AM UTC coverage: 71.517% (-0.005%) from 71.522%
01907299-3fb0-4ecc-b0ad-df71b17f5ddc

Pull #5926

buildkite

mantas-sidlauskas
Add peer provider plugin registration
Pull Request #5926: Add peer provider plugin registration

20 of 22 new or added lines in 1 file covered. (90.91%)

36 existing lines in 12 files now uncovered.

105326 of 147274 relevant lines covered (71.52%)

2598.44 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

85.63
/service/history/task/timer_standby_task_executor.go
1
// Copyright (c) 2020 Uber Technologies, Inc.
2
//
3
// Permission is hereby granted, free of charge, to any person obtaining a copy
4
// of this software and associated documentation files (the "Software"), to deal
5
// in the Software without restriction, including without limitation the rights
6
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
7
// copies of the Software, and to permit persons to whom the Software is
8
// furnished to do so, subject to the following conditions:
9
//
10
// The above copyright notice and this permission notice shall be included in
11
// all copies or substantial portions of the Software.
12
//
13
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
14
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
18
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
19
// THE SOFTWARE.
20

21
package task
22

23
import (
24
        "context"
25
        "errors"
26
        "fmt"
27
        "time"
28

29
        "github.com/uber/cadence/common/log"
30
        "github.com/uber/cadence/common/log/tag"
31
        "github.com/uber/cadence/common/metrics"
32
        "github.com/uber/cadence/common/ndc"
33
        "github.com/uber/cadence/common/persistence"
34
        "github.com/uber/cadence/common/types"
35
        "github.com/uber/cadence/service/history/config"
36
        "github.com/uber/cadence/service/history/execution"
37
        "github.com/uber/cadence/service/history/shard"
38
        "github.com/uber/cadence/service/worker/archiver"
39
)
40

41
// errUnexpectedTask is returned when a task's info is not a
// *persistence.TimerTaskInfo.
var errUnexpectedTask = errors.New("unexpected task")

// errUnknownTimerTask is returned by Execute when the timer task type does not
// match any of the task types handled by the standby executor.
var errUnknownTimerTask = errors.New("unknown timer task")
45

46
type (
47
        timerStandbyTaskExecutor struct {
48
                *timerTaskExecutorBase
49

50
                clusterName     string
51
                historyResender ndc.HistoryResender
52
        }
53
)
54

55
// NewTimerStandbyTaskExecutor creates a new task executor for standby timer task
56
func NewTimerStandbyTaskExecutor(
57
        shard shard.Context,
58
        archiverClient archiver.Client,
59
        executionCache execution.Cache,
60
        historyResender ndc.HistoryResender,
61
        logger log.Logger,
62
        metricsClient metrics.Client,
63
        clusterName string,
64
        config *config.Config,
65
) Executor {
91✔
66
        return &timerStandbyTaskExecutor{
91✔
67
                timerTaskExecutorBase: newTimerTaskExecutorBase(
91✔
68
                        shard,
91✔
69
                        archiverClient,
91✔
70
                        executionCache,
91✔
71
                        logger,
91✔
72
                        metricsClient,
91✔
73
                        config,
91✔
74
                ),
91✔
75
                clusterName:     clusterName,
91✔
76
                historyResender: historyResender,
91✔
77
        }
91✔
78
}
91✔
79

80
func (t *timerStandbyTaskExecutor) Execute(
81
        task Task,
82
        shouldProcessTask bool,
83
) error {
29✔
84

29✔
85
        timerTask, ok := task.GetInfo().(*persistence.TimerTaskInfo)
29✔
86
        if !ok {
29✔
87
                return errUnexpectedTask
×
88
        }
×
89

90
        if !shouldProcessTask {
32✔
91
                return nil
3✔
92
        }
3✔
93

94
        switch timerTask.TaskType {
29✔
95
        case persistence.TaskTypeUserTimer:
5✔
96
                ctx, cancel := context.WithTimeout(t.ctx, taskDefaultTimeout)
5✔
97
                defer cancel()
5✔
98
                return t.executeUserTimerTimeoutTask(ctx, timerTask)
5✔
99
        case persistence.TaskTypeActivityTimeout:
9✔
100
                ctx, cancel := context.WithTimeout(t.ctx, taskDefaultTimeout)
9✔
101
                defer cancel()
9✔
102
                return t.executeActivityTimeoutTask(ctx, timerTask)
9✔
103
        case persistence.TaskTypeDecisionTimeout:
8✔
104
                ctx, cancel := context.WithTimeout(t.ctx, taskDefaultTimeout)
8✔
105
                defer cancel()
8✔
106
                return t.executeDecisionTimeoutTask(ctx, timerTask)
8✔
107
        case persistence.TaskTypeWorkflowTimeout:
7✔
108
                ctx, cancel := context.WithTimeout(t.ctx, taskDefaultTimeout)
7✔
109
                defer cancel()
7✔
110
                return t.executeWorkflowTimeoutTask(ctx, timerTask)
7✔
111
        case persistence.TaskTypeActivityRetryTimer:
1✔
112
                // retry backoff timer should not get created on passive cluster
1✔
113
                // TODO: add error logs
1✔
114
                return nil
1✔
115
        case persistence.TaskTypeWorkflowBackoffTimer:
7✔
116
                ctx, cancel := context.WithTimeout(t.ctx, taskDefaultTimeout)
7✔
117
                defer cancel()
7✔
118
                return t.executeWorkflowBackoffTimerTask(ctx, timerTask)
7✔
119
        case persistence.TaskTypeDeleteHistoryEvent:
4✔
120
                // special timeout for delete history event
4✔
121
                deleteHistoryEventContext, deleteHistoryEventCancel := context.WithTimeout(t.ctx, time.Duration(t.config.DeleteHistoryEventContextTimeout())*time.Second)
4✔
122
                defer deleteHistoryEventCancel()
4✔
123
                return t.executeDeleteHistoryEventTask(deleteHistoryEventContext, timerTask)
4✔
124
        default:
×
125
                return errUnknownTimerTask
×
126
        }
127
}
128

129
func (t *timerStandbyTaskExecutor) executeUserTimerTimeoutTask(
130
        ctx context.Context,
131
        timerTask *persistence.TimerTaskInfo,
132
) error {
5✔
133

5✔
134
        actionFn := func(ctx context.Context, wfContext execution.Context, mutableState execution.MutableState) (interface{}, error) {
10✔
135

5✔
136
                timerSequence := execution.NewTimerSequence(mutableState)
5✔
137

5✔
138
        Loop:
5✔
139
                for _, timerSequenceID := range timerSequence.LoadAndSortUserTimers() {
9✔
140
                        _, ok := mutableState.GetUserTimerInfoByEventID(timerSequenceID.EventID)
4✔
141
                        if !ok {
4✔
142
                                errString := fmt.Sprintf("failed to find in user timer event ID: %v", timerSequenceID.EventID)
×
143
                                t.logger.Error(errString)
×
144
                                return nil, &types.InternalServiceError{Message: errString}
×
145
                        }
×
146

147
                        if _, isExpired := timerSequence.IsExpired(
4✔
148
                                timerTask.VisibilityTimestamp,
4✔
149
                                timerSequenceID,
4✔
150
                        ); isExpired {
7✔
151
                                return getHistoryResendInfo(mutableState)
3✔
152
                        }
3✔
153
                        // since the user timer are already sorted, so if there is one timer which will not expired
154
                        // all user timer after this timer will not expired
155
                        break Loop //nolint:staticcheck
1✔
156
                }
157
                // if there is no user timer expired, then we are good
158
                return nil, nil
2✔
159
        }
160

161
        return t.processTimer(
5✔
162
                ctx,
5✔
163
                timerTask,
5✔
164
                actionFn,
5✔
165
                getStandbyPostActionFn(
5✔
166
                        timerTask,
5✔
167
                        t.getCurrentTime,
5✔
168
                        t.config.StandbyTaskMissingEventsResendDelay(),
5✔
169
                        t.config.StandbyTaskMissingEventsDiscardDelay(),
5✔
170
                        t.fetchHistoryFromRemote,
5✔
171
                        standbyTimerTaskPostActionTaskDiscarded,
5✔
172
                ),
5✔
173
        )
5✔
174
}
175

176
func (t *timerStandbyTaskExecutor) executeActivityTimeoutTask(
177
        ctx context.Context,
178
        timerTask *persistence.TimerTaskInfo,
179
) error {
9✔
180

9✔
181
        // activity heartbeat timer task is a special snowflake.
9✔
182
        // normal activity timer task on the passive side will be generated by events related to activity in history replicator,
9✔
183
        // and the standby timer processor will only need to verify whether the timer task can be safely throw away.
9✔
184
        //
9✔
185
        // activity heartbeat timer task cannot be handled in the way mentioned above.
9✔
186
        // the reason is, there is no event driving the creation of new activity heartbeat timer.
9✔
187
        // although there will be an task syncing activity from remote, the task is not an event,
9✔
188
        // and cannot attempt to recreate a new activity timer task.
9✔
189
        //
9✔
190
        // the overall solution is to attempt to generate a new activity timer task whenever the
9✔
191
        // task passed in is safe to be throw away.
9✔
192

9✔
193
        actionFn := func(ctx context.Context, wfContext execution.Context, mutableState execution.MutableState) (interface{}, error) {
18✔
194

9✔
195
                timerSequence := execution.NewTimerSequence(mutableState)
9✔
196
                updateMutableState := false
9✔
197

9✔
198
        Loop:
9✔
199
                for _, timerSequenceID := range timerSequence.LoadAndSortActivityTimers() {
17✔
200
                        _, ok := mutableState.GetActivityInfo(timerSequenceID.EventID)
8✔
201
                        if !ok {
8✔
202
                                errString := fmt.Sprintf("failed to find in memory activity timer: %v", timerSequenceID.EventID)
×
203
                                t.logger.Error(errString)
×
204
                                return nil, &types.InternalServiceError{Message: errString}
×
205
                        }
×
206

207
                        if _, isExpired := timerSequence.IsExpired(
8✔
208
                                timerTask.VisibilityTimestamp,
8✔
209
                                timerSequenceID,
8✔
210
                        ); isExpired {
14✔
211
                                return getHistoryResendInfo(mutableState)
6✔
212
                        }
6✔
213
                        // since the activity timer are already sorted, so if there is one timer which will not expired
214
                        // all activity timer after this timer will not expired
215
                        break Loop //nolint:staticcheck
2✔
216
                }
217

218
                // for reason to update mutable state & generate a new activity task,
219
                // see comments at the beginning of this function.
220
                // NOTE: this is the only place in the standby logic where mutable state can be updated
221

222
                // need to clear the activity heartbeat timer task marks
223
                lastWriteVersion, err := mutableState.GetLastWriteVersion()
6✔
224
                if err != nil {
6✔
225
                        return nil, err
×
226
                }
×
227

228
                // NOTE: LastHeartbeatTimeoutVisibilityInSeconds is for deduping heartbeat timer creation as it's possible
229
                // one heartbeat task was persisted multiple times with different taskIDs due to the retry logic
230
                // for updating workflow execution. In that case, only one new heartbeat timeout task should be
231
                // created.
232
                isHeartBeatTask := timerTask.TimeoutType == int(types.TimeoutTypeHeartbeat)
6✔
233
                activityInfo, ok := mutableState.GetActivityInfo(timerTask.EventID)
6✔
234
                if isHeartBeatTask && ok && activityInfo.LastHeartbeatTimeoutVisibilityInSeconds <= timerTask.VisibilityTimestamp.Unix() {
7✔
235
                        activityInfo.TimerTaskStatus = activityInfo.TimerTaskStatus &^ execution.TimerTaskStatusCreatedHeartbeat
1✔
236
                        if err := mutableState.UpdateActivity(activityInfo); err != nil {
1✔
237
                                return nil, err
×
238
                        }
×
239
                        updateMutableState = true
1✔
240
                }
241

242
                // passive logic need to explicitly call create timer
243
                modified, err := timerSequence.CreateNextActivityTimer()
6✔
244
                if err != nil {
6✔
245
                        return nil, err
×
246
                }
×
247
                updateMutableState = updateMutableState || modified
6✔
248

6✔
249
                if !updateMutableState {
11✔
250
                        return nil, nil
5✔
251
                }
5✔
252

253
                now := t.getStandbyClusterTime()
1✔
254
                // we need to handcraft some of the variables
1✔
255
                // since the job being done here is update the activity and possibly write a timer task to DB
1✔
256
                // also need to reset the current version.
1✔
257
                if err := mutableState.UpdateCurrentVersion(lastWriteVersion, true); err != nil {
1✔
258
                        return nil, err
×
259
                }
×
260

261
                err = wfContext.UpdateWorkflowExecutionAsPassive(ctx, now)
1✔
262
                return nil, err
1✔
263
        }
264

265
        return t.processTimer(
9✔
266
                ctx,
9✔
267
                timerTask,
9✔
268
                actionFn,
9✔
269
                getStandbyPostActionFn(
9✔
270
                        timerTask,
9✔
271
                        t.getCurrentTime,
9✔
272
                        t.config.StandbyTaskMissingEventsResendDelay(),
9✔
273
                        t.config.StandbyTaskMissingEventsDiscardDelay(),
9✔
274
                        t.fetchHistoryFromRemote,
9✔
275
                        standbyTimerTaskPostActionTaskDiscarded,
9✔
276
                ),
9✔
277
        )
9✔
278
}
279

280
func (t *timerStandbyTaskExecutor) executeDecisionTimeoutTask(
281
        ctx context.Context,
282
        timerTask *persistence.TimerTaskInfo,
283
) error {
8✔
284

8✔
285
        // decision schedule to start timer won't be generated for sticky decision,
8✔
286
        // since sticky is cleared when applying events on passive.
8✔
287
        // for normal decision, we don't know if a schedule to start timeout timer
8✔
288
        // is generated or not since it's based on a dynamicconfig. On passive cluster,
8✔
289
        // a timer task will be generated based on passive cluster's config, however, it
8✔
290
        // may not match the active cluster.
8✔
291
        // so we simply ignore the schedule to start timer here as the decision task will be
8✔
292
        // pushed to matching without any timeout if's not started, and the workflow
8✔
293
        // can continue execution after failover.
8✔
294
        if timerTask.TimeoutType == int(types.TimeoutTypeScheduleToStart) {
9✔
295
                return nil
1✔
296
        }
1✔
297

298
        actionFn := func(ctx context.Context, wfContext execution.Context, mutableState execution.MutableState) (interface{}, error) {
14✔
299

7✔
300
                decision, isPending := mutableState.GetDecisionInfo(timerTask.EventID)
7✔
301
                if !isPending {
11✔
302
                        return nil, nil
4✔
303
                }
4✔
304

305
                ok, err := verifyTaskVersion(t.shard, t.logger, timerTask.DomainID, decision.Version, timerTask.Version, timerTask)
6✔
306
                if err != nil || !ok {
6✔
307
                        return nil, err
×
308
                }
×
309

310
                return getHistoryResendInfo(mutableState)
6✔
311
        }
312

313
        return t.processTimer(
7✔
314
                ctx,
7✔
315
                timerTask,
7✔
316
                actionFn,
7✔
317
                getStandbyPostActionFn(
7✔
318
                        timerTask,
7✔
319
                        t.getCurrentTime,
7✔
320
                        t.config.StandbyTaskMissingEventsResendDelay(),
7✔
321
                        t.config.StandbyTaskMissingEventsDiscardDelay(),
7✔
322
                        t.fetchHistoryFromRemote,
7✔
323
                        standbyTimerTaskPostActionTaskDiscarded,
7✔
324
                ),
7✔
325
        )
7✔
326
}
327

328
func (t *timerStandbyTaskExecutor) executeWorkflowBackoffTimerTask(
329
        ctx context.Context,
330
        timerTask *persistence.TimerTaskInfo,
331
) error {
7✔
332

7✔
333
        actionFn := func(ctx context.Context, wfContext execution.Context, mutableState execution.MutableState) (interface{}, error) {
14✔
334

7✔
335
                if mutableState.HasProcessedOrPendingDecision() {
11✔
336
                        // if there is one decision already been processed
4✔
337
                        // or has pending decision, meaning workflow has already running
4✔
338
                        return nil, nil
4✔
339
                }
4✔
340

341
                // Note: do not need to verify task version here
342
                // logic can only go here if mutable state build's next event ID is 2
343
                // meaning history only contains workflow started event.
344
                // we can do the checking of task version vs workflow started version
345
                // however, workflow started version is immutable
346

347
                // active cluster will add first decision task after backoff timeout.
348
                // standby cluster should just call ack manager to retry this task
349
                // since we are stilling waiting for the first DecisionScheduledEvent to be replicated from active side.
350

351
                return getHistoryResendInfo(mutableState)
3✔
352
        }
353

354
        return t.processTimer(
7✔
355
                ctx,
7✔
356
                timerTask,
7✔
357
                actionFn,
7✔
358
                getStandbyPostActionFn(
7✔
359
                        timerTask,
7✔
360
                        t.getCurrentTime,
7✔
361
                        t.config.StandbyTaskMissingEventsResendDelay(),
7✔
362
                        t.config.StandbyTaskMissingEventsDiscardDelay(),
7✔
363
                        t.fetchHistoryFromRemote,
7✔
364
                        standbyTimerTaskPostActionTaskDiscarded,
7✔
365
                ),
7✔
366
        )
7✔
367
}
368

369
func (t *timerStandbyTaskExecutor) executeWorkflowTimeoutTask(
370
        ctx context.Context,
371
        timerTask *persistence.TimerTaskInfo,
372
) error {
7✔
373

7✔
374
        actionFn := func(ctx context.Context, wfContext execution.Context, mutableState execution.MutableState) (interface{}, error) {
13✔
375

6✔
376
                // we do not need to notify new timer to base, since if there is no new event being replicated
6✔
377
                // checking again if the timer can be completed is meaningless
6✔
378

6✔
379
                startVersion, err := mutableState.GetStartVersion()
6✔
380
                if err != nil {
6✔
381
                        return nil, err
×
382
                }
×
383
                ok, err := verifyTaskVersion(t.shard, t.logger, timerTask.DomainID, startVersion, timerTask.Version, timerTask)
6✔
384
                if err != nil || !ok {
6✔
385
                        return nil, err
×
386
                }
×
387

388
                return getHistoryResendInfo(mutableState)
6✔
389
        }
390

391
        return t.processTimer(
7✔
392
                ctx,
7✔
393
                timerTask,
7✔
394
                actionFn,
7✔
395
                getStandbyPostActionFn(
7✔
396
                        timerTask,
7✔
397
                        t.getCurrentTime,
7✔
398
                        t.config.StandbyTaskMissingEventsResendDelay(),
7✔
399
                        t.config.StandbyTaskMissingEventsDiscardDelay(),
7✔
400
                        t.fetchHistoryFromRemote,
7✔
401
                        standbyTimerTaskPostActionTaskDiscarded,
7✔
402
                ),
7✔
403
        )
7✔
404
}
405

406
func (t *timerStandbyTaskExecutor) getStandbyClusterTime() time.Time {
1✔
407
        // time of remote cluster in the shard is delayed by "StandbyClusterDelay"
1✔
408
        // so to get the current accurate remote cluster time, need to add it back
1✔
409
        return t.shard.GetCurrentTime(t.clusterName).Add(t.shard.GetConfig().StandbyClusterDelay())
1✔
410
}
1✔
411

412
func (t *timerStandbyTaskExecutor) processTimer(
413
        ctx context.Context,
414
        timerTask *persistence.TimerTaskInfo,
415
        actionFn standbyActionFn,
416
        postActionFn standbyPostActionFn,
417
) (retError error) {
26✔
418

26✔
419
        wfContext, release, err := t.executionCache.GetOrCreateWorkflowExecutionWithTimeout(
26✔
420
                timerTask.DomainID,
26✔
421
                getWorkflowExecution(timerTask),
26✔
422
                taskGetExecutionContextTimeout,
26✔
423
        )
26✔
424
        if err != nil {
26✔
UNCOV
425
                if err == context.DeadlineExceeded {
×
UNCOV
426
                        return errWorkflowBusy
×
UNCOV
427
                }
×
428
                return err
×
429
        }
430
        defer func() {
52✔
431
                if isRedispatchErr(retError) {
39✔
432
                        release(nil)
13✔
433
                } else {
29✔
434
                        release(retError)
16✔
435
                }
16✔
436
        }()
437

438
        mutableState, err := loadMutableStateForTimerTask(ctx, wfContext, timerTask, t.metricsClient, t.logger)
26✔
439
        if err != nil {
26✔
440
                return err
×
441
        }
×
442
        if mutableState == nil {
29✔
443
                return nil
3✔
444
        }
3✔
445

446
        if !mutableState.IsWorkflowExecutionRunning() {
30✔
447
                // workflow already finished, no need to process the timer
4✔
448
                return nil
4✔
449
        }
4✔
450

451
        historyResendInfo, err := actionFn(ctx, wfContext, mutableState)
25✔
452
        if err != nil {
25✔
453
                return err
×
454
        }
×
455

456
        release(nil)
25✔
457
        return postActionFn(ctx, timerTask, historyResendInfo, t.logger)
25✔
458
}
459

460
func (t *timerStandbyTaskExecutor) fetchHistoryFromRemote(
461
        _ context.Context,
462
        taskInfo Info,
463
        postActionInfo interface{},
464
        _ log.Logger,
465
) error {
5✔
466

5✔
467
        if postActionInfo == nil {
5✔
468
                return nil
×
469
        }
×
470

471
        task := taskInfo.(*persistence.TimerTaskInfo)
5✔
472
        resendInfo := postActionInfo.(*historyResendInfo)
5✔
473

5✔
474
        t.metricsClient.IncCounter(metrics.HistoryRereplicationByTimerTaskScope, metrics.CadenceClientRequests)
5✔
475
        stopwatch := t.metricsClient.StartTimer(metrics.HistoryRereplicationByTimerTaskScope, metrics.CadenceClientLatency)
5✔
476
        defer stopwatch.Stop()
5✔
477

5✔
478
        var err error
5✔
479
        if resendInfo.lastEventID != nil && resendInfo.lastEventVersion != nil {
10✔
480
                // note history resender doesn't take in a context parameter, there's a separate dynamicconfig for
5✔
481
                // controlling the timeout for resending history.
5✔
482
                err = t.historyResender.SendSingleWorkflowHistory(
5✔
483
                        task.DomainID,
5✔
484
                        task.WorkflowID,
5✔
485
                        task.RunID,
5✔
486
                        resendInfo.lastEventID,
5✔
487
                        resendInfo.lastEventVersion,
5✔
488
                        nil,
5✔
489
                        nil,
5✔
490
                )
5✔
491
        } else {
5✔
492
                err = &types.InternalServiceError{
×
493
                        Message: fmt.Sprintf("incomplete historyResendInfo: %v", resendInfo),
×
494
                }
×
495
        }
×
496

497
        if err != nil {
5✔
498
                t.logger.Error("Error re-replicating history from remote.",
×
499
                        tag.ShardID(t.shard.GetShardID()),
×
500
                        tag.WorkflowDomainID(task.DomainID),
×
501
                        tag.WorkflowID(task.WorkflowID),
×
502
                        tag.WorkflowRunID(task.RunID),
×
503
                        tag.SourceCluster(t.clusterName),
×
504
                        tag.Error(err),
×
505
                )
×
506
        }
×
507

508
        // return error so task processing logic will retry
509
        return &redispatchError{Reason: "fetchHistoryFromRemote"}
5✔
510
}
511

512
func (t *timerStandbyTaskExecutor) getCurrentTime() time.Time {
26✔
513
        return t.shard.GetCurrentTime(t.clusterName)
26✔
514
}
26✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc