• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

uber / cadence / 0187fdd2-f4a4-4c9a-97b4-6604937bf7be

09 May 2023 12:23AM UTC coverage: 57.253% (-0.002%) from 57.255%
0187fdd2-f4a4-4c9a-97b4-6604937bf7be

Pull #5252

buildkite

David Porter
Merge branch 'master' into feature/zonal-partitioning
Pull Request #5252: Feature/zonal partitioning

1460 of 1460 new or added lines in 51 files covered. (100.0%)

86909 of 151799 relevant lines covered (57.25%)

2482.17 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

86.1
/service/history/task/transfer_standby_task_executor.go
1
// Copyright (c) 2020 Uber Technologies, Inc.
2
//
3
// Permission is hereby granted, free of charge, to any person obtaining a copy
4
// of this software and associated documentation files (the "Software"), to deal
5
// in the Software without restriction, including without limitation the rights
6
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
7
// copies of the Software, and to permit persons to whom the Software is
8
// furnished to do so, subject to the following conditions:
9
//
10
// The above copyright notice and this permission notice shall be included in
11
// all copies or substantial portions of the Software.
12
//
13
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
14
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
18
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
19
// THE SOFTWARE.
20

21
package task
22

23
import (
24
        "context"
25
        "fmt"
26
        "time"
27

28
        "github.com/uber/cadence/common"
29
        "github.com/uber/cadence/common/log"
30
        "github.com/uber/cadence/common/log/tag"
31
        "github.com/uber/cadence/common/metrics"
32
        "github.com/uber/cadence/common/ndc"
33
        "github.com/uber/cadence/common/persistence"
34
        "github.com/uber/cadence/common/types"
35
        "github.com/uber/cadence/service/history/config"
36
        "github.com/uber/cadence/service/history/execution"
37
        "github.com/uber/cadence/service/history/shard"
38
        "github.com/uber/cadence/service/worker/archiver"
39
)
40

41
type (
42
        transferStandbyTaskExecutor struct {
43
                *transferTaskExecutorBase
44

45
                clusterName     string
46
                historyResender ndc.HistoryResender
47
        }
48
)
49

50
// NewTransferStandbyTaskExecutor creates a new task executor for standby transfer task
51
func NewTransferStandbyTaskExecutor(
52
        shard shard.Context,
53
        archiverClient archiver.Client,
54
        executionCache *execution.Cache,
55
        historyResender ndc.HistoryResender,
56
        logger log.Logger,
57
        clusterName string,
58
        config *config.Config,
59
) Executor {
68✔
60
        return &transferStandbyTaskExecutor{
68✔
61
                transferTaskExecutorBase: newTransferTaskExecutorBase(
68✔
62
                        shard,
68✔
63
                        archiverClient,
68✔
64
                        executionCache,
68✔
65
                        logger,
68✔
66
                        config,
68✔
67
                ),
68✔
68
                clusterName:     clusterName,
68✔
69
                historyResender: historyResender,
68✔
70
        }
68✔
71
}
68✔
72

73
func (t *transferStandbyTaskExecutor) Execute(
74
        task Task,
75
        shouldProcessTask bool,
76
) error {
2,713✔
77
        transferTask, ok := task.GetInfo().(*persistence.TransferTaskInfo)
2,713✔
78
        if !ok {
2,713✔
79
                return errUnexpectedTask
×
80
        }
×
81

82
        if !shouldProcessTask {
5,403✔
83
                return nil
2,690✔
84
        }
2,690✔
85

86
        ctx, cancel := context.WithTimeout(context.Background(), taskDefaultTimeout)
26✔
87
        defer cancel()
26✔
88

26✔
89
        switch transferTask.TaskType {
26✔
90
        case persistence.TransferTaskTypeActivityTask:
6✔
91
                return t.processActivityTask(ctx, transferTask)
6✔
92
        case persistence.TransferTaskTypeDecisionTask:
7✔
93
                return t.processDecisionTask(ctx, transferTask)
7✔
94
        case persistence.TransferTaskTypeCloseExecution,
95
                persistence.TransferTaskTypeRecordWorkflowClosed:
5✔
96
                return t.processCloseExecution(ctx, transferTask)
5✔
97
        case persistence.TransferTaskTypeRecordChildExecutionCompleted,
98
                persistence.TransferTaskTypeApplyParentClosePolicy:
×
99
                // no action needed for standby
×
100
                // check the comment in t.processCloseExecution()
×
101
                return nil
×
102
        case persistence.TransferTaskTypeCancelExecution:
7✔
103
                return t.processCancelExecution(ctx, transferTask)
7✔
104
        case persistence.TransferTaskTypeSignalExecution:
7✔
105
                return t.processSignalExecution(ctx, transferTask)
7✔
106
        case persistence.TransferTaskTypeStartChildExecution:
7✔
107
                return t.processStartChildExecution(ctx, transferTask)
7✔
108
        case persistence.TransferTaskTypeRecordWorkflowStarted:
4✔
109
                return t.processRecordWorkflowStarted(ctx, transferTask)
4✔
110
        case persistence.TransferTaskTypeResetWorkflow:
×
111
                // no reset needed for standby
×
112
                // TODO: add error logs
×
113
                return nil
×
114
        case persistence.TransferTaskTypeUpsertWorkflowSearchAttributes:
1✔
115
                return t.processUpsertWorkflowSearchAttributes(ctx, transferTask)
1✔
116
        default:
×
117
                return errUnknownTransferTask
×
118
        }
119
}
120

121
func (t *transferStandbyTaskExecutor) processActivityTask(
122
        ctx context.Context,
123
        transferTask *persistence.TransferTaskInfo,
124
) error {
6✔
125

6✔
126
        processTaskIfClosed := false
6✔
127
        actionFn := func(ctx context.Context, wfContext execution.Context, mutableState execution.MutableState) (interface{}, error) {
12✔
128

6✔
129
                activityInfo, ok := mutableState.GetActivityInfo(transferTask.ScheduleID)
6✔
130
                if !ok {
10✔
131
                        return nil, nil
4✔
132
                }
4✔
133

134
                ok, err := verifyTaskVersion(t.shard, t.logger, transferTask.DomainID, activityInfo.Version, transferTask.Version, transferTask)
5✔
135
                if err != nil || !ok {
5✔
136
                        return nil, err
×
137
                }
×
138

139
                if activityInfo.StartedID == common.EmptyEventID {
10✔
140
                        return newPushActivityToMatchingInfo(
5✔
141
                                activityInfo.ScheduleToStartTimeout,
5✔
142
                                mutableState.GetExecutionInfo().PartitionConfig,
5✔
143
                        ), nil
5✔
144
                }
5✔
145

146
                return nil, nil
3✔
147
        }
148

149
        return t.processTransfer(
6✔
150
                ctx,
6✔
151
                processTaskIfClosed,
6✔
152
                transferTask,
6✔
153
                actionFn,
6✔
154
                getStandbyPostActionFn(
6✔
155
                        transferTask,
6✔
156
                        t.getCurrentTime,
6✔
157
                        t.config.StandbyTaskMissingEventsResendDelay(),
6✔
158
                        t.config.StandbyTaskMissingEventsDiscardDelay(),
6✔
159
                        t.pushActivity,
6✔
160
                        t.pushActivity,
6✔
161
                ),
6✔
162
        )
6✔
163
}
164

165
func (t *transferStandbyTaskExecutor) processDecisionTask(
166
        ctx context.Context,
167
        transferTask *persistence.TransferTaskInfo,
168
) error {
7✔
169

7✔
170
        processTaskIfClosed := false
7✔
171
        actionFn := func(ctx context.Context, wfContext execution.Context, mutableState execution.MutableState) (interface{}, error) {
14✔
172

7✔
173
                decisionInfo, ok := mutableState.GetDecisionInfo(transferTask.ScheduleID)
7✔
174
                if !ok {
10✔
175
                        return nil, nil
3✔
176
                }
3✔
177

178
                executionInfo := mutableState.GetExecutionInfo()
7✔
179
                workflowTimeout := executionInfo.WorkflowTimeout
7✔
180
                decisionTimeout := common.MinInt32(workflowTimeout, common.MaxTaskTimeout)
7✔
181
                if executionInfo.TaskList != transferTask.TaskList {
7✔
182
                        // Experimental: try to push sticky task as regular task with sticky timeout as TTL.
×
183
                        // workflow might be sticky before namespace become standby
×
184
                        // there shall already be a schedule_to_start timer created
×
185
                        decisionTimeout = executionInfo.StickyScheduleToStartTimeout
×
186
                }
×
187

188
                ok, err := verifyTaskVersion(t.shard, t.logger, transferTask.DomainID, decisionInfo.Version, transferTask.Version, transferTask)
7✔
189
                if err != nil || !ok {
9✔
190
                        return nil, err
2✔
191
                }
2✔
192

193
                if decisionInfo.StartedID == common.EmptyEventID {
12✔
194
                        return newPushDecisionToMatchingInfo(
5✔
195
                                decisionTimeout,
5✔
196
                                types.TaskList{Name: executionInfo.TaskList}, // at standby, always use non-sticky tasklist
5✔
197
                                mutableState.GetExecutionInfo().PartitionConfig,
5✔
198
                        ), nil
5✔
199
                }
5✔
200

201
                return nil, nil
5✔
202
        }
203

204
        return t.processTransfer(
7✔
205
                ctx,
7✔
206
                processTaskIfClosed,
7✔
207
                transferTask,
7✔
208
                actionFn,
7✔
209
                getStandbyPostActionFn(
7✔
210
                        transferTask,
7✔
211
                        t.getCurrentTime,
7✔
212
                        t.config.StandbyTaskMissingEventsResendDelay(),
7✔
213
                        t.config.StandbyTaskMissingEventsDiscardDelay(),
7✔
214
                        t.pushDecision,
7✔
215
                        t.pushDecision,
7✔
216
                ),
7✔
217
        )
7✔
218
}
219

220
func (t *transferStandbyTaskExecutor) processCloseExecution(
221
        ctx context.Context,
222
        transferTask *persistence.TransferTaskInfo,
223
) error {
5✔
224

5✔
225
        processTaskIfClosed := true
5✔
226
        actionFn := func(ctx context.Context, wfContext execution.Context, mutableState execution.MutableState) (interface{}, error) {
10✔
227

5✔
228
                if mutableState.IsWorkflowExecutionRunning() {
8✔
229
                        // this can happen if workflow is reset.
3✔
230
                        return nil, nil
3✔
231
                }
3✔
232

233
                completionEvent, err := mutableState.GetCompletionEvent(ctx)
5✔
234
                if err != nil {
5✔
235
                        return nil, err
×
236
                }
×
237
                wfCloseTime := completionEvent.GetTimestamp()
5✔
238

5✔
239
                executionInfo := mutableState.GetExecutionInfo()
5✔
240
                workflowTypeName := executionInfo.WorkflowTypeName
5✔
241
                workflowCloseTimestamp := wfCloseTime
5✔
242
                workflowCloseStatus := persistence.ToInternalWorkflowExecutionCloseStatus(executionInfo.CloseStatus)
5✔
243
                workflowHistoryLength := mutableState.GetNextEventID() - 1
5✔
244
                startEvent, err := mutableState.GetStartEvent(ctx)
5✔
245
                if err != nil {
5✔
246
                        return nil, err
×
247
                }
×
248
                workflowStartTimestamp := startEvent.GetTimestamp()
5✔
249
                workflowExecutionTimestamp := getWorkflowExecutionTimestamp(mutableState, startEvent)
5✔
250
                visibilityMemo := getWorkflowMemo(executionInfo.Memo)
5✔
251
                searchAttr := executionInfo.SearchAttributes
5✔
252
                isCron := len(executionInfo.CronSchedule) > 0
5✔
253
                updateTimestamp := t.shard.GetTimeSource().Now()
5✔
254

5✔
255
                lastWriteVersion, err := mutableState.GetLastWriteVersion()
5✔
256
                if err != nil {
5✔
257
                        return nil, err
×
258
                }
×
259
                ok, err := verifyTaskVersion(t.shard, t.logger, transferTask.DomainID, lastWriteVersion, transferTask.Version, transferTask)
5✔
260
                if err != nil || !ok {
8✔
261
                        return nil, err
3✔
262
                }
3✔
263

264
                domainEntry, err := t.shard.GetDomainCache().GetDomainByID(transferTask.DomainID)
5✔
265
                if err != nil {
5✔
266
                        return nil, err
×
267
                }
×
268
                numClusters := (int16)(len(domainEntry.GetReplicationConfig().Clusters))
5✔
269

5✔
270
                // DO NOT REPLY TO PARENT
5✔
271
                // since event replication should be done by active cluster
5✔
272
                return nil, t.recordWorkflowClosed(
5✔
273
                        ctx,
5✔
274
                        transferTask.DomainID,
5✔
275
                        transferTask.WorkflowID,
5✔
276
                        transferTask.RunID,
5✔
277
                        workflowTypeName,
5✔
278
                        workflowStartTimestamp,
5✔
279
                        workflowExecutionTimestamp.UnixNano(),
5✔
280
                        workflowCloseTimestamp,
5✔
281
                        *workflowCloseStatus,
5✔
282
                        workflowHistoryLength,
5✔
283
                        transferTask.GetTaskID(),
5✔
284
                        visibilityMemo,
5✔
285
                        executionInfo.TaskList,
5✔
286
                        isCron,
5✔
287
                        numClusters,
5✔
288
                        updateTimestamp.UnixNano(),
5✔
289
                        searchAttr,
5✔
290
                )
5✔
291
        }
292

293
        return t.processTransfer(
5✔
294
                ctx,
5✔
295
                processTaskIfClosed,
5✔
296
                transferTask,
5✔
297
                actionFn,
5✔
298
                standbyTaskPostActionNoOp,
5✔
299
        ) // no op post action, since the entire workflow is finished
5✔
300
}
301

302
func (t *transferStandbyTaskExecutor) processCancelExecution(
303
        ctx context.Context,
304
        transferTask *persistence.TransferTaskInfo,
305
) error {
7✔
306

7✔
307
        processTaskIfClosed := false
7✔
308
        actionFn := func(ctx context.Context, wfContext execution.Context, mutableState execution.MutableState) (interface{}, error) {
14✔
309

7✔
310
                requestCancelInfo, ok := mutableState.GetRequestCancelInfo(transferTask.ScheduleID)
7✔
311
                if !ok {
11✔
312
                        return nil, nil
4✔
313
                }
4✔
314

315
                ok, err := verifyTaskVersion(t.shard, t.logger, transferTask.DomainID, requestCancelInfo.Version, transferTask.Version, transferTask)
6✔
316
                if err != nil || !ok {
6✔
317
                        return nil, err
×
318
                }
×
319

320
                return getHistoryResendInfo(mutableState)
6✔
321
        }
322

323
        return t.processTransfer(
7✔
324
                ctx,
7✔
325
                processTaskIfClosed,
7✔
326
                transferTask,
7✔
327
                actionFn,
7✔
328
                getStandbyPostActionFn(
7✔
329
                        transferTask,
7✔
330
                        t.getCurrentTime,
7✔
331
                        t.config.StandbyTaskMissingEventsResendDelay(),
7✔
332
                        t.config.StandbyTaskMissingEventsDiscardDelay(),
7✔
333
                        t.fetchHistoryFromRemote,
7✔
334
                        standbyTransferTaskPostActionTaskDiscarded,
7✔
335
                ),
7✔
336
        )
7✔
337
}
338

339
func (t *transferStandbyTaskExecutor) processSignalExecution(
340
        ctx context.Context,
341
        transferTask *persistence.TransferTaskInfo,
342
) error {
7✔
343

7✔
344
        processTaskIfClosed := false
7✔
345
        actionFn := func(ctx context.Context, wfContext execution.Context, mutableState execution.MutableState) (interface{}, error) {
14✔
346

7✔
347
                signalInfo, ok := mutableState.GetSignalInfo(transferTask.ScheduleID)
7✔
348
                if !ok {
11✔
349
                        return nil, nil
4✔
350
                }
4✔
351

352
                ok, err := verifyTaskVersion(t.shard, t.logger, transferTask.DomainID, signalInfo.Version, transferTask.Version, transferTask)
6✔
353
                if err != nil || !ok {
6✔
354
                        return nil, err
×
355
                }
×
356

357
                return getHistoryResendInfo(mutableState)
6✔
358
        }
359

360
        return t.processTransfer(
7✔
361
                ctx,
7✔
362
                processTaskIfClosed,
7✔
363
                transferTask,
7✔
364
                actionFn,
7✔
365
                getStandbyPostActionFn(
7✔
366
                        transferTask,
7✔
367
                        t.getCurrentTime,
7✔
368
                        t.config.StandbyTaskMissingEventsResendDelay(),
7✔
369
                        t.config.StandbyTaskMissingEventsDiscardDelay(),
7✔
370
                        t.fetchHistoryFromRemote,
7✔
371
                        standbyTransferTaskPostActionTaskDiscarded,
7✔
372
                ),
7✔
373
        )
7✔
374
}
375

376
func (t *transferStandbyTaskExecutor) processStartChildExecution(
377
        ctx context.Context,
378
        transferTask *persistence.TransferTaskInfo,
379
) error {
7✔
380

7✔
381
        processTaskIfClosed := false
7✔
382
        actionFn := func(ctx context.Context, wfContext execution.Context, mutableState execution.MutableState) (interface{}, error) {
14✔
383

7✔
384
                childWorkflowInfo, ok := mutableState.GetChildExecutionInfo(transferTask.ScheduleID)
7✔
385
                if !ok {
11✔
386
                        return nil, nil
4✔
387
                }
4✔
388

389
                ok, err := verifyTaskVersion(t.shard, t.logger, transferTask.DomainID, childWorkflowInfo.Version, transferTask.Version, transferTask)
6✔
390
                if err != nil || !ok {
6✔
391
                        return nil, err
×
392
                }
×
393

394
                if childWorkflowInfo.StartedID != common.EmptyEventID {
8✔
395
                        return nil, nil
2✔
396
                }
2✔
397

398
                return getHistoryResendInfo(mutableState)
6✔
399
        }
400

401
        return t.processTransfer(
7✔
402
                ctx,
7✔
403
                processTaskIfClosed,
7✔
404
                transferTask,
7✔
405
                actionFn,
7✔
406
                getStandbyPostActionFn(
7✔
407
                        transferTask,
7✔
408
                        t.getCurrentTime,
7✔
409
                        t.config.StandbyTaskMissingEventsResendDelay(),
7✔
410
                        t.config.StandbyTaskMissingEventsDiscardDelay(),
7✔
411
                        t.fetchHistoryFromRemote,
7✔
412
                        standbyTransferTaskPostActionTaskDiscarded,
7✔
413
                ),
7✔
414
        )
7✔
415
}
416

417
func (t *transferStandbyTaskExecutor) processRecordWorkflowStarted(
418
        ctx context.Context,
419
        transferTask *persistence.TransferTaskInfo,
420
) error {
4✔
421

4✔
422
        processTaskIfClosed := false
4✔
423
        return t.processTransfer(
4✔
424
                ctx,
4✔
425
                processTaskIfClosed,
4✔
426
                transferTask,
4✔
427
                func(ctx context.Context, wfContext execution.Context, mutableState execution.MutableState) (interface{}, error) {
8✔
428
                        return nil, t.processRecordWorkflowStartedOrUpsertHelper(ctx, transferTask, mutableState, true)
4✔
429
                },
4✔
430
                standbyTaskPostActionNoOp,
431
        )
432
}
433

434
func (t *transferStandbyTaskExecutor) processUpsertWorkflowSearchAttributes(
435
        ctx context.Context,
436
        transferTask *persistence.TransferTaskInfo,
437
) error {
1✔
438

1✔
439
        processTaskIfClosed := false
1✔
440
        return t.processTransfer(
1✔
441
                ctx,
1✔
442
                processTaskIfClosed,
1✔
443
                transferTask,
1✔
444
                func(ctx context.Context, wfContext execution.Context, mutableState execution.MutableState) (interface{}, error) {
2✔
445
                        return nil, t.processRecordWorkflowStartedOrUpsertHelper(ctx, transferTask, mutableState, false)
1✔
446
                },
1✔
447
                standbyTaskPostActionNoOp,
448
        )
449
}
450

451
func (t *transferStandbyTaskExecutor) processRecordWorkflowStartedOrUpsertHelper(
452
        ctx context.Context,
453
        transferTask *persistence.TransferTaskInfo,
454
        mutableState execution.MutableState,
455
        isRecordStart bool,
456
) error {
5✔
457

5✔
458
        // verify task version for RecordWorkflowStarted.
5✔
459
        // upsert doesn't require verifyTask, because it is just a sync of mutableState.
5✔
460
        if isRecordStart {
9✔
461
                startVersion, err := mutableState.GetStartVersion()
4✔
462
                if err != nil {
4✔
463
                        return err
×
464
                }
×
465
                ok, err := verifyTaskVersion(t.shard, t.logger, transferTask.DomainID, startVersion, transferTask.Version, transferTask)
4✔
466
                if err != nil || !ok {
4✔
467
                        return err
×
468
                }
×
469
        }
470

471
        executionInfo := mutableState.GetExecutionInfo()
5✔
472
        workflowTimeout := executionInfo.WorkflowTimeout
5✔
473
        wfTypeName := executionInfo.WorkflowTypeName
5✔
474
        startEvent, err := mutableState.GetStartEvent(ctx)
5✔
475
        if err != nil {
5✔
476
                return err
×
477
        }
×
478
        startTimestamp := startEvent.GetTimestamp()
5✔
479
        executionTimestamp := getWorkflowExecutionTimestamp(mutableState, startEvent)
5✔
480
        visibilityMemo := getWorkflowMemo(executionInfo.Memo)
5✔
481
        searchAttr := copySearchAttributes(executionInfo.SearchAttributes)
5✔
482
        isCron := len(executionInfo.CronSchedule) > 0
5✔
483
        updateTimestamp := t.shard.GetTimeSource().Now()
5✔
484

5✔
485
        domainEntry, err := t.shard.GetDomainCache().GetDomainByID(transferTask.DomainID)
5✔
486
        if err != nil {
5✔
487
                return err
×
488
        }
×
489
        numClusters := (int16)(len(domainEntry.GetReplicationConfig().Clusters))
5✔
490

5✔
491
        if isRecordStart {
9✔
492
                return t.recordWorkflowStarted(
4✔
493
                        ctx,
4✔
494
                        transferTask.DomainID,
4✔
495
                        transferTask.WorkflowID,
4✔
496
                        transferTask.RunID,
4✔
497
                        wfTypeName,
4✔
498
                        startTimestamp,
4✔
499
                        executionTimestamp.UnixNano(),
4✔
500
                        workflowTimeout,
4✔
501
                        transferTask.GetTaskID(),
4✔
502
                        executionInfo.TaskList,
4✔
503
                        isCron,
4✔
504
                        numClusters,
4✔
505
                        visibilityMemo,
4✔
506
                        updateTimestamp.UnixNano(),
4✔
507
                        searchAttr,
4✔
508
                )
4✔
509
        }
4✔
510
        return t.upsertWorkflowExecution(
1✔
511
                ctx,
1✔
512
                transferTask.DomainID,
1✔
513
                transferTask.WorkflowID,
1✔
514
                transferTask.RunID,
1✔
515
                wfTypeName,
1✔
516
                startTimestamp,
1✔
517
                executionTimestamp.UnixNano(),
1✔
518
                workflowTimeout,
1✔
519
                transferTask.GetTaskID(),
1✔
520
                executionInfo.TaskList,
1✔
521
                visibilityMemo,
1✔
522
                isCron,
1✔
523
                numClusters,
1✔
524
                updateTimestamp.UnixNano(),
1✔
525
                searchAttr,
1✔
526
        )
1✔
527

528
}
529

530
func (t *transferStandbyTaskExecutor) processTransfer(
531
        ctx context.Context,
532
        processTaskIfClosed bool,
533
        taskInfo Info,
534
        actionFn standbyActionFn,
535
        postActionFn standbyPostActionFn,
536
) (retError error) {
26✔
537

26✔
538
        transferTask := taskInfo.(*persistence.TransferTaskInfo)
26✔
539
        wfContext, release, err := t.executionCache.GetOrCreateWorkflowExecutionWithTimeout(
26✔
540
                transferTask.DomainID,
26✔
541
                getWorkflowExecution(transferTask),
26✔
542
                taskGetExecutionContextTimeout,
26✔
543
        )
26✔
544
        if err != nil {
26✔
545
                if err == context.DeadlineExceeded {
×
546
                        return errWorkflowBusy
×
547
                }
×
548
                return err
×
549
        }
550
        defer func() {
52✔
551
                if isRedispatchErr(err) {
26✔
552
                        release(nil)
×
553
                } else {
26✔
554
                        release(retError)
26✔
555
                }
26✔
556
        }()
557

558
        mutableState, err := loadMutableStateForTransferTask(ctx, wfContext, transferTask, t.metricsClient, t.logger)
26✔
559
        if err != nil || mutableState == nil {
29✔
560
                return err
3✔
561
        }
3✔
562

563
        if !mutableState.IsWorkflowExecutionRunning() && !processTaskIfClosed {
29✔
564
                // workflow already finished, no need to process the timer
3✔
565
                return nil
3✔
566
        }
3✔
567

568
        historyResendInfo, err := actionFn(ctx, wfContext, mutableState)
26✔
569
        if err != nil {
26✔
570
                return err
×
571
        }
×
572

573
        release(nil)
26✔
574
        return postActionFn(ctx, taskInfo, historyResendInfo, t.logger)
26✔
575
}
576

577
func (t *transferStandbyTaskExecutor) pushActivity(
578
        ctx context.Context,
579
        task Info,
580
        postActionInfo interface{},
581
        logger log.Logger,
582
) error {
1✔
583

1✔
584
        if postActionInfo == nil {
1✔
585
                return nil
×
586
        }
×
587

588
        pushActivityInfo := postActionInfo.(*pushActivityToMatchingInfo)
1✔
589
        timeout := common.MinInt32(pushActivityInfo.activityScheduleToStartTimeout, common.MaxTaskTimeout)
1✔
590
        return t.transferTaskExecutorBase.pushActivity(
1✔
591
                ctx,
1✔
592
                task.(*persistence.TransferTaskInfo),
1✔
593
                timeout,
1✔
594
                pushActivityInfo.partitionConfig,
1✔
595
        )
1✔
596
}
597

598
func (t *transferStandbyTaskExecutor) pushDecision(
599
        ctx context.Context,
600
        task Info,
601
        postActionInfo interface{},
602
        logger log.Logger,
603
) error {
1✔
604

1✔
605
        if postActionInfo == nil {
1✔
606
                return nil
×
607
        }
×
608

609
        pushDecisionInfo := postActionInfo.(*pushDecisionToMatchingInfo)
1✔
610
        timeout := common.MinInt32(pushDecisionInfo.decisionScheduleToStartTimeout, common.MaxTaskTimeout)
1✔
611
        return t.transferTaskExecutorBase.pushDecision(
1✔
612
                ctx,
1✔
613
                task.(*persistence.TransferTaskInfo),
1✔
614
                &pushDecisionInfo.tasklist,
1✔
615
                timeout,
1✔
616
                pushDecisionInfo.partitionConfig,
1✔
617
        )
1✔
618
}
619

620
func (t *transferStandbyTaskExecutor) fetchHistoryFromRemote(
621
        _ context.Context,
622
        taskInfo Info,
623
        postActionInfo interface{},
624
        _ log.Logger,
625
) error {
3✔
626

3✔
627
        if postActionInfo == nil {
3✔
628
                return nil
×
629
        }
×
630

631
        task := taskInfo.(*persistence.TransferTaskInfo)
3✔
632
        resendInfo := postActionInfo.(*historyResendInfo)
3✔
633

3✔
634
        t.metricsClient.IncCounter(metrics.HistoryRereplicationByTransferTaskScope, metrics.CadenceClientRequests)
3✔
635
        stopwatch := t.metricsClient.StartTimer(metrics.HistoryRereplicationByTransferTaskScope, metrics.CadenceClientLatency)
3✔
636
        defer stopwatch.Stop()
3✔
637

3✔
638
        var err error
3✔
639
        if resendInfo.lastEventID != nil && resendInfo.lastEventVersion != nil {
6✔
640
                // note history resender doesn't take in a context parameter, there's a separate dynamicconfig for
3✔
641
                // controlling the timeout for resending history.
3✔
642
                err = t.historyResender.SendSingleWorkflowHistory(
3✔
643
                        task.DomainID,
3✔
644
                        task.WorkflowID,
3✔
645
                        task.RunID,
3✔
646
                        resendInfo.lastEventID,
3✔
647
                        resendInfo.lastEventVersion,
3✔
648
                        nil,
3✔
649
                        nil,
3✔
650
                )
3✔
651
        } else {
3✔
652
                err = &types.InternalServiceError{
×
653
                        Message: fmt.Sprintf("incomplete historyResendInfo: %v", resendInfo),
×
654
                }
×
655
        }
×
656

657
        if err != nil {
3✔
658
                t.logger.Error("Error re-replicating history from remote.",
×
659
                        tag.ShardID(t.shard.GetShardID()),
×
660
                        tag.WorkflowDomainID(task.DomainID),
×
661
                        tag.WorkflowID(task.WorkflowID),
×
662
                        tag.WorkflowRunID(task.RunID),
×
663
                        tag.SourceCluster(t.clusterName),
×
664
                        tag.Error(err),
×
665
                )
×
666
        }
×
667

668
        // return error so task processing logic will retry
669
        return &redispatchError{Reason: "fetchHistoryFromRemote"}
3✔
670
}
671

672
func (t *transferStandbyTaskExecutor) getCurrentTime() time.Time {
22✔
673
        return t.shard.GetCurrentTime(t.clusterName)
22✔
674
}
22✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc