
uber / cadence / 018e53aa-bc8b-40e2-9d7c-012f8da56b45

18 Mar 2024 10:24PM UTC coverage: 64.926% (-0.005%) from 64.931%

push · buildkite · web-flow

Fix checksum validation for SQL implementation (#5790)

What changed?
Add a check to the SQL implementation of the GetWorkflowExecution operation to exclude false-positive checksum validation failures.

Why?
For the checksum validation result to be trustworthy, the data read by the GetWorkflowExecution operation must come from a consistent view. In the NoSQL implementation the operation is a single read, so the data is consistent. In the SQL implementation, however, the operation performs multiple reads across different tables; if a concurrent update lands between those reads, the assembled data is not from a consistent view and the checksum validation can fail even though nothing is corrupted. Normally reads do not race with updates, but when shard ownership changes that assumption may not hold.
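
To make the failure mode concrete, here is a minimal, hypothetical sketch of such a guard. Every identifier in it (workflowState, executionRowVersion, checksumRowVersion, computeChecksum) is invented for illustration; it is not the actual cadence persistence code nor the exact condition used by this change. The idea is simply to report a checksum mismatch as corruption only when the rows read from the different tables agree on a record version, i.e. when the multi-table read could not have interleaved with a concurrent update.

// Hypothetical illustration only; not the cadence SQL persistence types.
package main

import "fmt"

type workflowState struct {
        workflowID          string
        executionRowVersion int64 // version read from the main executions row
        checksumRowVersion  int64 // version the stored checksum claims to cover
        payload             string
}

// computeChecksum stands in for hashing the assembled mutable state.
func computeChecksum(s *workflowState) uint64 {
        var h uint64
        for _, b := range []byte(s.payload) {
                h = h*31 + uint64(b)
        }
        return h
}

// validateChecksum reports corruption only when the versions from the
// separate reads agree; a mismatch on a torn read is silently skipped.
func validateChecksum(s *workflowState, stored uint64) error {
        if computeChecksum(s) == stored {
                return nil // checksum matches, nothing to do
        }
        if s.executionRowVersion != s.checksumRowVersion {
                // A concurrent update landed between the SELECTs, so the view is
                // torn and the mismatch is expected, not corruption.
                return nil
        }
        return fmt.Errorf("checksum validation failed for workflow %s", s.workflowID)
}

func main() {
        // Torn read: the execution row moved ahead of the row the checksum covers.
        s := &workflowState{workflowID: "wf-1", executionRowVersion: 7, checksumRowVersion: 6, payload: "updated"}
        fmt.Println(validateChecksum(s, 0)) // prints <nil>: mismatch ignored as an inconsistent read
}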

24 of 26 new or added lines in 5 files covered. (92.31%)

66 existing lines in 14 files now uncovered.

94774 of 145972 relevant lines covered (64.93%)

2395.07 hits per line
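
(For reference, the percentages follow directly from the counts: 24 / 26 ≈ 0.9231, i.e. 92.31% of the new lines, and 94,774 / 145,972 ≈ 0.6493, i.e. 64.93% overall, consistent with the 64.926% figure in the header.)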

Source File: /common/task/parallel_task_processor.go (93.06% covered)
// Copyright (c) 2020 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package task

import (
        "context"
        "errors"
        "fmt"
        "sync"
        "sync/atomic"
        "time"

        "github.com/uber/cadence/common"
        "github.com/uber/cadence/common/backoff"
        "github.com/uber/cadence/common/dynamicconfig"
        "github.com/uber/cadence/common/log"
        "github.com/uber/cadence/common/log/tag"
        "github.com/uber/cadence/common/metrics"
)

type (
        // ParallelTaskProcessorOptions configs PriorityTaskProcessor
        ParallelTaskProcessorOptions struct {
                QueueSize   int
                WorkerCount dynamicconfig.IntPropertyFn
                RetryPolicy backoff.RetryPolicy
        }

        parallelTaskProcessorImpl struct {
                status           int32
                tasksCh          chan Task
                shutdownCh       chan struct{}
                workerShutdownCh []chan struct{}
                shutdownWG       sync.WaitGroup
                logger           log.Logger
                metricsScope     metrics.Scope
                options          *ParallelTaskProcessorOptions
        }
)

const (
        defaultMonitorTickerDuration = 5 * time.Second
)

var (
        // ErrTaskProcessorClosed is the error returned when submiting task to a stopped processor
        ErrTaskProcessorClosed = errors.New("task processor has already shutdown")
)

// NewParallelTaskProcessor creates a new PriorityTaskProcessor
func NewParallelTaskProcessor(
        logger log.Logger,
        metricsClient metrics.Client,
        options *ParallelTaskProcessorOptions,
) Processor {
        return &parallelTaskProcessorImpl{
                status:           common.DaemonStatusInitialized,
                tasksCh:          make(chan Task, options.QueueSize),
                shutdownCh:       make(chan struct{}),
                workerShutdownCh: make([]chan struct{}, 0, options.WorkerCount()),
                logger:           logger,
                metricsScope:     metricsClient.Scope(metrics.ParallelTaskProcessingScope),
                options:          options,
        }
}

func (p *parallelTaskProcessorImpl) Start() {
        if !atomic.CompareAndSwapInt32(&p.status, common.DaemonStatusInitialized, common.DaemonStatusStarted) {
                return
        }

        initialWorkerCount := p.options.WorkerCount()

        p.shutdownWG.Add(initialWorkerCount)
        for i := 0; i < initialWorkerCount; i++ {
                shutdownCh := make(chan struct{})
                p.workerShutdownCh = append(p.workerShutdownCh, shutdownCh)
                go p.taskWorker(shutdownCh)
        }

        p.shutdownWG.Add(1)
        go p.workerMonitor(defaultMonitorTickerDuration)

        p.logger.Info("Parallel task processor started.")
}

func (p *parallelTaskProcessorImpl) Stop() {
        if !atomic.CompareAndSwapInt32(&p.status, common.DaemonStatusStarted, common.DaemonStatusStopped) {
                return
        }

        close(p.shutdownCh)

        p.drainAndNackTasks()

        if success := common.AwaitWaitGroup(&p.shutdownWG, time.Minute); !success {
                p.logger.Warn("Parallel task processor timedout on shutdown.")
        }
        p.logger.Info("Parallel task processor shutdown.")
}

func (p *parallelTaskProcessorImpl) Submit(task Task) error {
        p.metricsScope.IncCounter(metrics.ParallelTaskSubmitRequest)
        sw := p.metricsScope.StartTimer(metrics.ParallelTaskSubmitLatency)
        defer sw.Stop()

        if p.isStopped() {
                return ErrTaskProcessorClosed
        }

        select {
        case p.tasksCh <- task:
                if p.isStopped() {
                        p.drainAndNackTasks()
                }
                return nil
        case <-p.shutdownCh:
                return ErrTaskProcessorClosed
        }
}

func (p *parallelTaskProcessorImpl) taskWorker(shutdownCh chan struct{}) {
        defer p.shutdownWG.Done()

        for {
                select {
                case <-shutdownCh:
                        return
                case task := <-p.tasksCh:
                        p.executeTask(task, shutdownCh)
                }
        }
}

func (p *parallelTaskProcessorImpl) executeTask(task Task, shutdownCh chan struct{}) {
        sw := p.metricsScope.StartTimer(metrics.ParallelTaskTaskProcessingLatency)
        defer sw.Stop()

        defer func() {
                if r := recover(); r != nil {
                        p.logger.Error("recovered panic in task execution", tag.Dynamic("recovered-panic", r))
                        task.HandleErr(fmt.Errorf("recovered panic: %v", r))
                        task.Nack()
                }
        }()

        op := func() error {
                if err := task.Execute(); err != nil {
                        return task.HandleErr(err)
                }
                return nil
        }

        isRetryable := func(err error) bool {
                select {
                case <-shutdownCh:
                        return false
                default:
                }

                return task.RetryErr(err)
        }
        throttleRetry := backoff.NewThrottleRetry(
                backoff.WithRetryPolicy(p.options.RetryPolicy),
                backoff.WithRetryableError(isRetryable),
        )

        if err := throttleRetry.Do(context.Background(), op); err != nil {
                // non-retryable error or exhausted all retries or worker shutdown
                task.Nack()
                return
        }

        // no error
        task.Ack()
}

func (p *parallelTaskProcessorImpl) workerMonitor(tickerDuration time.Duration) {
        defer p.shutdownWG.Done()

        ticker := time.NewTicker(tickerDuration)

        for {
                select {
                case <-p.shutdownCh:
                        ticker.Stop()
                        p.removeWorker(len(p.workerShutdownCh))
                        return
                case <-ticker.C:
                        targetWorkerCount := p.options.WorkerCount()
                        currentWorkerCount := len(p.workerShutdownCh)
                        p.addWorker(targetWorkerCount - currentWorkerCount)
                        p.removeWorker(currentWorkerCount - targetWorkerCount)
                }
        }
}

func (p *parallelTaskProcessorImpl) addWorker(count int) {
        for i := 0; i < count; i++ {
                shutdownCh := make(chan struct{})
                p.workerShutdownCh = append(p.workerShutdownCh, shutdownCh)

                p.shutdownWG.Add(1)
                go p.taskWorker(shutdownCh)
        }
}

func (p *parallelTaskProcessorImpl) removeWorker(count int) {
        if count <= 0 {
                return
        }

        currentWorkerCount := len(p.workerShutdownCh)
        if count > currentWorkerCount {
                count = currentWorkerCount
        }

        shutdownChToClose := p.workerShutdownCh[currentWorkerCount-count:]
        p.workerShutdownCh = p.workerShutdownCh[:currentWorkerCount-count]

        for _, shutdownCh := range shutdownChToClose {
                close(shutdownCh)
        }
}

func (p *parallelTaskProcessorImpl) drainAndNackTasks() {
        for {
                select {
                case task := <-p.tasksCh:
                        task.Nack()
                default:
                        return
                }
        }
}

func (p *parallelTaskProcessorImpl) isStopped() bool {
        return atomic.LoadInt32(&p.status) == common.DaemonStatusStopped
}