• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

uber / cadence / 01896606-c2a7-4c89-817d-e6d1dc3a5a18

17 Jul 2023 10:44PM UTC coverage: 57.021% (-0.2%) from 57.249%
01896606-c2a7-4c89-817d-e6d1dc3a5a18

push

buildkite

web-flow
Merge master (#5351)

* Add Opensearch2 client with bulk API shared between clients (#5241)

* Allow to configure HTTP settings using template (#5329)

* CDNC-3181 Cleanup the unused watchdog code (#5096)

* Removed the Watchdog code and it's service calls

* Removed watchdog occurences and dependencies

---------

Co-authored-by: David Porter <david.porter@uber.com>

* Upgrade yarpc to v1.70.3 (#5341)

* upgrade mysql (#5345)

* upgrade mysql

* upgrade mysql version into 8.0 in docker files

* update all 5.7 to 8.0

* [dynamic config] add Filters method to dynamic config Key (#5346)

What changed?

Add Filters method to Key interface
Add implementations on most keys by parsing the comments on keys (assuming they are correct)
Why?

This is needed to know what dynamic config is domain specific. And this could possible simplify the collection struct by consolidating all GetPropertyFilterBy** methods.

How did you test it?

Potential risks

no risk since the method will be read only in non-critical path

Release notes

Documentation Changes

---------

Co-authored-by: Mantas Å idlauskas <sidlauskas.mantas@gmail.com>
Co-authored-by: agautam478 <72432016+agautam478@users.noreply.github.com>
Co-authored-by: David Porter <david.porter@uber.com>
Co-authored-by: Mantas Å idlauskas <mantass@netapp.com>
Co-authored-by: bowen xiao <xbowen@uber.com>

799 of 799 new or added lines in 17 files covered. (100.0%)

87154 of 152845 relevant lines covered (57.02%)

2484.19 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

93.06
/common/task/parallelTaskProcessor.go
1
// Copyright (c) 2020 Uber Technologies, Inc.
2
//
3
// Permission is hereby granted, free of charge, to any person obtaining a copy
4
// of this software and associated documentation files (the "Software"), to deal
5
// in the Software without restriction, including without limitation the rights
6
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
7
// copies of the Software, and to permit persons to whom the Software is
8
// furnished to do so, subject to the following conditions:
9
//
10
// The above copyright notice and this permission notice shall be included in
11
// all copies or substantial portions of the Software.
12
//
13
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
14
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
18
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
19
// THE SOFTWARE.
20

21
package task
22

23
import (
24
        "context"
25
        "errors"
26
        "fmt"
27
        "sync"
28
        "sync/atomic"
29
        "time"
30

31
        "github.com/uber/cadence/common/log/tag"
32

33
        "github.com/uber/cadence/common"
34
        "github.com/uber/cadence/common/backoff"
35
        "github.com/uber/cadence/common/dynamicconfig"
36
        "github.com/uber/cadence/common/log"
37
        "github.com/uber/cadence/common/metrics"
38
)
39

40
type (
41
        // ParallelTaskProcessorOptions configs PriorityTaskProcessor
42
        ParallelTaskProcessorOptions struct {
43
                QueueSize   int
44
                WorkerCount dynamicconfig.IntPropertyFn
45
                RetryPolicy backoff.RetryPolicy
46
        }
47

48
        parallelTaskProcessorImpl struct {
49
                status           int32
50
                tasksCh          chan Task
51
                shutdownCh       chan struct{}
52
                workerShutdownCh []chan struct{}
53
                shutdownWG       sync.WaitGroup
54
                logger           log.Logger
55
                metricsScope     metrics.Scope
56
                options          *ParallelTaskProcessorOptions
57
        }
58
)
59

60
const (
61
        defaultMonitorTickerDuration = 5 * time.Second
62
)
63

64
var (
65
        // ErrTaskProcessorClosed is the error returned when submiting task to a stopped processor
66
        ErrTaskProcessorClosed = errors.New("task processor has already shutdown")
67
)
68

69
// NewParallelTaskProcessor creates a new PriorityTaskProcessor
70
func NewParallelTaskProcessor(
71
        logger log.Logger,
72
        metricsClient metrics.Client,
73
        options *ParallelTaskProcessorOptions,
74
) Processor {
39✔
75
        return &parallelTaskProcessorImpl{
39✔
76
                status:           common.DaemonStatusInitialized,
39✔
77
                tasksCh:          make(chan Task, options.QueueSize),
39✔
78
                shutdownCh:       make(chan struct{}),
39✔
79
                workerShutdownCh: make([]chan struct{}, 0, options.WorkerCount()),
39✔
80
                logger:           logger,
39✔
81
                metricsScope:     metricsClient.Scope(metrics.ParallelTaskProcessingScope),
39✔
82
                options:          options,
39✔
83
        }
39✔
84
}
39✔
85

86
func (p *parallelTaskProcessorImpl) Start() {
21✔
87
        if !atomic.CompareAndSwapInt32(&p.status, common.DaemonStatusInitialized, common.DaemonStatusStarted) {
21✔
88
                return
×
89
        }
×
90

91
        initialWorkerCount := p.options.WorkerCount()
21✔
92

21✔
93
        p.shutdownWG.Add(initialWorkerCount)
21✔
94
        for i := 0; i < initialWorkerCount; i++ {
2,439✔
95
                shutdownCh := make(chan struct{})
2,418✔
96
                p.workerShutdownCh = append(p.workerShutdownCh, shutdownCh)
2,418✔
97
                go p.taskWorker(shutdownCh)
2,418✔
98
        }
2,418✔
99

100
        p.shutdownWG.Add(1)
21✔
101
        go p.workerMonitor(defaultMonitorTickerDuration)
21✔
102

21✔
103
        p.logger.Info("Parallel task processor started.")
21✔
104
}
105

106
func (p *parallelTaskProcessorImpl) Stop() {
21✔
107
        if !atomic.CompareAndSwapInt32(&p.status, common.DaemonStatusStarted, common.DaemonStatusStopped) {
21✔
108
                return
×
109
        }
×
110

111
        close(p.shutdownCh)
21✔
112

21✔
113
        p.drainAndNackTasks()
21✔
114

21✔
115
        if success := common.AwaitWaitGroup(&p.shutdownWG, time.Minute); !success {
21✔
116
                p.logger.Warn("Parallel task processor timedout on shutdown.")
×
117
        }
×
118
        p.logger.Info("Parallel task processor shutdown.")
21✔
119
}
120

121
func (p *parallelTaskProcessorImpl) Submit(task Task) error {
18,955✔
122
        p.metricsScope.IncCounter(metrics.ParallelTaskSubmitRequest)
18,955✔
123
        sw := p.metricsScope.StartTimer(metrics.ParallelTaskSubmitLatency)
18,955✔
124
        defer sw.Stop()
18,955✔
125

18,955✔
126
        if p.isStopped() {
28,154✔
127
                return ErrTaskProcessorClosed
9,199✔
128
        }
9,199✔
129

130
        select {
9,756✔
131
        case p.tasksCh <- task:
9,753✔
132
                if p.isStopped() {
9,753✔
133
                        p.drainAndNackTasks()
×
134
                }
×
135
                return nil
9,753✔
136
        case <-p.shutdownCh:
3✔
137
                return ErrTaskProcessorClosed
3✔
138
        }
139
}
140

141
func (p *parallelTaskProcessorImpl) taskWorker(shutdownCh chan struct{}) {
2,444✔
142
        defer p.shutdownWG.Done()
2,444✔
143

2,444✔
144
        for {
14,546✔
145
                select {
12,102✔
146
                case <-shutdownCh:
2,444✔
147
                        return
2,444✔
148
                case task := <-p.tasksCh:
9,661✔
149
                        p.executeTask(task, shutdownCh)
9,661✔
150
                }
151
        }
152
}
153

154
func (p *parallelTaskProcessorImpl) executeTask(task Task, shutdownCh chan struct{}) {
9,665✔
155
        sw := p.metricsScope.StartTimer(metrics.ParallelTaskTaskProcessingLatency)
9,665✔
156
        defer sw.Stop()
9,665✔
157

9,665✔
158
        defer func() {
19,330✔
159
                if r := recover(); r != nil {
9,666✔
160
                        p.logger.Error("recovered panic in task execution", tag.Dynamic("recovered-panic", r))
1✔
161
                        task.HandleErr(fmt.Errorf("recovered panic: %v", r))
1✔
162
                        task.Nack()
1✔
163
                }
1✔
164
        }()
165

166
        op := func() error {
19,433✔
167
                if err := task.Execute(); err != nil {
9,908✔
168
                        return task.HandleErr(err)
140✔
169
                }
140✔
170
                return nil
9,630✔
171
        }
172

173
        isRetryable := func(err error) bool {
9,805✔
174
                select {
140✔
175
                case <-shutdownCh:
3✔
176
                        return false
3✔
177
                default:
137✔
178
                }
179

180
                return task.RetryErr(err)
137✔
181
        }
182
        throttleRetry := backoff.NewThrottleRetry(
9,665✔
183
                backoff.WithRetryPolicy(p.options.RetryPolicy),
9,665✔
184
                backoff.WithRetryableError(isRetryable),
9,665✔
185
        )
9,665✔
186

9,665✔
187
        if err := throttleRetry.Do(context.Background(), op); err != nil {
9,702✔
188
                // non-retryable error or exhausted all retries or worker shutdown
37✔
189
                task.Nack()
37✔
190
                return
37✔
191
        }
37✔
192

193
        // no error
194
        task.Ack()
9,630✔
195
}
196

197
func (p *parallelTaskProcessorImpl) workerMonitor(tickerDuration time.Duration) {
22✔
198
        defer p.shutdownWG.Done()
22✔
199

22✔
200
        ticker := time.NewTicker(tickerDuration)
22✔
201

22✔
202
        for {
454✔
203
                select {
432✔
204
                case <-p.shutdownCh:
22✔
205
                        ticker.Stop()
22✔
206
                        p.removeWorker(len(p.workerShutdownCh))
22✔
207
                        return
22✔
208
                case <-ticker.C:
413✔
209
                        targetWorkerCount := p.options.WorkerCount()
413✔
210
                        currentWorkerCount := len(p.workerShutdownCh)
413✔
211
                        p.addWorker(targetWorkerCount - currentWorkerCount)
413✔
212
                        p.removeWorker(currentWorkerCount - targetWorkerCount)
413✔
213
                }
214
        }
215
}
216

217
func (p *parallelTaskProcessorImpl) addWorker(count int) {
416✔
218
        for i := 0; i < count; i++ {
441✔
219
                shutdownCh := make(chan struct{})
25✔
220
                p.workerShutdownCh = append(p.workerShutdownCh, shutdownCh)
25✔
221

25✔
222
                p.shutdownWG.Add(1)
25✔
223
                go p.taskWorker(shutdownCh)
25✔
224
        }
25✔
225
}
226

227
func (p *parallelTaskProcessorImpl) removeWorker(count int) {
434✔
228
        if count <= 0 {
847✔
229
                return
413✔
230
        }
413✔
231

232
        currentWorkerCount := len(p.workerShutdownCh)
24✔
233
        if count > currentWorkerCount {
24✔
234
                count = currentWorkerCount
×
235
        }
×
236

237
        shutdownChToClose := p.workerShutdownCh[currentWorkerCount-count:]
24✔
238
        p.workerShutdownCh = p.workerShutdownCh[:currentWorkerCount-count]
24✔
239

24✔
240
        for _, shutdownCh := range shutdownChToClose {
2,451✔
241
                close(shutdownCh)
2,427✔
242
        }
2,427✔
243
}
244

245
func (p *parallelTaskProcessorImpl) drainAndNackTasks() {
21✔
246
        for {
134✔
247
                select {
113✔
248
                case task := <-p.tasksCh:
92✔
249
                        task.Nack()
92✔
250
                default:
21✔
251
                        return
21✔
252
                }
253
        }
254
}
255

256
func (p *parallelTaskProcessorImpl) isStopped() bool {
28,705✔
257
        return atomic.LoadInt32(&p.status) == common.DaemonStatusStopped
28,705✔
258
}
28,705✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc