• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

uber / cadence / 01907562-f5f0-40b6-8908-451d758b6138

02 Jul 2024 09:37PM UTC coverage: 71.512% (+0.003%) from 71.509%
01907562-f5f0-40b6-8908-451d758b6138

Pull #6155

buildkite

Groxx
Stop the ratelimiter collections when stopping the service
Pull Request #6155: Stop the ratelimiter collections when stopping the service

9 of 17 new or added lines in 1 file covered. (52.94%)

22 existing lines in 7 files now uncovered.

105315 of 147269 relevant lines covered (71.51%)

2600.5 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

86.64
/service/history/task/transfer_standby_task_executor.go
1
// Copyright (c) 2020 Uber Technologies, Inc.
2
//
3
// Permission is hereby granted, free of charge, to any person obtaining a copy
4
// of this software and associated documentation files (the "Software"), to deal
5
// in the Software without restriction, including without limitation the rights
6
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
7
// copies of the Software, and to permit persons to whom the Software is
8
// furnished to do so, subject to the following conditions:
9
//
10
// The above copyright notice and this permission notice shall be included in
11
// all copies or substantial portions of the Software.
12
//
13
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
14
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
18
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
19
// THE SOFTWARE.
20

21
package task
22

23
import (
24
        "context"
25
        "fmt"
26
        "time"
27

28
        "github.com/uber/cadence/common"
29
        "github.com/uber/cadence/common/log"
30
        "github.com/uber/cadence/common/log/tag"
31
        "github.com/uber/cadence/common/metrics"
32
        "github.com/uber/cadence/common/ndc"
33
        "github.com/uber/cadence/common/persistence"
34
        "github.com/uber/cadence/common/types"
35
        "github.com/uber/cadence/service/history/config"
36
        "github.com/uber/cadence/service/history/execution"
37
        "github.com/uber/cadence/service/history/shard"
38
        "github.com/uber/cadence/service/worker/archiver"
39
)
40

41
type (
42
        transferStandbyTaskExecutor struct {
43
                *transferTaskExecutorBase
44

45
                clusterName     string
46
                historyResender ndc.HistoryResender
47
        }
48
)
49

50
// NewTransferStandbyTaskExecutor creates a new task executor for standby transfer task
51
func NewTransferStandbyTaskExecutor(
52
        shard shard.Context,
53
        archiverClient archiver.Client,
54
        executionCache execution.Cache,
55
        historyResender ndc.HistoryResender,
56
        logger log.Logger,
57
        clusterName string,
58
        config *config.Config,
59
) Executor {
94✔
60
        return &transferStandbyTaskExecutor{
94✔
61
                transferTaskExecutorBase: newTransferTaskExecutorBase(
94✔
62
                        shard,
94✔
63
                        archiverClient,
94✔
64
                        executionCache,
94✔
65
                        logger,
94✔
66
                        config,
94✔
67
                ),
94✔
68
                clusterName:     clusterName,
94✔
69
                historyResender: historyResender,
94✔
70
        }
94✔
71
}
94✔
72

73
func (t *transferStandbyTaskExecutor) Execute(
74
        task Task,
75
        shouldProcessTask bool,
76
) error {
2,675✔
77
        transferTask, ok := task.GetInfo().(*persistence.TransferTaskInfo)
2,675✔
78
        if !ok {
2,675✔
79
                return errUnexpectedTask
×
80
        }
×
81

82
        if !shouldProcessTask {
5,325✔
83
                return nil
2,650✔
84
        }
2,650✔
85

86
        ctx, cancel := context.WithTimeout(context.Background(), taskDefaultTimeout)
28✔
87
        defer cancel()
28✔
88

28✔
89
        switch transferTask.TaskType {
28✔
90
        case persistence.TransferTaskTypeActivityTask:
6✔
91
                return t.processActivityTask(ctx, transferTask)
6✔
92
        case persistence.TransferTaskTypeDecisionTask:
7✔
93
                return t.processDecisionTask(ctx, transferTask)
7✔
94
        case persistence.TransferTaskTypeCloseExecution,
95
                persistence.TransferTaskTypeRecordWorkflowClosed:
5✔
96
                return t.processCloseExecution(ctx, transferTask)
5✔
97
        case persistence.TransferTaskTypeRecordChildExecutionCompleted,
98
                persistence.TransferTaskTypeApplyParentClosePolicy:
×
99
                // no action needed for standby
×
100
                // check the comment in t.processCloseExecution()
×
101
                return nil
×
102
        case persistence.TransferTaskTypeCancelExecution:
7✔
103
                return t.processCancelExecution(ctx, transferTask)
7✔
104
        case persistence.TransferTaskTypeSignalExecution:
7✔
105
                return t.processSignalExecution(ctx, transferTask)
7✔
106
        case persistence.TransferTaskTypeStartChildExecution:
7✔
107
                return t.processStartChildExecution(ctx, transferTask)
7✔
108
        case persistence.TransferTaskTypeRecordWorkflowStarted:
5✔
109
                return t.processRecordWorkflowStarted(ctx, transferTask)
5✔
110
        case persistence.TransferTaskTypeResetWorkflow:
×
111
                // no reset needed for standby
×
112
                // TODO: add error logs
×
113
                return nil
×
114
        case persistence.TransferTaskTypeUpsertWorkflowSearchAttributes:
2✔
115
                return t.processUpsertWorkflowSearchAttributes(ctx, transferTask)
2✔
116
        default:
×
117
                return errUnknownTransferTask
×
118
        }
119
}
120

121
// Empty func for now
122
func (t *transferStandbyTaskExecutor) Stop() {}
19✔
123

124
func (t *transferStandbyTaskExecutor) processActivityTask(
125
        ctx context.Context,
126
        transferTask *persistence.TransferTaskInfo,
127
) error {
6✔
128

6✔
129
        processTaskIfClosed := false
6✔
130
        actionFn := func(ctx context.Context, wfContext execution.Context, mutableState execution.MutableState) (interface{}, error) {
12✔
131

6✔
132
                activityInfo, ok := mutableState.GetActivityInfo(transferTask.ScheduleID)
6✔
133
                if !ok {
10✔
134
                        return nil, nil
4✔
135
                }
4✔
136

137
                ok, err := verifyTaskVersion(t.shard, t.logger, transferTask.DomainID, activityInfo.Version, transferTask.Version, transferTask)
5✔
138
                if err != nil || !ok {
6✔
139
                        return nil, err
1✔
140
                }
1✔
141

142
                if activityInfo.StartedID == common.EmptyEventID {
10✔
143
                        return newPushActivityToMatchingInfo(
5✔
144
                                activityInfo.ScheduleToStartTimeout,
5✔
145
                                mutableState.GetExecutionInfo().PartitionConfig,
5✔
146
                        ), nil
5✔
147
                }
5✔
148

149
                return nil, nil
3✔
150
        }
151

152
        return t.processTransfer(
6✔
153
                ctx,
6✔
154
                processTaskIfClosed,
6✔
155
                transferTask,
6✔
156
                actionFn,
6✔
157
                getStandbyPostActionFn(
6✔
158
                        transferTask,
6✔
159
                        t.getCurrentTime,
6✔
160
                        t.config.StandbyTaskMissingEventsResendDelay(),
6✔
161
                        t.config.StandbyTaskMissingEventsDiscardDelay(),
6✔
162
                        t.pushActivity,
6✔
163
                        t.pushActivity,
6✔
164
                ),
6✔
165
        )
6✔
166
}
167

168
func (t *transferStandbyTaskExecutor) processDecisionTask(
169
        ctx context.Context,
170
        transferTask *persistence.TransferTaskInfo,
171
) error {
7✔
172

7✔
173
        processTaskIfClosed := false
7✔
174
        actionFn := func(ctx context.Context, wfContext execution.Context, mutableState execution.MutableState) (interface{}, error) {
14✔
175

7✔
176
                decisionInfo, ok := mutableState.GetDecisionInfo(transferTask.ScheduleID)
7✔
177
                if !ok {
10✔
178
                        return nil, nil
3✔
179
                }
3✔
180

181
                executionInfo := mutableState.GetExecutionInfo()
7✔
182
                workflowTimeout := executionInfo.WorkflowTimeout
7✔
183
                decisionTimeout := common.MinInt32(workflowTimeout, common.MaxTaskTimeout)
7✔
184
                if executionInfo.TaskList != transferTask.TaskList {
7✔
185
                        // Experimental: try to push sticky task as regular task with sticky timeout as TTL.
×
186
                        // workflow might be sticky before namespace become standby
×
187
                        // there shall already be a schedule_to_start timer created
×
188
                        decisionTimeout = executionInfo.StickyScheduleToStartTimeout
×
189
                }
×
190

191
                ok, err := verifyTaskVersion(t.shard, t.logger, transferTask.DomainID, decisionInfo.Version, transferTask.Version, transferTask)
7✔
192
                if err != nil || !ok {
9✔
193
                        return nil, err
2✔
194
                }
2✔
195

196
                if decisionInfo.StartedID == common.EmptyEventID {
12✔
197
                        return newPushDecisionToMatchingInfo(
5✔
198
                                decisionTimeout,
5✔
199
                                types.TaskList{Name: executionInfo.TaskList}, // at standby, always use non-sticky tasklist
5✔
200
                                mutableState.GetExecutionInfo().PartitionConfig,
5✔
201
                        ), nil
5✔
202
                }
5✔
203

204
                return nil, nil
5✔
205
        }
206

207
        return t.processTransfer(
7✔
208
                ctx,
7✔
209
                processTaskIfClosed,
7✔
210
                transferTask,
7✔
211
                actionFn,
7✔
212
                getStandbyPostActionFn(
7✔
213
                        transferTask,
7✔
214
                        t.getCurrentTime,
7✔
215
                        t.config.StandbyTaskMissingEventsResendDelay(),
7✔
216
                        t.config.StandbyTaskMissingEventsDiscardDelay(),
7✔
217
                        t.pushDecision,
7✔
218
                        t.pushDecision,
7✔
219
                ),
7✔
220
        )
7✔
221
}
222

223
func (t *transferStandbyTaskExecutor) processCloseExecution(
224
        ctx context.Context,
225
        transferTask *persistence.TransferTaskInfo,
226
) error {
5✔
227

5✔
228
        processTaskIfClosed := true
5✔
229
        actionFn := func(ctx context.Context, wfContext execution.Context, mutableState execution.MutableState) (interface{}, error) {
10✔
230

5✔
231
                if mutableState.IsWorkflowExecutionRunning() {
8✔
232
                        // this can happen if workflow is reset.
3✔
233
                        return nil, nil
3✔
234
                }
3✔
235

236
                completionEvent, err := mutableState.GetCompletionEvent(ctx)
5✔
237
                if err != nil {
5✔
238
                        return nil, err
×
239
                }
×
240
                wfCloseTime := completionEvent.GetTimestamp()
5✔
241

5✔
242
                executionInfo := mutableState.GetExecutionInfo()
5✔
243
                workflowTypeName := executionInfo.WorkflowTypeName
5✔
244
                workflowCloseTimestamp := wfCloseTime
5✔
245
                workflowCloseStatus := persistence.ToInternalWorkflowExecutionCloseStatus(executionInfo.CloseStatus)
5✔
246
                workflowHistoryLength := mutableState.GetNextEventID() - 1
5✔
247
                startEvent, err := mutableState.GetStartEvent(ctx)
5✔
248
                if err != nil {
5✔
249
                        return nil, err
×
250
                }
×
251
                workflowStartTimestamp := startEvent.GetTimestamp()
5✔
252
                workflowExecutionTimestamp := getWorkflowExecutionTimestamp(mutableState, startEvent)
5✔
253
                visibilityMemo := getWorkflowMemo(executionInfo.Memo)
5✔
254
                searchAttr := executionInfo.SearchAttributes
5✔
255
                isCron := len(executionInfo.CronSchedule) > 0
5✔
256
                updateTimestamp := t.shard.GetTimeSource().Now()
5✔
257

5✔
258
                lastWriteVersion, err := mutableState.GetLastWriteVersion()
5✔
259
                if err != nil {
5✔
260
                        return nil, err
×
261
                }
×
262
                ok, err := verifyTaskVersion(t.shard, t.logger, transferTask.DomainID, lastWriteVersion, transferTask.Version, transferTask)
5✔
263
                if err != nil || !ok {
8✔
264
                        return nil, err
3✔
265
                }
3✔
266

267
                domainEntry, err := t.shard.GetDomainCache().GetDomainByID(transferTask.DomainID)
5✔
268
                if err != nil {
5✔
269
                        return nil, err
×
270
                }
×
271
                numClusters := (int16)(len(domainEntry.GetReplicationConfig().Clusters))
5✔
272

5✔
273
                // DO NOT REPLY TO PARENT
5✔
274
                // since event replication should be done by active cluster
5✔
275
                return nil, t.recordWorkflowClosed(
5✔
276
                        ctx,
5✔
277
                        transferTask.DomainID,
5✔
278
                        transferTask.WorkflowID,
5✔
279
                        transferTask.RunID,
5✔
280
                        workflowTypeName,
5✔
281
                        workflowStartTimestamp,
5✔
282
                        workflowExecutionTimestamp.UnixNano(),
5✔
283
                        workflowCloseTimestamp,
5✔
284
                        *workflowCloseStatus,
5✔
285
                        workflowHistoryLength,
5✔
286
                        transferTask.GetTaskID(),
5✔
287
                        visibilityMemo,
5✔
288
                        executionInfo.TaskList,
5✔
289
                        isCron,
5✔
290
                        numClusters,
5✔
291
                        updateTimestamp.UnixNano(),
5✔
292
                        searchAttr,
5✔
293
                )
5✔
294
        }
295

296
        return t.processTransfer(
5✔
297
                ctx,
5✔
298
                processTaskIfClosed,
5✔
299
                transferTask,
5✔
300
                actionFn,
5✔
301
                standbyTaskPostActionNoOp,
5✔
302
        ) // no op post action, since the entire workflow is finished
5✔
303
}
304

305
func (t *transferStandbyTaskExecutor) processCancelExecution(
306
        ctx context.Context,
307
        transferTask *persistence.TransferTaskInfo,
308
) error {
7✔
309

7✔
310
        processTaskIfClosed := false
7✔
311
        actionFn := func(ctx context.Context, wfContext execution.Context, mutableState execution.MutableState) (interface{}, error) {
14✔
312

7✔
313
                requestCancelInfo, ok := mutableState.GetRequestCancelInfo(transferTask.ScheduleID)
7✔
314
                if !ok {
11✔
315
                        return nil, nil
4✔
316
                }
4✔
317

318
                ok, err := verifyTaskVersion(t.shard, t.logger, transferTask.DomainID, requestCancelInfo.Version, transferTask.Version, transferTask)
6✔
319
                if err != nil || !ok {
6✔
320
                        return nil, err
×
321
                }
×
322

323
                return getHistoryResendInfo(mutableState)
6✔
324
        }
325

326
        return t.processTransfer(
7✔
327
                ctx,
7✔
328
                processTaskIfClosed,
7✔
329
                transferTask,
7✔
330
                actionFn,
7✔
331
                getStandbyPostActionFn(
7✔
332
                        transferTask,
7✔
333
                        t.getCurrentTime,
7✔
334
                        t.config.StandbyTaskMissingEventsResendDelay(),
7✔
335
                        t.config.StandbyTaskMissingEventsDiscardDelay(),
7✔
336
                        t.fetchHistoryFromRemote,
7✔
337
                        standbyTransferTaskPostActionTaskDiscarded,
7✔
338
                ),
7✔
339
        )
7✔
340
}
341

342
func (t *transferStandbyTaskExecutor) processSignalExecution(
343
        ctx context.Context,
344
        transferTask *persistence.TransferTaskInfo,
345
) error {
7✔
346

7✔
347
        processTaskIfClosed := false
7✔
348
        actionFn := func(ctx context.Context, wfContext execution.Context, mutableState execution.MutableState) (interface{}, error) {
14✔
349

7✔
350
                signalInfo, ok := mutableState.GetSignalInfo(transferTask.ScheduleID)
7✔
351
                if !ok {
11✔
352
                        return nil, nil
4✔
353
                }
4✔
354

355
                ok, err := verifyTaskVersion(t.shard, t.logger, transferTask.DomainID, signalInfo.Version, transferTask.Version, transferTask)
6✔
356
                if err != nil || !ok {
6✔
357
                        return nil, err
×
358
                }
×
359

360
                return getHistoryResendInfo(mutableState)
6✔
361
        }
362

363
        return t.processTransfer(
7✔
364
                ctx,
7✔
365
                processTaskIfClosed,
7✔
366
                transferTask,
7✔
367
                actionFn,
7✔
368
                getStandbyPostActionFn(
7✔
369
                        transferTask,
7✔
370
                        t.getCurrentTime,
7✔
371
                        t.config.StandbyTaskMissingEventsResendDelay(),
7✔
372
                        t.config.StandbyTaskMissingEventsDiscardDelay(),
7✔
373
                        t.fetchHistoryFromRemote,
7✔
374
                        standbyTransferTaskPostActionTaskDiscarded,
7✔
375
                ),
7✔
376
        )
7✔
377
}
378

379
func (t *transferStandbyTaskExecutor) processStartChildExecution(
380
        ctx context.Context,
381
        transferTask *persistence.TransferTaskInfo,
382
) error {
7✔
383

7✔
384
        processTaskIfClosed := false
7✔
385
        actionFn := func(ctx context.Context, wfContext execution.Context, mutableState execution.MutableState) (interface{}, error) {
14✔
386

7✔
387
                childWorkflowInfo, ok := mutableState.GetChildExecutionInfo(transferTask.ScheduleID)
7✔
388
                if !ok {
11✔
389
                        return nil, nil
4✔
390
                }
4✔
391

392
                ok, err := verifyTaskVersion(t.shard, t.logger, transferTask.DomainID, childWorkflowInfo.Version, transferTask.Version, transferTask)
6✔
393
                if err != nil || !ok {
6✔
394
                        return nil, err
×
395
                }
×
396

397
                if childWorkflowInfo.StartedID != common.EmptyEventID {
7✔
398
                        return nil, nil
1✔
399
                }
1✔
400

401
                return getHistoryResendInfo(mutableState)
6✔
402
        }
403

404
        return t.processTransfer(
7✔
405
                ctx,
7✔
406
                processTaskIfClosed,
7✔
407
                transferTask,
7✔
408
                actionFn,
7✔
409
                getStandbyPostActionFn(
7✔
410
                        transferTask,
7✔
411
                        t.getCurrentTime,
7✔
412
                        t.config.StandbyTaskMissingEventsResendDelay(),
7✔
413
                        t.config.StandbyTaskMissingEventsDiscardDelay(),
7✔
414
                        t.fetchHistoryFromRemote,
7✔
415
                        standbyTransferTaskPostActionTaskDiscarded,
7✔
416
                ),
7✔
417
        )
7✔
418
}
419

420
func (t *transferStandbyTaskExecutor) processRecordWorkflowStarted(
421
        ctx context.Context,
422
        transferTask *persistence.TransferTaskInfo,
423
) error {
5✔
424

5✔
425
        processTaskIfClosed := false
5✔
426
        return t.processTransfer(
5✔
427
                ctx,
5✔
428
                processTaskIfClosed,
5✔
429
                transferTask,
5✔
430
                func(ctx context.Context, wfContext execution.Context, mutableState execution.MutableState) (interface{}, error) {
10✔
431
                        return nil, t.processRecordWorkflowStartedOrUpsertHelper(ctx, transferTask, mutableState, true)
5✔
432
                },
5✔
433
                standbyTaskPostActionNoOp,
434
        )
435
}
436

437
func (t *transferStandbyTaskExecutor) processUpsertWorkflowSearchAttributes(
438
        ctx context.Context,
439
        transferTask *persistence.TransferTaskInfo,
440
) error {
2✔
441

2✔
442
        processTaskIfClosed := false
2✔
443
        return t.processTransfer(
2✔
444
                ctx,
2✔
445
                processTaskIfClosed,
2✔
446
                transferTask,
2✔
447
                func(ctx context.Context, wfContext execution.Context, mutableState execution.MutableState) (interface{}, error) {
4✔
448
                        return nil, t.processRecordWorkflowStartedOrUpsertHelper(ctx, transferTask, mutableState, false)
2✔
449
                },
2✔
450
                standbyTaskPostActionNoOp,
451
        )
452
}
453

454
func (t *transferStandbyTaskExecutor) processRecordWorkflowStartedOrUpsertHelper(
455
        ctx context.Context,
456
        transferTask *persistence.TransferTaskInfo,
457
        mutableState execution.MutableState,
458
        isRecordStart bool,
459
) error {
7✔
460

7✔
461
        workflowStartedScope := getOrCreateDomainTaggedScope(t.shard, metrics.TransferStandbyTaskRecordWorkflowStartedScope, transferTask.DomainID, t.logger)
7✔
462

7✔
463
        // verify task version for RecordWorkflowStarted.
7✔
464
        // upsert doesn't require verifyTask, because it is just a sync of mutableState.
7✔
465
        if isRecordStart {
12✔
466
                startVersion, err := mutableState.GetStartVersion()
5✔
467
                if err != nil {
5✔
468
                        return err
×
469
                }
×
470
                ok, err := verifyTaskVersion(t.shard, t.logger, transferTask.DomainID, startVersion, transferTask.Version, transferTask)
5✔
471
                if err != nil || !ok {
5✔
472
                        return err
×
473
                }
×
474
        }
475

476
        executionInfo := mutableState.GetExecutionInfo()
7✔
477
        workflowTimeout := executionInfo.WorkflowTimeout
7✔
478
        wfTypeName := executionInfo.WorkflowTypeName
7✔
479
        startEvent, err := mutableState.GetStartEvent(ctx)
7✔
480
        if err != nil {
7✔
481
                return err
×
482
        }
×
483
        startTimestamp := startEvent.GetTimestamp()
7✔
484
        executionTimestamp := getWorkflowExecutionTimestamp(mutableState, startEvent)
7✔
485
        visibilityMemo := getWorkflowMemo(executionInfo.Memo)
7✔
486
        isCron := len(executionInfo.CronSchedule) > 0
7✔
487
        updateTimestamp := t.shard.GetTimeSource().Now()
7✔
488

7✔
489
        domainEntry, err := t.shard.GetDomainCache().GetDomainByID(transferTask.DomainID)
7✔
490
        if err != nil {
7✔
491
                return err
×
492
        }
×
493
        numClusters := (int16)(len(domainEntry.GetReplicationConfig().Clusters))
7✔
494

7✔
495
        searchAttr := copySearchAttributes(executionInfo.SearchAttributes)
7✔
496
        if t.config.EnableContextHeaderInVisibility(domainEntry.GetInfo().Name) {
9✔
497
                if attributes := startEvent.GetWorkflowExecutionStartedEventAttributes(); attributes != nil && attributes.Header != nil {
4✔
498
                        // fail open to avoid blocking the task processing
2✔
499
                        if newSearchAttr, err := appendContextHeaderToSearchAttributes(searchAttr, attributes.Header.Fields, t.config.ValidSearchAttributes()); err != nil {
2✔
500
                                t.logger.Error("failed to add headers to search attributes", tag.Error(err))
×
501
                        } else {
2✔
502
                                searchAttr = newSearchAttr
2✔
503
                        }
2✔
504
                }
505
        }
506

507
        if isRecordStart {
12✔
508
                workflowStartedScope.IncCounter(metrics.WorkflowStartedCount)
5✔
509
                return t.recordWorkflowStarted(
5✔
510
                        ctx,
5✔
511
                        transferTask.DomainID,
5✔
512
                        transferTask.WorkflowID,
5✔
513
                        transferTask.RunID,
5✔
514
                        wfTypeName,
5✔
515
                        startTimestamp,
5✔
516
                        executionTimestamp.UnixNano(),
5✔
517
                        workflowTimeout,
5✔
518
                        transferTask.GetTaskID(),
5✔
519
                        executionInfo.TaskList,
5✔
520
                        isCron,
5✔
521
                        numClusters,
5✔
522
                        visibilityMemo,
5✔
523
                        updateTimestamp.UnixNano(),
5✔
524
                        searchAttr,
5✔
525
                )
5✔
526
        }
5✔
527
        return t.upsertWorkflowExecution(
2✔
528
                ctx,
2✔
529
                transferTask.DomainID,
2✔
530
                transferTask.WorkflowID,
2✔
531
                transferTask.RunID,
2✔
532
                wfTypeName,
2✔
533
                startTimestamp,
2✔
534
                executionTimestamp.UnixNano(),
2✔
535
                workflowTimeout,
2✔
536
                transferTask.GetTaskID(),
2✔
537
                executionInfo.TaskList,
2✔
538
                visibilityMemo,
2✔
539
                isCron,
2✔
540
                numClusters,
2✔
541
                updateTimestamp.UnixNano(),
2✔
542
                searchAttr,
2✔
543
        )
2✔
544

545
}
546

547
func (t *transferStandbyTaskExecutor) processTransfer(
548
        ctx context.Context,
549
        processTaskIfClosed bool,
550
        taskInfo Info,
551
        actionFn standbyActionFn,
552
        postActionFn standbyPostActionFn,
553
) (retError error) {
28✔
554

28✔
555
        transferTask := taskInfo.(*persistence.TransferTaskInfo)
28✔
556
        wfContext, release, err := t.executionCache.GetOrCreateWorkflowExecutionWithTimeout(
28✔
557
                transferTask.DomainID,
28✔
558
                getWorkflowExecution(transferTask),
28✔
559
                taskGetExecutionContextTimeout,
28✔
560
        )
28✔
561
        if err != nil {
28✔
UNCOV
562
                if err == context.DeadlineExceeded {
×
UNCOV
563
                        return errWorkflowBusy
×
UNCOV
564
                }
×
565
                return err
×
566
        }
567
        defer func() {
56✔
568
                if isRedispatchErr(err) {
28✔
569
                        release(nil)
×
570
                } else {
28✔
571
                        release(retError)
28✔
572
                }
28✔
573
        }()
574

575
        mutableState, err := loadMutableStateForTransferTask(ctx, wfContext, transferTask, t.metricsClient, t.logger)
28✔
576
        if err != nil || mutableState == nil {
31✔
577
                return err
3✔
578
        }
3✔
579

580
        if !mutableState.IsWorkflowExecutionRunning() && !processTaskIfClosed {
31✔
581
                // workflow already finished, no need to process the timer
3✔
582
                return nil
3✔
583
        }
3✔
584

585
        historyResendInfo, err := actionFn(ctx, wfContext, mutableState)
28✔
586
        if err != nil {
28✔
587
                return err
×
588
        }
×
589

590
        release(nil)
28✔
591
        return postActionFn(ctx, taskInfo, historyResendInfo, t.logger)
28✔
592
}
593

594
func (t *transferStandbyTaskExecutor) pushActivity(
595
        ctx context.Context,
596
        task Info,
597
        postActionInfo interface{},
598
        logger log.Logger,
599
) error {
1✔
600

1✔
601
        if postActionInfo == nil {
1✔
602
                return nil
×
603
        }
×
604

605
        pushActivityInfo := postActionInfo.(*pushActivityToMatchingInfo)
1✔
606
        timeout := common.MinInt32(pushActivityInfo.activityScheduleToStartTimeout, common.MaxTaskTimeout)
1✔
607
        return t.transferTaskExecutorBase.pushActivity(
1✔
608
                ctx,
1✔
609
                task.(*persistence.TransferTaskInfo),
1✔
610
                timeout,
1✔
611
                pushActivityInfo.partitionConfig,
1✔
612
        )
1✔
613
}
614

615
func (t *transferStandbyTaskExecutor) pushDecision(
616
        ctx context.Context,
617
        task Info,
618
        postActionInfo interface{},
619
        logger log.Logger,
620
) error {
1✔
621

1✔
622
        if postActionInfo == nil {
1✔
623
                return nil
×
624
        }
×
625

626
        pushDecisionInfo := postActionInfo.(*pushDecisionToMatchingInfo)
1✔
627
        timeout := common.MinInt32(pushDecisionInfo.decisionScheduleToStartTimeout, common.MaxTaskTimeout)
1✔
628
        return t.transferTaskExecutorBase.pushDecision(
1✔
629
                ctx,
1✔
630
                task.(*persistence.TransferTaskInfo),
1✔
631
                &pushDecisionInfo.tasklist,
1✔
632
                timeout,
1✔
633
                pushDecisionInfo.partitionConfig,
1✔
634
        )
1✔
635
}
636

637
func (t *transferStandbyTaskExecutor) fetchHistoryFromRemote(
638
        _ context.Context,
639
        taskInfo Info,
640
        postActionInfo interface{},
641
        _ log.Logger,
642
) error {
3✔
643

3✔
644
        if postActionInfo == nil {
3✔
645
                return nil
×
646
        }
×
647

648
        task := taskInfo.(*persistence.TransferTaskInfo)
3✔
649
        resendInfo := postActionInfo.(*historyResendInfo)
3✔
650

3✔
651
        t.metricsClient.IncCounter(metrics.HistoryRereplicationByTransferTaskScope, metrics.CadenceClientRequests)
3✔
652
        stopwatch := t.metricsClient.StartTimer(metrics.HistoryRereplicationByTransferTaskScope, metrics.CadenceClientLatency)
3✔
653
        defer stopwatch.Stop()
3✔
654

3✔
655
        var err error
3✔
656
        if resendInfo.lastEventID != nil && resendInfo.lastEventVersion != nil {
6✔
657
                // note history resender doesn't take in a context parameter, there's a separate dynamicconfig for
3✔
658
                // controlling the timeout for resending history.
3✔
659
                err = t.historyResender.SendSingleWorkflowHistory(
3✔
660
                        task.DomainID,
3✔
661
                        task.WorkflowID,
3✔
662
                        task.RunID,
3✔
663
                        resendInfo.lastEventID,
3✔
664
                        resendInfo.lastEventVersion,
3✔
665
                        nil,
3✔
666
                        nil,
3✔
667
                )
3✔
668
        } else {
3✔
669
                err = &types.InternalServiceError{
×
670
                        Message: fmt.Sprintf("incomplete historyResendInfo: %v", resendInfo),
×
671
                }
×
672
        }
×
673

674
        if err != nil {
3✔
675
                t.logger.Error("Error re-replicating history from remote.",
×
676
                        tag.ShardID(t.shard.GetShardID()),
×
677
                        tag.WorkflowDomainID(task.DomainID),
×
678
                        tag.WorkflowID(task.WorkflowID),
×
679
                        tag.WorkflowRunID(task.RunID),
×
680
                        tag.SourceCluster(t.clusterName),
×
681
                        tag.Error(err),
×
682
                )
×
683
        }
×
684

685
        // return error so task processing logic will retry
686
        return &redispatchError{Reason: "fetchHistoryFromRemote"}
3✔
687
}
688

689
func (t *transferStandbyTaskExecutor) getCurrentTime() time.Time {
22✔
690
        return t.shard.GetCurrentTime(t.clusterName)
22✔
691
}
22✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc