• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

uber / cadence / 018e1da2-2e13-4d90-9d75-822f85f1d078

08 Mar 2024 10:35AM UTC coverage: 63.659% (+0.2%) from 63.447%
018e1da2-2e13-4d90-9d75-822f85f1d078

push

buildkite

web-flow
[code-coverage] update admin and frontend client to use generated code (#5702)

* [code-coverage] update admin client to use generated code

* delete client.go to be replaced

* [code-coverage] update frontend client to use generated code

* rename package to timeout

* fix lint and updated comments in template

* fix merge conflict with histoy client changes and seperate createContext logic

* delete timed.tmpl

* add check for nonnegative timeout in createContext

* rename constants.go file to timeout

* resolve last merge conflicts

14 of 14 new or added lines in 3 files covered. (100.0%)

110 existing lines in 18 files now uncovered.

93021 of 146125 relevant lines covered (63.66%)

2357.3 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

78.38
/service/history/task/timer_task_executor_base.go
1
// Copyright (c) 2020 Uber Technologies, Inc.
2
//
3
// Permission is hereby granted, free of charge, to any person obtaining a copy
4
// of this software and associated documentation files (the "Software"), to deal
5
// in the Software without restriction, including without limitation the rights
6
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
7
// copies of the Software, and to permit persons to whom the Software is
8
// furnished to do so, subject to the following conditions:
9
//
10
// The above copyright notice and this permission notice shall be included in
11
// all copies or substantial portions of the Software.
12
//
13
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
14
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
18
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
19
// THE SOFTWARE.
20

21
package task
22

23
import (
24
        "context"
25

26
        "github.com/uber/cadence/common"
27
        "github.com/uber/cadence/common/backoff"
28
        "github.com/uber/cadence/common/cache"
29
        "github.com/uber/cadence/common/log"
30
        "github.com/uber/cadence/common/metrics"
31
        "github.com/uber/cadence/common/persistence"
32
        "github.com/uber/cadence/common/service"
33
        "github.com/uber/cadence/common/types"
34
        "github.com/uber/cadence/service/history/config"
35
        "github.com/uber/cadence/service/history/execution"
36
        "github.com/uber/cadence/service/history/shard"
37
        "github.com/uber/cadence/service/worker/archiver"
38
)
39

40
var (
41
        taskRetryPolicy = common.CreateTaskProcessingRetryPolicy()
42
)
43

44
type (
45
        timerTaskExecutorBase struct {
46
                shard          shard.Context
47
                archiverClient archiver.Client
48
                executionCache *execution.Cache
49
                logger         log.Logger
50
                metricsClient  metrics.Client
51
                config         *config.Config
52
                throttleRetry  *backoff.ThrottleRetry
53
        }
54
)
55

56
func newTimerTaskExecutorBase(
57
        shard shard.Context,
58
        archiverClient archiver.Client,
59
        executionCache *execution.Cache,
60
        logger log.Logger,
61
        metricsClient metrics.Client,
62
        config *config.Config,
63
) *timerTaskExecutorBase {
139✔
64
        return &timerTaskExecutorBase{
139✔
65
                shard:          shard,
139✔
66
                archiverClient: archiverClient,
139✔
67
                executionCache: executionCache,
139✔
68
                logger:         logger,
139✔
69
                metricsClient:  metricsClient,
139✔
70
                config:         config,
139✔
71
                throttleRetry: backoff.NewThrottleRetry(
139✔
72
                        backoff.WithRetryPolicy(taskRetryPolicy),
139✔
73
                        backoff.WithRetryableError(persistence.IsTransientError),
139✔
74
                ),
139✔
75
        }
139✔
76
}
139✔
77

78
func (t *timerTaskExecutorBase) executeDeleteHistoryEventTask(
79
        ctx context.Context,
80
        task *persistence.TimerTaskInfo,
81
) (retError error) {
54✔
82

54✔
83
        wfContext, release, err := t.executionCache.GetOrCreateWorkflowExecutionWithTimeout(
54✔
84
                task.DomainID,
54✔
85
                getWorkflowExecution(task),
54✔
86
                taskGetExecutionContextTimeout,
54✔
87
        )
54✔
88
        if err != nil {
54✔
89
                return err
×
90
        }
×
91
        defer func() { release(retError) }()
108✔
92

93
        mutableState, err := loadMutableStateForTimerTask(ctx, wfContext, task, t.metricsClient, t.logger)
54✔
94
        if err != nil {
54✔
95
                return err
×
96
        }
×
97
        if mutableState == nil || mutableState.IsWorkflowExecutionRunning() {
57✔
98
                return nil
3✔
99
        }
3✔
100

101
        lastWriteVersion, err := mutableState.GetLastWriteVersion()
54✔
102
        if err != nil {
54✔
103
                return err
×
104
        }
×
105
        ok, err := verifyTaskVersion(t.shard, t.logger, task.DomainID, lastWriteVersion, task.Version, task)
54✔
106
        if err != nil || !ok {
54✔
UNCOV
107
                return err
×
UNCOV
108
        }
×
109

110
        domainCacheEntry, err := t.shard.GetDomainCache().GetDomainByID(task.DomainID)
54✔
111
        if err != nil {
54✔
112
                return err
×
113
        }
×
114
        clusterConfiguredForHistoryArchival := t.shard.GetService().GetArchivalMetadata().GetHistoryConfig().ClusterConfiguredForArchival()
54✔
115
        domainConfiguredForHistoryArchival := domainCacheEntry.GetConfig().HistoryArchivalStatus == types.ArchivalStatusEnabled
54✔
116
        archiveHistory := clusterConfiguredForHistoryArchival && domainConfiguredForHistoryArchival
54✔
117

54✔
118
        // TODO: @ycyang once archival backfill is in place cluster:paused && domain:enabled should be a nop rather than a delete
54✔
119
        if archiveHistory {
105✔
120
                t.metricsClient.IncCounter(metrics.HistoryProcessDeleteHistoryEventScope, metrics.WorkflowCleanupArchiveCount)
51✔
121
                return t.archiveWorkflow(ctx, task, wfContext, mutableState, domainCacheEntry)
51✔
122
        }
51✔
123

124
        t.metricsClient.IncCounter(metrics.HistoryProcessDeleteHistoryEventScope, metrics.WorkflowCleanupDeleteCount)
3✔
125
        return t.deleteWorkflow(ctx, task, wfContext, mutableState)
3✔
126
}
127

128
func (t *timerTaskExecutorBase) deleteWorkflow(
129
        ctx context.Context,
130
        task *persistence.TimerTaskInfo,
131
        context execution.Context,
132
        msBuilder execution.MutableState,
133
) error {
4✔
134

4✔
135
        if err := t.deleteCurrentWorkflowExecution(ctx, task); err != nil {
4✔
136
                return err
×
137
        }
×
138

139
        if err := t.deleteWorkflowExecution(ctx, task); err != nil {
4✔
140
                return err
×
141
        }
×
142

143
        if err := t.deleteWorkflowHistory(ctx, task, msBuilder); err != nil {
4✔
144
                return err
×
145
        }
×
146

147
        if err := t.deleteWorkflowVisibility(ctx, task); err != nil {
4✔
148
                return err
×
149
        }
×
150
        // calling clear here to force accesses of mutable state to read database
151
        // if this is not called then callers will get mutable state even though its been removed from database
152
        context.Clear()
4✔
153
        return nil
4✔
154
}
155

156
func (t *timerTaskExecutorBase) archiveWorkflow(
157
        ctx context.Context,
158
        task *persistence.TimerTaskInfo,
159
        workflowContext execution.Context,
160
        msBuilder execution.MutableState,
161
        domainCacheEntry *cache.DomainCacheEntry,
162
) error {
53✔
163
        branchToken, err := msBuilder.GetCurrentBranchToken()
53✔
164
        if err != nil {
53✔
165
                return err
×
166
        }
×
167
        closeFailoverVersion, err := msBuilder.GetLastWriteVersion()
53✔
168
        if err != nil {
53✔
169
                return err
×
170
        }
×
171

172
        req := &archiver.ClientRequest{
53✔
173
                ArchiveRequest: &archiver.ArchiveRequest{
53✔
174
                        DomainID:             task.DomainID,
53✔
175
                        WorkflowID:           task.WorkflowID,
53✔
176
                        RunID:                task.RunID,
53✔
177
                        DomainName:           domainCacheEntry.GetInfo().Name,
53✔
178
                        ShardID:              t.shard.GetShardID(),
53✔
179
                        Targets:              []archiver.ArchivalTarget{archiver.ArchiveTargetHistory},
53✔
180
                        URI:                  domainCacheEntry.GetConfig().HistoryArchivalURI,
53✔
181
                        NextEventID:          msBuilder.GetNextEventID(),
53✔
182
                        BranchToken:          branchToken,
53✔
183
                        CloseFailoverVersion: closeFailoverVersion,
53✔
184
                },
53✔
185
                CallerService:        service.History,
53✔
186
                AttemptArchiveInline: false, // archive in workflow by default
53✔
187
        }
53✔
188
        executionStats, err := workflowContext.LoadExecutionStats(ctx)
53✔
189
        if err == nil && executionStats.HistorySize < int64(t.config.TimerProcessorHistoryArchivalSizeLimit()) {
102✔
190
                req.AttemptArchiveInline = true
49✔
191
        }
49✔
192

193
        archiveCtx, cancel := context.WithTimeout(ctx, t.config.TimerProcessorArchivalTimeLimit())
53✔
194
        defer cancel()
53✔
195
        resp, err := t.archiverClient.Archive(archiveCtx, req)
53✔
196
        if err != nil {
54✔
197
                return err
1✔
198
        }
1✔
199

200
        if err := t.deleteCurrentWorkflowExecution(ctx, task); err != nil {
52✔
201
                return err
×
202
        }
×
203
        if err := t.deleteWorkflowExecution(ctx, task); err != nil {
52✔
204
                return err
×
205
        }
×
206
        // delete workflow history if history archival is not needed or history as been archived inline
207
        if resp.HistoryArchivedInline {
100✔
208
                t.metricsClient.IncCounter(metrics.HistoryProcessDeleteHistoryEventScope, metrics.WorkflowCleanupDeleteHistoryInlineCount)
48✔
209
                if err := t.deleteWorkflowHistory(ctx, task, msBuilder); err != nil {
48✔
210
                        return err
×
211
                }
×
212
        }
213
        // delete visibility record here regardless if it's been archived inline or not
214
        // since the entire record is included as part of the archive request.
215
        if err := t.deleteWorkflowVisibility(ctx, task); err != nil {
52✔
216
                return err
×
217
        }
×
218
        // calling clear here to force accesses of mutable state to read database
219
        // if this is not called then callers will get mutable state even though its been removed from database
220
        workflowContext.Clear()
52✔
221
        return nil
52✔
222
}
223

224
func (t *timerTaskExecutorBase) deleteWorkflowExecution(
225
        ctx context.Context,
226
        task *persistence.TimerTaskInfo,
227
) error {
56✔
228
        domainName, err := t.shard.GetDomainCache().GetDomainName(task.DomainID)
56✔
229
        if err != nil {
56✔
230
                return err
×
231
        }
×
232
        op := func() error {
112✔
233
                return t.shard.GetExecutionManager().DeleteWorkflowExecution(ctx, &persistence.DeleteWorkflowExecutionRequest{
56✔
234
                        DomainID:   task.DomainID,
56✔
235
                        WorkflowID: task.WorkflowID,
56✔
236
                        RunID:      task.RunID,
56✔
237
                        DomainName: domainName,
56✔
238
                })
56✔
239
        }
56✔
240
        return t.throttleRetry.Do(ctx, op)
56✔
241
}
242

243
func (t *timerTaskExecutorBase) deleteCurrentWorkflowExecution(
244
        ctx context.Context,
245
        task *persistence.TimerTaskInfo,
246
) error {
56✔
247
        domainName, err := t.shard.GetDomainCache().GetDomainName(task.DomainID)
56✔
248
        if err != nil {
56✔
249
                return err
×
250
        }
×
251
        op := func() error {
112✔
252
                return t.shard.GetExecutionManager().DeleteCurrentWorkflowExecution(ctx, &persistence.DeleteCurrentWorkflowExecutionRequest{
56✔
253
                        DomainID:   task.DomainID,
56✔
254
                        WorkflowID: task.WorkflowID,
56✔
255
                        RunID:      task.RunID,
56✔
256
                        DomainName: domainName,
56✔
257
                })
56✔
258
        }
56✔
259
        return t.throttleRetry.Do(ctx, op)
56✔
260
}
261

262
func (t *timerTaskExecutorBase) deleteWorkflowHistory(
263
        ctx context.Context,
264
        task *persistence.TimerTaskInfo,
265
        msBuilder execution.MutableState,
266
) error {
52✔
267

52✔
268
        op := func() error {
104✔
269
                branchToken, err := msBuilder.GetCurrentBranchToken()
52✔
270
                if err != nil {
52✔
271
                        return err
×
272
                }
×
273
                domainName, err := t.shard.GetDomainCache().GetDomainName(task.DomainID)
52✔
274
                if err != nil {
52✔
275
                        return err
×
276
                }
×
277
                return t.shard.GetHistoryManager().DeleteHistoryBranch(ctx, &persistence.DeleteHistoryBranchRequest{
52✔
278
                        BranchToken: branchToken,
52✔
279
                        ShardID:     common.IntPtr(t.shard.GetShardID()),
52✔
280
                        DomainName:  domainName,
52✔
281
                })
52✔
282

283
        }
284
        return t.throttleRetry.Do(ctx, op)
52✔
285
}
286

287
func (t *timerTaskExecutorBase) deleteWorkflowVisibility(
288
        ctx context.Context,
289
        task *persistence.TimerTaskInfo,
290
) error {
56✔
291

56✔
292
        domain, errorDomainName := t.shard.GetDomainCache().GetDomainName(task.DomainID)
56✔
293
        if errorDomainName != nil {
56✔
294
                return errorDomainName
×
295
        }
×
296
        op := func() error {
112✔
297
                request := &persistence.VisibilityDeleteWorkflowExecutionRequest{
56✔
298
                        DomainID:   task.DomainID,
56✔
299
                        Domain:     domain,
56✔
300
                        WorkflowID: task.WorkflowID,
56✔
301
                        RunID:      task.RunID,
56✔
302
                        TaskID:     task.TaskID,
56✔
303
                }
56✔
304
                // TODO: expose GetVisibilityManager method on shardContext interface
56✔
305
                return t.shard.GetService().GetVisibilityManager().DeleteWorkflowExecution(ctx, request) // delete from db
56✔
306
        }
56✔
307
        return t.throttleRetry.Do(ctx, op)
56✔
308
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc