• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

uber / cadence / 018f9f5c-2963-46a5-8e77-e52888c66415

22 May 2024 08:12AM UTC coverage: 69.202% (-0.03%) from 69.232%
018f9f5c-2963-46a5-8e77-e52888c66415

push

buildkite

web-flow
Add method to list all workflow executions with support for partial match and search params (#6017)

* try

* update

* Update dataVisibilityManagerInterfaces.go

* updated

* Update pinot_visibility_store_test.go

* Update visibility_store_mock.go

* Update es_visibility_store_test.go

* Update pinot_visibility_store.go

77 of 89 new or added lines in 4 files covered. (86.52%)

80 existing lines in 21 files now uncovered.

101937 of 147303 relevant lines covered (69.2%)

2547.53 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

80.21
/service/history/task/timer_task_executor_base.go
1
// Copyright (c) 2020 Uber Technologies, Inc.
2
//
3
// Permission is hereby granted, free of charge, to any person obtaining a copy
4
// of this software and associated documentation files (the "Software"), to deal
5
// in the Software without restriction, including without limitation the rights
6
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
7
// copies of the Software, and to permit persons to whom the Software is
8
// furnished to do so, subject to the following conditions:
9
//
10
// The above copyright notice and this permission notice shall be included in
11
// all copies or substantial portions of the Software.
12
//
13
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
14
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
18
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
19
// THE SOFTWARE.
20

21
package task
22

23
import (
24
        "context"
25

26
        "github.com/uber/cadence/common"
27
        "github.com/uber/cadence/common/backoff"
28
        "github.com/uber/cadence/common/cache"
29
        "github.com/uber/cadence/common/log"
30
        "github.com/uber/cadence/common/metrics"
31
        "github.com/uber/cadence/common/persistence"
32
        "github.com/uber/cadence/common/service"
33
        "github.com/uber/cadence/common/types"
34
        "github.com/uber/cadence/service/history/config"
35
        "github.com/uber/cadence/service/history/execution"
36
        "github.com/uber/cadence/service/history/shard"
37
        "github.com/uber/cadence/service/worker/archiver"
38
)
39

40
var (
41
        taskRetryPolicy = common.CreateTaskProcessingRetryPolicy()
42
)
43

44
type (
45
        timerTaskExecutorBase struct {
46
                shard          shard.Context
47
                archiverClient archiver.Client
48
                executionCache *execution.Cache
49
                logger         log.Logger
50
                metricsClient  metrics.Client
51
                config         *config.Config
52
                throttleRetry  *backoff.ThrottleRetry
53
                ctx            context.Context
54
                cancelFn       context.CancelFunc
55
        }
56
)
57

58
func newTimerTaskExecutorBase(
59
        shard shard.Context,
60
        archiverClient archiver.Client,
61
        executionCache *execution.Cache,
62
        logger log.Logger,
63
        metricsClient metrics.Client,
64
        config *config.Config,
65
) *timerTaskExecutorBase {
189✔
66
        ctx, cancelFn := context.WithCancel(context.Background())
189✔
67
        return &timerTaskExecutorBase{
189✔
68
                shard:          shard,
189✔
69
                archiverClient: archiverClient,
189✔
70
                executionCache: executionCache,
189✔
71
                logger:         logger,
189✔
72
                metricsClient:  metricsClient,
189✔
73
                config:         config,
189✔
74
                throttleRetry: backoff.NewThrottleRetry(
189✔
75
                        backoff.WithRetryPolicy(taskRetryPolicy),
189✔
76
                        backoff.WithRetryableError(persistence.IsTransientError),
189✔
77
                ),
189✔
78
                ctx:      ctx,
189✔
79
                cancelFn: cancelFn,
189✔
80
        }
189✔
81
}
189✔
82

83
func (t *timerTaskExecutorBase) executeDeleteHistoryEventTask(
84
        ctx context.Context,
85
        task *persistence.TimerTaskInfo,
86
) (retError error) {
56✔
87

56✔
88
        wfContext, release, err := t.executionCache.GetOrCreateWorkflowExecutionWithTimeout(
56✔
89
                task.DomainID,
56✔
90
                getWorkflowExecution(task),
56✔
91
                taskGetExecutionContextTimeout,
56✔
92
        )
56✔
93
        if err != nil {
58✔
94
                return err
2✔
95
        }
2✔
96
        defer func() { release(retError) }()
108✔
97

98
        mutableState, err := loadMutableStateForTimerTask(ctx, wfContext, task, t.metricsClient, t.logger)
54✔
99
        if err != nil {
54✔
100
                return err
×
101
        }
×
102
        if mutableState == nil || mutableState.IsWorkflowExecutionRunning() {
57✔
103
                return nil
3✔
104
        }
3✔
105

106
        lastWriteVersion, err := mutableState.GetLastWriteVersion()
54✔
107
        if err != nil {
54✔
108
                return err
×
109
        }
×
110
        ok, err := verifyTaskVersion(t.shard, t.logger, task.DomainID, lastWriteVersion, task.Version, task)
54✔
111
        if err != nil || !ok {
54✔
UNCOV
112
                return err
×
UNCOV
113
        }
×
114

115
        domainCacheEntry, err := t.shard.GetDomainCache().GetDomainByID(task.DomainID)
54✔
116
        if err != nil {
54✔
117
                return err
×
118
        }
×
119
        clusterConfiguredForHistoryArchival := t.shard.GetService().GetArchivalMetadata().GetHistoryConfig().ClusterConfiguredForArchival()
54✔
120
        domainConfiguredForHistoryArchival := domainCacheEntry.GetConfig().HistoryArchivalStatus == types.ArchivalStatusEnabled
54✔
121
        archiveHistory := clusterConfiguredForHistoryArchival && domainConfiguredForHistoryArchival
54✔
122

54✔
123
        // TODO: @ycyang once archival backfill is in place cluster:paused && domain:enabled should be a nop rather than a delete
54✔
124
        if archiveHistory {
105✔
125
                t.metricsClient.IncCounter(metrics.HistoryProcessDeleteHistoryEventScope, metrics.WorkflowCleanupArchiveCount)
51✔
126
                return t.archiveWorkflow(ctx, task, wfContext, mutableState, domainCacheEntry)
51✔
127
        }
51✔
128

129
        t.metricsClient.IncCounter(metrics.HistoryProcessDeleteHistoryEventScope, metrics.WorkflowCleanupDeleteCount)
3✔
130
        return t.deleteWorkflow(ctx, task, wfContext, mutableState)
3✔
131
}
132

133
func (t *timerTaskExecutorBase) deleteWorkflow(
134
        ctx context.Context,
135
        task *persistence.TimerTaskInfo,
136
        context execution.Context,
137
        msBuilder execution.MutableState,
138
) error {
4✔
139

4✔
140
        if err := t.deleteWorkflowHistory(ctx, task, msBuilder); err != nil {
4✔
141
                return err
×
142
        }
×
143

144
        if err := t.deleteWorkflowVisibility(ctx, task); err != nil {
4✔
145
                return err
×
146
        }
×
147

148
        if err := t.deleteCurrentWorkflowExecution(ctx, task); err != nil {
4✔
149
                return err
×
150
        }
×
151

152
        // it must be the last one due to the nature of workflow execution deletion
153
        if err := t.deleteWorkflowExecution(ctx, task); err != nil {
4✔
154
                return err
×
155
        }
×
156

157
        // calling clear here to force accesses of mutable state to read database
158
        // if this is not called then callers will get mutable state even though it's been removed from database
159
        context.Clear()
4✔
160
        return nil
4✔
161
}
162

163
func (t *timerTaskExecutorBase) archiveWorkflow(
164
        ctx context.Context,
165
        task *persistence.TimerTaskInfo,
166
        workflowContext execution.Context,
167
        msBuilder execution.MutableState,
168
        domainCacheEntry *cache.DomainCacheEntry,
169
) error {
53✔
170
        branchToken, err := msBuilder.GetCurrentBranchToken()
53✔
171
        if err != nil {
53✔
172
                return err
×
173
        }
×
174
        closeFailoverVersion, err := msBuilder.GetLastWriteVersion()
53✔
175
        if err != nil {
53✔
176
                return err
×
177
        }
×
178

179
        req := &archiver.ClientRequest{
53✔
180
                ArchiveRequest: &archiver.ArchiveRequest{
53✔
181
                        DomainID:             task.DomainID,
53✔
182
                        WorkflowID:           task.WorkflowID,
53✔
183
                        RunID:                task.RunID,
53✔
184
                        DomainName:           domainCacheEntry.GetInfo().Name,
53✔
185
                        ShardID:              t.shard.GetShardID(),
53✔
186
                        Targets:              []archiver.ArchivalTarget{archiver.ArchiveTargetHistory},
53✔
187
                        URI:                  domainCacheEntry.GetConfig().HistoryArchivalURI,
53✔
188
                        NextEventID:          msBuilder.GetNextEventID(),
53✔
189
                        BranchToken:          branchToken,
53✔
190
                        CloseFailoverVersion: closeFailoverVersion,
53✔
191
                },
53✔
192
                CallerService:        service.History,
53✔
193
                AttemptArchiveInline: false, // archive in workflow by default
53✔
194
        }
53✔
195
        executionStats, err := workflowContext.LoadExecutionStats(ctx)
53✔
196
        if err == nil && executionStats.HistorySize < int64(t.config.TimerProcessorHistoryArchivalSizeLimit()) {
102✔
197
                req.AttemptArchiveInline = true
49✔
198
        }
49✔
199

200
        archiveCtx, cancel := context.WithTimeout(ctx, t.config.TimerProcessorArchivalTimeLimit())
53✔
201
        defer cancel()
53✔
202
        resp, err := t.archiverClient.Archive(archiveCtx, req)
53✔
203
        if err != nil {
54✔
204
                return err
1✔
205
        }
1✔
206

207
        // delete workflow history if history archival is not needed or history has been archived inline
208
        if resp.HistoryArchivedInline {
100✔
209
                t.metricsClient.IncCounter(metrics.HistoryProcessDeleteHistoryEventScope, metrics.WorkflowCleanupDeleteHistoryInlineCount)
48✔
210
                if err := t.deleteWorkflowHistory(ctx, task, msBuilder); err != nil {
48✔
211
                        return err
×
212
                }
×
213
        }
214
        // delete visibility record here regardless of whether it's been archived inline or not
215
        // since the entire record is included as part of the archive request.
216
        if err := t.deleteWorkflowVisibility(ctx, task); err != nil {
52✔
217
                return err
×
218
        }
×
219

220
        if err := t.deleteCurrentWorkflowExecution(ctx, task); err != nil {
52✔
221
                return err
×
222
        }
×
223
        if err := t.deleteWorkflowExecution(ctx, task); err != nil {
52✔
224
                return err
×
225
        }
×
226
        // calling clear here to force accesses of mutable state to read database
227
        // if this is not called then callers will get mutable state even though it's been removed from database
228
        workflowContext.Clear()
52✔
229
        return nil
52✔
230
}
231

232
func (t *timerTaskExecutorBase) deleteWorkflowExecution(
233
        ctx context.Context,
234
        task *persistence.TimerTaskInfo,
235
) error {
56✔
236
        domainName, err := t.shard.GetDomainCache().GetDomainName(task.DomainID)
56✔
237
        if err != nil {
56✔
238
                return err
×
239
        }
×
240
        op := func() error {
112✔
241
                return t.shard.GetExecutionManager().DeleteWorkflowExecution(ctx, &persistence.DeleteWorkflowExecutionRequest{
56✔
242
                        DomainID:   task.DomainID,
56✔
243
                        WorkflowID: task.WorkflowID,
56✔
244
                        RunID:      task.RunID,
56✔
245
                        DomainName: domainName,
56✔
246
                })
56✔
247
        }
56✔
248
        return t.throttleRetry.Do(ctx, op)
56✔
249
}
250

251
func (t *timerTaskExecutorBase) deleteCurrentWorkflowExecution(
252
        ctx context.Context,
253
        task *persistence.TimerTaskInfo,
254
) error {
56✔
255
        domainName, err := t.shard.GetDomainCache().GetDomainName(task.DomainID)
56✔
256
        if err != nil {
56✔
257
                return err
×
258
        }
×
259
        op := func() error {
112✔
260
                return t.shard.GetExecutionManager().DeleteCurrentWorkflowExecution(ctx, &persistence.DeleteCurrentWorkflowExecutionRequest{
56✔
261
                        DomainID:   task.DomainID,
56✔
262
                        WorkflowID: task.WorkflowID,
56✔
263
                        RunID:      task.RunID,
56✔
264
                        DomainName: domainName,
56✔
265
                })
56✔
266
        }
56✔
267
        return t.throttleRetry.Do(ctx, op)
56✔
268
}
269

270
func (t *timerTaskExecutorBase) deleteWorkflowHistory(
271
        ctx context.Context,
272
        task *persistence.TimerTaskInfo,
273
        msBuilder execution.MutableState,
274
) error {
52✔
275

52✔
276
        op := func() error {
104✔
277
                branchToken, err := msBuilder.GetCurrentBranchToken()
52✔
278
                if err != nil {
52✔
279
                        return err
×
280
                }
×
281
                domainName, err := t.shard.GetDomainCache().GetDomainName(task.DomainID)
52✔
282
                if err != nil {
52✔
283
                        return err
×
284
                }
×
285
                return t.shard.GetHistoryManager().DeleteHistoryBranch(ctx, &persistence.DeleteHistoryBranchRequest{
52✔
286
                        BranchToken: branchToken,
52✔
287
                        ShardID:     common.IntPtr(t.shard.GetShardID()),
52✔
288
                        DomainName:  domainName,
52✔
289
                })
52✔
290

291
        }
292
        return t.throttleRetry.Do(ctx, op)
52✔
293
}
294

295
func (t *timerTaskExecutorBase) deleteWorkflowVisibility(
296
        ctx context.Context,
297
        task *persistence.TimerTaskInfo,
298
) error {
56✔
299

56✔
300
        domain, errorDomainName := t.shard.GetDomainCache().GetDomainName(task.DomainID)
56✔
301
        if errorDomainName != nil {
56✔
302
                return errorDomainName
×
303
        }
×
304
        op := func() error {
112✔
305
                request := &persistence.VisibilityDeleteWorkflowExecutionRequest{
56✔
306
                        DomainID:   task.DomainID,
56✔
307
                        Domain:     domain,
56✔
308
                        WorkflowID: task.WorkflowID,
56✔
309
                        RunID:      task.RunID,
56✔
310
                        TaskID:     task.TaskID,
56✔
311
                }
56✔
312
                // TODO: expose GetVisibilityManager method on shardContext interface
56✔
313
                return t.shard.GetService().GetVisibilityManager().DeleteWorkflowExecution(ctx, request) // delete from db
56✔
314
        }
56✔
315
        return t.throttleRetry.Do(ctx, op)
56✔
316
}
317

318
func (t *timerTaskExecutorBase) Stop() {
150✔
319
        t.logger.Info("Stopping timerTaskExecutorBase")
150✔
320
        t.cancelFn()
150✔
321
}
150✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc