• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

uber / cadence / 018ea0c8-b3fa-4137-afa5-8777a0d22d18

02 Apr 2024 09:48PM UTC coverage: 65.792% (+0.006%) from 65.786%
018ea0c8-b3fa-4137-afa5-8777a0d22d18

push

buildkite

web-flow
Added tests for nosql_Store.go timers (#5838)

96280 of 146339 relevant lines covered (65.79%)

2364.79 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

78.38
/service/history/task/timer_task_executor_base.go
1
// Copyright (c) 2020 Uber Technologies, Inc.
2
//
3
// Permission is hereby granted, free of charge, to any person obtaining a copy
4
// of this software and associated documentation files (the "Software"), to deal
5
// in the Software without restriction, including without limitation the rights
6
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
7
// copies of the Software, and to permit persons to whom the Software is
8
// furnished to do so, subject to the following conditions:
9
//
10
// The above copyright notice and this permission notice shall be included in
11
// all copies or substantial portions of the Software.
12
//
13
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
14
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
18
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
19
// THE SOFTWARE.
20

21
package task
22

23
import (
24
        "context"
25

26
        "github.com/uber/cadence/common"
27
        "github.com/uber/cadence/common/backoff"
28
        "github.com/uber/cadence/common/cache"
29
        "github.com/uber/cadence/common/log"
30
        "github.com/uber/cadence/common/metrics"
31
        "github.com/uber/cadence/common/persistence"
32
        "github.com/uber/cadence/common/service"
33
        "github.com/uber/cadence/common/types"
34
        "github.com/uber/cadence/service/history/config"
35
        "github.com/uber/cadence/service/history/execution"
36
        "github.com/uber/cadence/service/history/shard"
37
        "github.com/uber/cadence/service/worker/archiver"
38
)
39

40
var (
41
        taskRetryPolicy = common.CreateTaskProcessingRetryPolicy()
42
)
43

44
type (
45
        timerTaskExecutorBase struct {
46
                shard          shard.Context
47
                archiverClient archiver.Client
48
                executionCache *execution.Cache
49
                logger         log.Logger
50
                metricsClient  metrics.Client
51
                config         *config.Config
52
                throttleRetry  *backoff.ThrottleRetry
53
        }
54
)
55

56
func newTimerTaskExecutorBase(
57
        shard shard.Context,
58
        archiverClient archiver.Client,
59
        executionCache *execution.Cache,
60
        logger log.Logger,
61
        metricsClient metrics.Client,
62
        config *config.Config,
63
) *timerTaskExecutorBase {
139✔
64
        return &timerTaskExecutorBase{
139✔
65
                shard:          shard,
139✔
66
                archiverClient: archiverClient,
139✔
67
                executionCache: executionCache,
139✔
68
                logger:         logger,
139✔
69
                metricsClient:  metricsClient,
139✔
70
                config:         config,
139✔
71
                throttleRetry: backoff.NewThrottleRetry(
139✔
72
                        backoff.WithRetryPolicy(taskRetryPolicy),
139✔
73
                        backoff.WithRetryableError(persistence.IsTransientError),
139✔
74
                ),
139✔
75
        }
139✔
76
}
139✔
77

78
func (t *timerTaskExecutorBase) executeDeleteHistoryEventTask(
79
        ctx context.Context,
80
        task *persistence.TimerTaskInfo,
81
) (retError error) {
54✔
82

54✔
83
        wfContext, release, err := t.executionCache.GetOrCreateWorkflowExecutionWithTimeout(
54✔
84
                task.DomainID,
54✔
85
                getWorkflowExecution(task),
54✔
86
                taskGetExecutionContextTimeout,
54✔
87
        )
54✔
88
        if err != nil {
54✔
89
                return err
×
90
        }
×
91
        defer func() { release(retError) }()
108✔
92

93
        mutableState, err := loadMutableStateForTimerTask(ctx, wfContext, task, t.metricsClient, t.logger)
54✔
94
        if err != nil {
54✔
95
                return err
×
96
        }
×
97
        if mutableState == nil || mutableState.IsWorkflowExecutionRunning() {
57✔
98
                return nil
3✔
99
        }
3✔
100

101
        lastWriteVersion, err := mutableState.GetLastWriteVersion()
54✔
102
        if err != nil {
54✔
103
                return err
×
104
        }
×
105
        ok, err := verifyTaskVersion(t.shard, t.logger, task.DomainID, lastWriteVersion, task.Version, task)
54✔
106
        if err != nil || !ok {
54✔
107
                return err
×
108
        }
×
109

110
        domainCacheEntry, err := t.shard.GetDomainCache().GetDomainByID(task.DomainID)
54✔
111
        if err != nil {
54✔
112
                return err
×
113
        }
×
114
        clusterConfiguredForHistoryArchival := t.shard.GetService().GetArchivalMetadata().GetHistoryConfig().ClusterConfiguredForArchival()
54✔
115
        domainConfiguredForHistoryArchival := domainCacheEntry.GetConfig().HistoryArchivalStatus == types.ArchivalStatusEnabled
54✔
116
        archiveHistory := clusterConfiguredForHistoryArchival && domainConfiguredForHistoryArchival
54✔
117

54✔
118
        // TODO: @ycyang once archival backfill is in place cluster:paused && domain:enabled should be a nop rather than a delete
54✔
119
        if archiveHistory {
105✔
120
                t.metricsClient.IncCounter(metrics.HistoryProcessDeleteHistoryEventScope, metrics.WorkflowCleanupArchiveCount)
51✔
121
                return t.archiveWorkflow(ctx, task, wfContext, mutableState, domainCacheEntry)
51✔
122
        }
51✔
123

124
        t.metricsClient.IncCounter(metrics.HistoryProcessDeleteHistoryEventScope, metrics.WorkflowCleanupDeleteCount)
3✔
125
        return t.deleteWorkflow(ctx, task, wfContext, mutableState)
3✔
126
}
127

128
func (t *timerTaskExecutorBase) deleteWorkflow(
129
        ctx context.Context,
130
        task *persistence.TimerTaskInfo,
131
        context execution.Context,
132
        msBuilder execution.MutableState,
133
) error {
4✔
134

4✔
135
        if err := t.deleteWorkflowHistory(ctx, task, msBuilder); err != nil {
4✔
136
                return err
×
137
        }
×
138

139
        if err := t.deleteWorkflowVisibility(ctx, task); err != nil {
4✔
140
                return err
×
141
        }
×
142

143
        if err := t.deleteCurrentWorkflowExecution(ctx, task); err != nil {
4✔
144
                return err
×
145
        }
×
146

147
        // it must be the last one due to the nature of workflow execution deletion
148
        if err := t.deleteWorkflowExecution(ctx, task); err != nil {
4✔
149
                return err
×
150
        }
×
151

152
        // calling clear here to force accesses of mutable state to read database
153
        // if this is not called then callers will get mutable state even though its been removed from database
154
        context.Clear()
4✔
155
        return nil
4✔
156
}
157

158
func (t *timerTaskExecutorBase) archiveWorkflow(
159
        ctx context.Context,
160
        task *persistence.TimerTaskInfo,
161
        workflowContext execution.Context,
162
        msBuilder execution.MutableState,
163
        domainCacheEntry *cache.DomainCacheEntry,
164
) error {
53✔
165
        branchToken, err := msBuilder.GetCurrentBranchToken()
53✔
166
        if err != nil {
53✔
167
                return err
×
168
        }
×
169
        closeFailoverVersion, err := msBuilder.GetLastWriteVersion()
53✔
170
        if err != nil {
53✔
171
                return err
×
172
        }
×
173

174
        req := &archiver.ClientRequest{
53✔
175
                ArchiveRequest: &archiver.ArchiveRequest{
53✔
176
                        DomainID:             task.DomainID,
53✔
177
                        WorkflowID:           task.WorkflowID,
53✔
178
                        RunID:                task.RunID,
53✔
179
                        DomainName:           domainCacheEntry.GetInfo().Name,
53✔
180
                        ShardID:              t.shard.GetShardID(),
53✔
181
                        Targets:              []archiver.ArchivalTarget{archiver.ArchiveTargetHistory},
53✔
182
                        URI:                  domainCacheEntry.GetConfig().HistoryArchivalURI,
53✔
183
                        NextEventID:          msBuilder.GetNextEventID(),
53✔
184
                        BranchToken:          branchToken,
53✔
185
                        CloseFailoverVersion: closeFailoverVersion,
53✔
186
                },
53✔
187
                CallerService:        service.History,
53✔
188
                AttemptArchiveInline: false, // archive in workflow by default
53✔
189
        }
53✔
190
        executionStats, err := workflowContext.LoadExecutionStats(ctx)
53✔
191
        if err == nil && executionStats.HistorySize < int64(t.config.TimerProcessorHistoryArchivalSizeLimit()) {
102✔
192
                req.AttemptArchiveInline = true
49✔
193
        }
49✔
194

195
        archiveCtx, cancel := context.WithTimeout(ctx, t.config.TimerProcessorArchivalTimeLimit())
53✔
196
        defer cancel()
53✔
197
        resp, err := t.archiverClient.Archive(archiveCtx, req)
53✔
198
        if err != nil {
54✔
199
                return err
1✔
200
        }
1✔
201

202
        // delete workflow history if history archival is not needed or history as been archived inline
203
        if resp.HistoryArchivedInline {
100✔
204
                t.metricsClient.IncCounter(metrics.HistoryProcessDeleteHistoryEventScope, metrics.WorkflowCleanupDeleteHistoryInlineCount)
48✔
205
                if err := t.deleteWorkflowHistory(ctx, task, msBuilder); err != nil {
48✔
206
                        return err
×
207
                }
×
208
        }
209
        // delete visibility record here regardless if it's been archived inline or not
210
        // since the entire record is included as part of the archive request.
211
        if err := t.deleteWorkflowVisibility(ctx, task); err != nil {
52✔
212
                return err
×
213
        }
×
214

215
        if err := t.deleteCurrentWorkflowExecution(ctx, task); err != nil {
52✔
216
                return err
×
217
        }
×
218
        if err := t.deleteWorkflowExecution(ctx, task); err != nil {
52✔
219
                return err
×
220
        }
×
221
        // calling clear here to force accesses of mutable state to read database
222
        // if this is not called then callers will get mutable state even though its been removed from database
223
        workflowContext.Clear()
52✔
224
        return nil
52✔
225
}
226

227
func (t *timerTaskExecutorBase) deleteWorkflowExecution(
228
        ctx context.Context,
229
        task *persistence.TimerTaskInfo,
230
) error {
56✔
231
        domainName, err := t.shard.GetDomainCache().GetDomainName(task.DomainID)
56✔
232
        if err != nil {
56✔
233
                return err
×
234
        }
×
235
        op := func() error {
112✔
236
                return t.shard.GetExecutionManager().DeleteWorkflowExecution(ctx, &persistence.DeleteWorkflowExecutionRequest{
56✔
237
                        DomainID:   task.DomainID,
56✔
238
                        WorkflowID: task.WorkflowID,
56✔
239
                        RunID:      task.RunID,
56✔
240
                        DomainName: domainName,
56✔
241
                })
56✔
242
        }
56✔
243
        return t.throttleRetry.Do(ctx, op)
56✔
244
}
245

246
func (t *timerTaskExecutorBase) deleteCurrentWorkflowExecution(
247
        ctx context.Context,
248
        task *persistence.TimerTaskInfo,
249
) error {
56✔
250
        domainName, err := t.shard.GetDomainCache().GetDomainName(task.DomainID)
56✔
251
        if err != nil {
56✔
252
                return err
×
253
        }
×
254
        op := func() error {
112✔
255
                return t.shard.GetExecutionManager().DeleteCurrentWorkflowExecution(ctx, &persistence.DeleteCurrentWorkflowExecutionRequest{
56✔
256
                        DomainID:   task.DomainID,
56✔
257
                        WorkflowID: task.WorkflowID,
56✔
258
                        RunID:      task.RunID,
56✔
259
                        DomainName: domainName,
56✔
260
                })
56✔
261
        }
56✔
262
        return t.throttleRetry.Do(ctx, op)
56✔
263
}
264

265
func (t *timerTaskExecutorBase) deleteWorkflowHistory(
266
        ctx context.Context,
267
        task *persistence.TimerTaskInfo,
268
        msBuilder execution.MutableState,
269
) error {
52✔
270

52✔
271
        op := func() error {
104✔
272
                branchToken, err := msBuilder.GetCurrentBranchToken()
52✔
273
                if err != nil {
52✔
274
                        return err
×
275
                }
×
276
                domainName, err := t.shard.GetDomainCache().GetDomainName(task.DomainID)
52✔
277
                if err != nil {
52✔
278
                        return err
×
279
                }
×
280
                return t.shard.GetHistoryManager().DeleteHistoryBranch(ctx, &persistence.DeleteHistoryBranchRequest{
52✔
281
                        BranchToken: branchToken,
52✔
282
                        ShardID:     common.IntPtr(t.shard.GetShardID()),
52✔
283
                        DomainName:  domainName,
52✔
284
                })
52✔
285

286
        }
287
        return t.throttleRetry.Do(ctx, op)
52✔
288
}
289

290
func (t *timerTaskExecutorBase) deleteWorkflowVisibility(
291
        ctx context.Context,
292
        task *persistence.TimerTaskInfo,
293
) error {
56✔
294

56✔
295
        domain, errorDomainName := t.shard.GetDomainCache().GetDomainName(task.DomainID)
56✔
296
        if errorDomainName != nil {
56✔
297
                return errorDomainName
×
298
        }
×
299
        op := func() error {
112✔
300
                request := &persistence.VisibilityDeleteWorkflowExecutionRequest{
56✔
301
                        DomainID:   task.DomainID,
56✔
302
                        Domain:     domain,
56✔
303
                        WorkflowID: task.WorkflowID,
56✔
304
                        RunID:      task.RunID,
56✔
305
                        TaskID:     task.TaskID,
56✔
306
                }
56✔
307
                // TODO: expose GetVisibilityManager method on shardContext interface
56✔
308
                return t.shard.GetService().GetVisibilityManager().DeleteWorkflowExecution(ctx, request) // delete from db
56✔
309
        }
56✔
310
        return t.throttleRetry.Do(ctx, op)
56✔
311
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc