• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

uber / cadence / 01875e2f-959c-4c4d-87af-1d7805759bcc

08 Apr 2023 12:26AM UTC coverage: 57.178% (+0.1%) from 57.072%
01875e2f-959c-4c4d-87af-1d7805759bcc

Pull #5197

buildkite

Steven L
bad cleanup -> good cleanup
Pull Request #5197: Demonstrate a way to get rid of the cadence-idl repo

85396 of 149351 relevant lines covered (57.18%)

2283.28 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

87.24
/service/history/task/transfer_standby_task_executor.go
1
// Copyright (c) 2020 Uber Technologies, Inc.
2
//
3
// Permission is hereby granted, free of charge, to any person obtaining a copy
4
// of this software and associated documentation files (the "Software"), to deal
5
// in the Software without restriction, including without limitation the rights
6
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
7
// copies of the Software, and to permit persons to whom the Software is
8
// furnished to do so, subject to the following conditions:
9
//
10
// The above copyright notice and this permission notice shall be included in
11
// all copies or substantial portions of the Software.
12
//
13
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
14
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
18
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
19
// THE SOFTWARE.
20

21
package task
22

23
import (
24
        "context"
25
        "fmt"
26
        "time"
27

28
        "github.com/uber/cadence/common"
29
        "github.com/uber/cadence/common/log"
30
        "github.com/uber/cadence/common/log/tag"
31
        "github.com/uber/cadence/common/metrics"
32
        "github.com/uber/cadence/common/ndc"
33
        "github.com/uber/cadence/common/persistence"
34
        "github.com/uber/cadence/common/types"
35
        "github.com/uber/cadence/service/history/config"
36
        "github.com/uber/cadence/service/history/execution"
37
        "github.com/uber/cadence/service/history/shard"
38
        "github.com/uber/cadence/service/worker/archiver"
39
)
40

41
type (
42
        transferStandbyTaskExecutor struct {
43
                *transferTaskExecutorBase
44

45
                clusterName     string
46
                historyResender ndc.HistoryResender
47
        }
48
)
49

50
// NewTransferStandbyTaskExecutor creates a new task executor for standby transfer task
51
func NewTransferStandbyTaskExecutor(
52
        shard shard.Context,
53
        archiverClient archiver.Client,
54
        executionCache *execution.Cache,
55
        historyResender ndc.HistoryResender,
56
        logger log.Logger,
57
        clusterName string,
58
        config *config.Config,
59
) Executor {
68✔
60
        return &transferStandbyTaskExecutor{
68✔
61
                transferTaskExecutorBase: newTransferTaskExecutorBase(
68✔
62
                        shard,
68✔
63
                        archiverClient,
68✔
64
                        executionCache,
68✔
65
                        logger,
68✔
66
                        config,
68✔
67
                ),
68✔
68
                clusterName:     clusterName,
68✔
69
                historyResender: historyResender,
68✔
70
        }
68✔
71
}
68✔
72

73
func (t *transferStandbyTaskExecutor) Execute(
74
        task Task,
75
        shouldProcessTask bool,
76
) error {
2,571✔
77
        transferTask, ok := task.GetInfo().(*persistence.TransferTaskInfo)
2,571✔
78
        if !ok {
2,571✔
79
                return errUnexpectedTask
×
80
        }
×
81

82
        if !shouldProcessTask {
5,119✔
83
                return nil
2,548✔
84
        }
2,548✔
85

86
        ctx, cancel := context.WithTimeout(context.Background(), taskDefaultTimeout)
26✔
87
        defer cancel()
26✔
88

26✔
89
        switch transferTask.TaskType {
26✔
90
        case persistence.TransferTaskTypeActivityTask:
6✔
91
                return t.processActivityTask(ctx, transferTask)
6✔
92
        case persistence.TransferTaskTypeDecisionTask:
7✔
93
                return t.processDecisionTask(ctx, transferTask)
7✔
94
        case persistence.TransferTaskTypeCloseExecution,
95
                persistence.TransferTaskTypeRecordWorkflowClosed:
5✔
96
                return t.processCloseExecution(ctx, transferTask)
5✔
97
        case persistence.TransferTaskTypeRecordChildExecutionCompleted,
98
                persistence.TransferTaskTypeApplyParentClosePolicy:
×
99
                // no action needed for standby
×
100
                // check the comment in t.processCloseExecution()
×
101
                return nil
×
102
        case persistence.TransferTaskTypeCancelExecution:
7✔
103
                return t.processCancelExecution(ctx, transferTask)
7✔
104
        case persistence.TransferTaskTypeSignalExecution:
7✔
105
                return t.processSignalExecution(ctx, transferTask)
7✔
106
        case persistence.TransferTaskTypeStartChildExecution:
7✔
107
                return t.processStartChildExecution(ctx, transferTask)
7✔
108
        case persistence.TransferTaskTypeRecordWorkflowStarted:
4✔
109
                return t.processRecordWorkflowStarted(ctx, transferTask)
4✔
110
        case persistence.TransferTaskTypeResetWorkflow:
×
111
                // no reset needed for standby
×
112
                // TODO: add error logs
×
113
                return nil
×
114
        case persistence.TransferTaskTypeUpsertWorkflowSearchAttributes:
1✔
115
                return t.processUpsertWorkflowSearchAttributes(ctx, transferTask)
1✔
116
        default:
×
117
                return errUnknownTransferTask
×
118
        }
119
}
120

121
func (t *transferStandbyTaskExecutor) processActivityTask(
122
        ctx context.Context,
123
        transferTask *persistence.TransferTaskInfo,
124
) error {
6✔
125

6✔
126
        processTaskIfClosed := false
6✔
127
        actionFn := func(ctx context.Context, wfContext execution.Context, mutableState execution.MutableState) (interface{}, error) {
12✔
128

6✔
129
                activityInfo, ok := mutableState.GetActivityInfo(transferTask.ScheduleID)
6✔
130
                if !ok {
10✔
131
                        return nil, nil
4✔
132
                }
4✔
133

134
                ok, err := verifyTaskVersion(t.shard, t.logger, transferTask.DomainID, activityInfo.Version, transferTask.Version, transferTask)
5✔
135
                if err != nil || !ok {
6✔
136
                        return nil, err
1✔
137
                }
1✔
138

139
                if activityInfo.StartedID == common.EmptyEventID {
10✔
140
                        return newPushActivityToMatchingInfo(
5✔
141
                                activityInfo.ScheduleToStartTimeout,
5✔
142
                        ), nil
5✔
143
                }
5✔
144

145
                return nil, nil
3✔
146
        }
147

148
        return t.processTransfer(
6✔
149
                ctx,
6✔
150
                processTaskIfClosed,
6✔
151
                transferTask,
6✔
152
                actionFn,
6✔
153
                getStandbyPostActionFn(
6✔
154
                        transferTask,
6✔
155
                        t.getCurrentTime,
6✔
156
                        t.config.StandbyTaskMissingEventsResendDelay(),
6✔
157
                        t.config.StandbyTaskMissingEventsDiscardDelay(),
6✔
158
                        t.pushActivity,
6✔
159
                        t.pushActivity,
6✔
160
                ),
6✔
161
        )
6✔
162
}
163

164
func (t *transferStandbyTaskExecutor) processDecisionTask(
165
        ctx context.Context,
166
        transferTask *persistence.TransferTaskInfo,
167
) error {
7✔
168

7✔
169
        processTaskIfClosed := false
7✔
170
        actionFn := func(ctx context.Context, wfContext execution.Context, mutableState execution.MutableState) (interface{}, error) {
14✔
171

7✔
172
                decisionInfo, ok := mutableState.GetDecisionInfo(transferTask.ScheduleID)
7✔
173
                if !ok {
10✔
174
                        return nil, nil
3✔
175
                }
3✔
176

177
                executionInfo := mutableState.GetExecutionInfo()
7✔
178
                workflowTimeout := executionInfo.WorkflowTimeout
7✔
179
                decisionTimeout := common.MinInt32(workflowTimeout, common.MaxTaskTimeout)
7✔
180
                if executionInfo.TaskList != transferTask.TaskList {
7✔
181
                        // Experimental: try to push sticky task as regular task with sticky timeout as TTL.
×
182
                        // workflow might be sticky before namespace become standby
×
183
                        // there shall already be a schedule_to_start timer created
×
184
                        decisionTimeout = executionInfo.StickyScheduleToStartTimeout
×
185
                }
×
186

187
                ok, err := verifyTaskVersion(t.shard, t.logger, transferTask.DomainID, decisionInfo.Version, transferTask.Version, transferTask)
7✔
188
                if err != nil || !ok {
10✔
189
                        return nil, err
3✔
190
                }
3✔
191

192
                if decisionInfo.StartedID == common.EmptyEventID {
12✔
193
                        return newPushDecisionToMatchingInfo(
5✔
194
                                decisionTimeout,
5✔
195
                                types.TaskList{Name: executionInfo.TaskList}, // at standby, always use non-sticky tasklist
5✔
196
                        ), nil
5✔
197
                }
5✔
198

199
                return nil, nil
5✔
200
        }
201

202
        return t.processTransfer(
7✔
203
                ctx,
7✔
204
                processTaskIfClosed,
7✔
205
                transferTask,
7✔
206
                actionFn,
7✔
207
                getStandbyPostActionFn(
7✔
208
                        transferTask,
7✔
209
                        t.getCurrentTime,
7✔
210
                        t.config.StandbyTaskMissingEventsResendDelay(),
7✔
211
                        t.config.StandbyTaskMissingEventsDiscardDelay(),
7✔
212
                        t.pushDecision,
7✔
213
                        t.pushDecision,
7✔
214
                ),
7✔
215
        )
7✔
216
}
217

218
func (t *transferStandbyTaskExecutor) processCloseExecution(
219
        ctx context.Context,
220
        transferTask *persistence.TransferTaskInfo,
221
) error {
5✔
222

5✔
223
        processTaskIfClosed := true
5✔
224
        actionFn := func(ctx context.Context, wfContext execution.Context, mutableState execution.MutableState) (interface{}, error) {
10✔
225

5✔
226
                if mutableState.IsWorkflowExecutionRunning() {
8✔
227
                        // this can happen if workflow is reset.
3✔
228
                        return nil, nil
3✔
229
                }
3✔
230

231
                completionEvent, err := mutableState.GetCompletionEvent(ctx)
5✔
232
                if err != nil {
5✔
233
                        return nil, err
×
234
                }
×
235
                wfCloseTime := completionEvent.GetTimestamp()
5✔
236

5✔
237
                executionInfo := mutableState.GetExecutionInfo()
5✔
238
                workflowTypeName := executionInfo.WorkflowTypeName
5✔
239
                workflowCloseTimestamp := wfCloseTime
5✔
240
                workflowCloseStatus := persistence.ToInternalWorkflowExecutionCloseStatus(executionInfo.CloseStatus)
5✔
241
                workflowHistoryLength := mutableState.GetNextEventID() - 1
5✔
242
                startEvent, err := mutableState.GetStartEvent(ctx)
5✔
243
                if err != nil {
5✔
244
                        return nil, err
×
245
                }
×
246
                workflowStartTimestamp := startEvent.GetTimestamp()
5✔
247
                workflowExecutionTimestamp := getWorkflowExecutionTimestamp(mutableState, startEvent)
5✔
248
                visibilityMemo := getWorkflowMemo(executionInfo.Memo)
5✔
249
                searchAttr := executionInfo.SearchAttributes
5✔
250
                isCron := len(executionInfo.CronSchedule) > 0
5✔
251
                updateTimestamp := t.shard.GetTimeSource().Now()
5✔
252

5✔
253
                lastWriteVersion, err := mutableState.GetLastWriteVersion()
5✔
254
                if err != nil {
5✔
255
                        return nil, err
×
256
                }
×
257
                ok, err := verifyTaskVersion(t.shard, t.logger, transferTask.DomainID, lastWriteVersion, transferTask.Version, transferTask)
5✔
258
                if err != nil || !ok {
8✔
259
                        return nil, err
3✔
260
                }
3✔
261

262
                domainEntry, err := t.shard.GetDomainCache().GetDomainByID(transferTask.DomainID)
5✔
263
                if err != nil {
5✔
264
                        return nil, err
×
265
                }
×
266
                numClusters := (int16)(len(domainEntry.GetReplicationConfig().Clusters))
5✔
267

5✔
268
                // DO NOT REPLY TO PARENT
5✔
269
                // since event replication should be done by active cluster
5✔
270
                return nil, t.recordWorkflowClosed(
5✔
271
                        ctx,
5✔
272
                        transferTask.DomainID,
5✔
273
                        transferTask.WorkflowID,
5✔
274
                        transferTask.RunID,
5✔
275
                        workflowTypeName,
5✔
276
                        workflowStartTimestamp,
5✔
277
                        workflowExecutionTimestamp.UnixNano(),
5✔
278
                        workflowCloseTimestamp,
5✔
279
                        *workflowCloseStatus,
5✔
280
                        workflowHistoryLength,
5✔
281
                        transferTask.GetTaskID(),
5✔
282
                        visibilityMemo,
5✔
283
                        executionInfo.TaskList,
5✔
284
                        isCron,
5✔
285
                        numClusters,
5✔
286
                        updateTimestamp.UnixNano(),
5✔
287
                        searchAttr,
5✔
288
                )
5✔
289
        }
290

291
        return t.processTransfer(
5✔
292
                ctx,
5✔
293
                processTaskIfClosed,
5✔
294
                transferTask,
5✔
295
                actionFn,
5✔
296
                standbyTaskPostActionNoOp,
5✔
297
        ) // no op post action, since the entire workflow is finished
5✔
298
}
299

300
func (t *transferStandbyTaskExecutor) processCancelExecution(
301
        ctx context.Context,
302
        transferTask *persistence.TransferTaskInfo,
303
) error {
7✔
304

7✔
305
        processTaskIfClosed := false
7✔
306
        actionFn := func(ctx context.Context, wfContext execution.Context, mutableState execution.MutableState) (interface{}, error) {
14✔
307

7✔
308
                requestCancelInfo, ok := mutableState.GetRequestCancelInfo(transferTask.ScheduleID)
7✔
309
                if !ok {
11✔
310
                        return nil, nil
4✔
311
                }
4✔
312

313
                ok, err := verifyTaskVersion(t.shard, t.logger, transferTask.DomainID, requestCancelInfo.Version, transferTask.Version, transferTask)
6✔
314
                if err != nil || !ok {
6✔
315
                        return nil, err
×
316
                }
×
317

318
                return getHistoryResendInfo(mutableState)
6✔
319
        }
320

321
        return t.processTransfer(
7✔
322
                ctx,
7✔
323
                processTaskIfClosed,
7✔
324
                transferTask,
7✔
325
                actionFn,
7✔
326
                getStandbyPostActionFn(
7✔
327
                        transferTask,
7✔
328
                        t.getCurrentTime,
7✔
329
                        t.config.StandbyTaskMissingEventsResendDelay(),
7✔
330
                        t.config.StandbyTaskMissingEventsDiscardDelay(),
7✔
331
                        t.fetchHistoryFromRemote,
7✔
332
                        standbyTransferTaskPostActionTaskDiscarded,
7✔
333
                ),
7✔
334
        )
7✔
335
}
336

337
func (t *transferStandbyTaskExecutor) processSignalExecution(
338
        ctx context.Context,
339
        transferTask *persistence.TransferTaskInfo,
340
) error {
7✔
341

7✔
342
        processTaskIfClosed := false
7✔
343
        actionFn := func(ctx context.Context, wfContext execution.Context, mutableState execution.MutableState) (interface{}, error) {
14✔
344

7✔
345
                signalInfo, ok := mutableState.GetSignalInfo(transferTask.ScheduleID)
7✔
346
                if !ok {
11✔
347
                        return nil, nil
4✔
348
                }
4✔
349

350
                ok, err := verifyTaskVersion(t.shard, t.logger, transferTask.DomainID, signalInfo.Version, transferTask.Version, transferTask)
6✔
351
                if err != nil || !ok {
8✔
352
                        return nil, err
2✔
353
                }
2✔
354

355
                return getHistoryResendInfo(mutableState)
6✔
356
        }
357

358
        return t.processTransfer(
7✔
359
                ctx,
7✔
360
                processTaskIfClosed,
7✔
361
                transferTask,
7✔
362
                actionFn,
7✔
363
                getStandbyPostActionFn(
7✔
364
                        transferTask,
7✔
365
                        t.getCurrentTime,
7✔
366
                        t.config.StandbyTaskMissingEventsResendDelay(),
7✔
367
                        t.config.StandbyTaskMissingEventsDiscardDelay(),
7✔
368
                        t.fetchHistoryFromRemote,
7✔
369
                        standbyTransferTaskPostActionTaskDiscarded,
7✔
370
                ),
7✔
371
        )
7✔
372
}
373

374
func (t *transferStandbyTaskExecutor) processStartChildExecution(
375
        ctx context.Context,
376
        transferTask *persistence.TransferTaskInfo,
377
) error {
7✔
378

7✔
379
        processTaskIfClosed := false
7✔
380
        actionFn := func(ctx context.Context, wfContext execution.Context, mutableState execution.MutableState) (interface{}, error) {
14✔
381

7✔
382
                childWorkflowInfo, ok := mutableState.GetChildExecutionInfo(transferTask.ScheduleID)
7✔
383
                if !ok {
11✔
384
                        return nil, nil
4✔
385
                }
4✔
386

387
                ok, err := verifyTaskVersion(t.shard, t.logger, transferTask.DomainID, childWorkflowInfo.Version, transferTask.Version, transferTask)
6✔
388
                if err != nil || !ok {
8✔
389
                        return nil, err
2✔
390
                }
2✔
391

392
                if childWorkflowInfo.StartedID != common.EmptyEventID {
7✔
393
                        return nil, nil
1✔
394
                }
1✔
395

396
                return getHistoryResendInfo(mutableState)
6✔
397
        }
398

399
        return t.processTransfer(
7✔
400
                ctx,
7✔
401
                processTaskIfClosed,
7✔
402
                transferTask,
7✔
403
                actionFn,
7✔
404
                getStandbyPostActionFn(
7✔
405
                        transferTask,
7✔
406
                        t.getCurrentTime,
7✔
407
                        t.config.StandbyTaskMissingEventsResendDelay(),
7✔
408
                        t.config.StandbyTaskMissingEventsDiscardDelay(),
7✔
409
                        t.fetchHistoryFromRemote,
7✔
410
                        standbyTransferTaskPostActionTaskDiscarded,
7✔
411
                ),
7✔
412
        )
7✔
413
}
414

415
func (t *transferStandbyTaskExecutor) processRecordWorkflowStarted(
416
        ctx context.Context,
417
        transferTask *persistence.TransferTaskInfo,
418
) error {
4✔
419

4✔
420
        processTaskIfClosed := false
4✔
421
        return t.processTransfer(
4✔
422
                ctx,
4✔
423
                processTaskIfClosed,
4✔
424
                transferTask,
4✔
425
                func(ctx context.Context, wfContext execution.Context, mutableState execution.MutableState) (interface{}, error) {
8✔
426
                        return nil, t.processRecordWorkflowStartedOrUpsertHelper(ctx, transferTask, mutableState, true)
4✔
427
                },
4✔
428
                standbyTaskPostActionNoOp,
429
        )
430
}
431

432
func (t *transferStandbyTaskExecutor) processUpsertWorkflowSearchAttributes(
433
        ctx context.Context,
434
        transferTask *persistence.TransferTaskInfo,
435
) error {
1✔
436

1✔
437
        processTaskIfClosed := false
1✔
438
        return t.processTransfer(
1✔
439
                ctx,
1✔
440
                processTaskIfClosed,
1✔
441
                transferTask,
1✔
442
                func(ctx context.Context, wfContext execution.Context, mutableState execution.MutableState) (interface{}, error) {
2✔
443
                        return nil, t.processRecordWorkflowStartedOrUpsertHelper(ctx, transferTask, mutableState, false)
1✔
444
                },
1✔
445
                standbyTaskPostActionNoOp,
446
        )
447
}
448

449
func (t *transferStandbyTaskExecutor) processRecordWorkflowStartedOrUpsertHelper(
450
        ctx context.Context,
451
        transferTask *persistence.TransferTaskInfo,
452
        mutableState execution.MutableState,
453
        isRecordStart bool,
454
) error {
5✔
455

5✔
456
        // verify task version for RecordWorkflowStarted.
5✔
457
        // upsert doesn't require verifyTask, because it is just a sync of mutableState.
5✔
458
        if isRecordStart {
9✔
459
                startVersion, err := mutableState.GetStartVersion()
4✔
460
                if err != nil {
4✔
461
                        return err
×
462
                }
×
463
                ok, err := verifyTaskVersion(t.shard, t.logger, transferTask.DomainID, startVersion, transferTask.Version, transferTask)
4✔
464
                if err != nil || !ok {
4✔
465
                        return err
×
466
                }
×
467
        }
468

469
        executionInfo := mutableState.GetExecutionInfo()
5✔
470
        workflowTimeout := executionInfo.WorkflowTimeout
5✔
471
        wfTypeName := executionInfo.WorkflowTypeName
5✔
472
        startEvent, err := mutableState.GetStartEvent(ctx)
5✔
473
        if err != nil {
5✔
474
                return err
×
475
        }
×
476
        startTimestamp := startEvent.GetTimestamp()
5✔
477
        executionTimestamp := getWorkflowExecutionTimestamp(mutableState, startEvent)
5✔
478
        visibilityMemo := getWorkflowMemo(executionInfo.Memo)
5✔
479
        searchAttr := copySearchAttributes(executionInfo.SearchAttributes)
5✔
480
        isCron := len(executionInfo.CronSchedule) > 0
5✔
481
        updateTimestamp := t.shard.GetTimeSource().Now()
5✔
482

5✔
483
        domainEntry, err := t.shard.GetDomainCache().GetDomainByID(transferTask.DomainID)
5✔
484
        if err != nil {
5✔
485
                return err
×
486
        }
×
487
        numClusters := (int16)(len(domainEntry.GetReplicationConfig().Clusters))
5✔
488

5✔
489
        if isRecordStart {
9✔
490
                return t.recordWorkflowStarted(
4✔
491
                        ctx,
4✔
492
                        transferTask.DomainID,
4✔
493
                        transferTask.WorkflowID,
4✔
494
                        transferTask.RunID,
4✔
495
                        wfTypeName,
4✔
496
                        startTimestamp,
4✔
497
                        executionTimestamp.UnixNano(),
4✔
498
                        workflowTimeout,
4✔
499
                        transferTask.GetTaskID(),
4✔
500
                        executionInfo.TaskList,
4✔
501
                        isCron,
4✔
502
                        numClusters,
4✔
503
                        visibilityMemo,
4✔
504
                        updateTimestamp.UnixNano(),
4✔
505
                        searchAttr,
4✔
506
                )
4✔
507
        }
4✔
508
        return t.upsertWorkflowExecution(
1✔
509
                ctx,
1✔
510
                transferTask.DomainID,
1✔
511
                transferTask.WorkflowID,
1✔
512
                transferTask.RunID,
1✔
513
                wfTypeName,
1✔
514
                startTimestamp,
1✔
515
                executionTimestamp.UnixNano(),
1✔
516
                workflowTimeout,
1✔
517
                transferTask.GetTaskID(),
1✔
518
                executionInfo.TaskList,
1✔
519
                visibilityMemo,
1✔
520
                isCron,
1✔
521
                numClusters,
1✔
522
                updateTimestamp.UnixNano(),
1✔
523
                searchAttr,
1✔
524
        )
1✔
525

526
}
527

528
func (t *transferStandbyTaskExecutor) processTransfer(
529
        ctx context.Context,
530
        processTaskIfClosed bool,
531
        taskInfo Info,
532
        actionFn standbyActionFn,
533
        postActionFn standbyPostActionFn,
534
) (retError error) {
26✔
535

26✔
536
        transferTask := taskInfo.(*persistence.TransferTaskInfo)
26✔
537
        wfContext, release, err := t.executionCache.GetOrCreateWorkflowExecutionWithTimeout(
26✔
538
                transferTask.DomainID,
26✔
539
                getWorkflowExecution(transferTask),
26✔
540
                taskGetExecutionContextTimeout,
26✔
541
        )
26✔
542
        if err != nil {
26✔
543
                if err == context.DeadlineExceeded {
×
544
                        return errWorkflowBusy
×
545
                }
×
546
                return err
×
547
        }
548
        defer func() {
52✔
549
                if isRedispatchErr(err) {
26✔
550
                        release(nil)
×
551
                } else {
26✔
552
                        release(retError)
26✔
553
                }
26✔
554
        }()
555

556
        mutableState, err := loadMutableStateForTransferTask(ctx, wfContext, transferTask, t.metricsClient, t.logger)
26✔
557
        if err != nil || mutableState == nil {
29✔
558
                return err
3✔
559
        }
3✔
560

561
        if !mutableState.IsWorkflowExecutionRunning() && !processTaskIfClosed {
29✔
562
                // workflow already finished, no need to process the timer
3✔
563
                return nil
3✔
564
        }
3✔
565

566
        historyResendInfo, err := actionFn(ctx, wfContext, mutableState)
26✔
567
        if err != nil {
26✔
568
                return err
×
569
        }
×
570

571
        release(nil)
26✔
572
        return postActionFn(ctx, taskInfo, historyResendInfo, t.logger)
26✔
573
}
574

575
func (t *transferStandbyTaskExecutor) pushActivity(
576
        ctx context.Context,
577
        task Info,
578
        postActionInfo interface{},
579
        logger log.Logger,
580
) error {
1✔
581

1✔
582
        if postActionInfo == nil {
1✔
583
                return nil
×
584
        }
×
585

586
        pushActivityInfo := postActionInfo.(*pushActivityToMatchingInfo)
1✔
587
        timeout := common.MinInt32(pushActivityInfo.activityScheduleToStartTimeout, common.MaxTaskTimeout)
1✔
588
        return t.transferTaskExecutorBase.pushActivity(
1✔
589
                ctx,
1✔
590
                task.(*persistence.TransferTaskInfo),
1✔
591
                timeout,
1✔
592
        )
1✔
593
}
594

595
func (t *transferStandbyTaskExecutor) pushDecision(
596
        ctx context.Context,
597
        task Info,
598
        postActionInfo interface{},
599
        logger log.Logger,
600
) error {
1✔
601

1✔
602
        if postActionInfo == nil {
1✔
603
                return nil
×
604
        }
×
605

606
        pushDecisionInfo := postActionInfo.(*pushDecisionToMatchingInfo)
1✔
607
        timeout := common.MinInt32(pushDecisionInfo.decisionScheduleToStartTimeout, common.MaxTaskTimeout)
1✔
608
        return t.transferTaskExecutorBase.pushDecision(
1✔
609
                ctx,
1✔
610
                task.(*persistence.TransferTaskInfo),
1✔
611
                &pushDecisionInfo.tasklist,
1✔
612
                timeout,
1✔
613
        )
1✔
614
}
615

616
func (t *transferStandbyTaskExecutor) fetchHistoryFromRemote(
617
        _ context.Context,
618
        taskInfo Info,
619
        postActionInfo interface{},
620
        _ log.Logger,
621
) error {
3✔
622

3✔
623
        if postActionInfo == nil {
3✔
624
                return nil
×
625
        }
×
626

627
        task := taskInfo.(*persistence.TransferTaskInfo)
3✔
628
        resendInfo := postActionInfo.(*historyResendInfo)
3✔
629

3✔
630
        t.metricsClient.IncCounter(metrics.HistoryRereplicationByTransferTaskScope, metrics.CadenceClientRequests)
3✔
631
        stopwatch := t.metricsClient.StartTimer(metrics.HistoryRereplicationByTransferTaskScope, metrics.CadenceClientLatency)
3✔
632
        defer stopwatch.Stop()
3✔
633

3✔
634
        var err error
3✔
635
        if resendInfo.lastEventID != nil && resendInfo.lastEventVersion != nil {
6✔
636
                // note history resender doesn't take in a context parameter, there's a separate dynamicconfig for
3✔
637
                // controlling the timeout for resending history.
3✔
638
                err = t.historyResender.SendSingleWorkflowHistory(
3✔
639
                        task.DomainID,
3✔
640
                        task.WorkflowID,
3✔
641
                        task.RunID,
3✔
642
                        resendInfo.lastEventID,
3✔
643
                        resendInfo.lastEventVersion,
3✔
644
                        nil,
3✔
645
                        nil,
3✔
646
                )
3✔
647
        } else {
3✔
648
                err = &types.InternalServiceError{
×
649
                        Message: fmt.Sprintf("incomplete historyResendInfo: %v", resendInfo),
×
650
                }
×
651
        }
×
652

653
        if err != nil {
3✔
654
                t.logger.Error("Error re-replicating history from remote.",
×
655
                        tag.ShardID(t.shard.GetShardID()),
×
656
                        tag.WorkflowDomainID(task.DomainID),
×
657
                        tag.WorkflowID(task.WorkflowID),
×
658
                        tag.WorkflowRunID(task.RunID),
×
659
                        tag.SourceCluster(t.clusterName),
×
660
                        tag.Error(err),
×
661
                )
×
662
        }
×
663

664
        // return error so task processing logic will retry
665
        return &redispatchError{Reason: "fetchHistoryFromRemote"}
3✔
666
}
667

668
func (t *transferStandbyTaskExecutor) getCurrentTime() time.Time {
22✔
669
        return t.shard.GetCurrentTime(t.clusterName)
22✔
670
}
22✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc