
uber / cadence · build 0188c593-095e-4f5b-bb3b-5a4ee91fa38c

16 Jun 2023 06:58PM UTC · coverage: 57.257% (+0.05%) from 57.204%
push · buildkite · web-flow
Set 12.4 version for postgres containers (#5326)

87086 of 152096 relevant lines covered (57.26%) · 2485.28 hits per line

Source file: /common/task/parallelTaskProcessor.go (93.06% covered)
// Copyright (c) 2020 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package task

import (
	"context"
	"errors"
	"fmt"
	"sync"
	"sync/atomic"
	"time"

	"github.com/uber/cadence/common/log/tag"

	"github.com/uber/cadence/common"
	"github.com/uber/cadence/common/backoff"
	"github.com/uber/cadence/common/dynamicconfig"
	"github.com/uber/cadence/common/log"
	"github.com/uber/cadence/common/metrics"
)

type (
	// ParallelTaskProcessorOptions configures the parallel task processor
	ParallelTaskProcessorOptions struct {
		QueueSize   int
		WorkerCount dynamicconfig.IntPropertyFn
		RetryPolicy backoff.RetryPolicy
	}

	parallelTaskProcessorImpl struct {
		status           int32
		tasksCh          chan Task
		shutdownCh       chan struct{}
		workerShutdownCh []chan struct{}
		shutdownWG       sync.WaitGroup
		logger           log.Logger
		metricsScope     metrics.Scope
		options          *ParallelTaskProcessorOptions
	}
)

const (
	defaultMonitorTickerDuration = 5 * time.Second
)

var (
	// ErrTaskProcessorClosed is the error returned when submitting a task to a stopped processor
	ErrTaskProcessorClosed = errors.New("task processor has already shut down")
)

// NewParallelTaskProcessor creates a new parallel task Processor
func NewParallelTaskProcessor(
	logger log.Logger,
	metricsClient metrics.Client,
	options *ParallelTaskProcessorOptions,
) Processor {
	return &parallelTaskProcessorImpl{
		status:           common.DaemonStatusInitialized,
		tasksCh:          make(chan Task, options.QueueSize),
		shutdownCh:       make(chan struct{}),
		workerShutdownCh: make([]chan struct{}, 0, options.WorkerCount()),
		logger:           logger,
		metricsScope:     metricsClient.Scope(metrics.ParallelTaskProcessingScope),
		options:          options,
	}
}

// Start launches the initial worker pool and the worker monitor goroutine.
func (p *parallelTaskProcessorImpl) Start() {
	if !atomic.CompareAndSwapInt32(&p.status, common.DaemonStatusInitialized, common.DaemonStatusStarted) {
		return
	}

	initialWorkerCount := p.options.WorkerCount()

	p.shutdownWG.Add(initialWorkerCount)
	for i := 0; i < initialWorkerCount; i++ {
		shutdownCh := make(chan struct{})
		p.workerShutdownCh = append(p.workerShutdownCh, shutdownCh)
		go p.taskWorker(shutdownCh)
	}

	p.shutdownWG.Add(1)
	go p.workerMonitor(defaultMonitorTickerDuration)

	p.logger.Info("Parallel task processor started.")
}

// Stop signals shutdown, Nacks all queued tasks, and waits up to a minute
// for workers to exit.
func (p *parallelTaskProcessorImpl) Stop() {
	if !atomic.CompareAndSwapInt32(&p.status, common.DaemonStatusStarted, common.DaemonStatusStopped) {
		return
	}

	close(p.shutdownCh)

	p.drainAndNackTasks()

	if success := common.AwaitWaitGroup(&p.shutdownWG, time.Minute); !success {
		p.logger.Warn("Parallel task processor timed out on shutdown.")
	}
	p.logger.Info("Parallel task processor shutdown.")
}

// Submit enqueues a task, blocking while the queue is full; it returns
// ErrTaskProcessorClosed if the processor has stopped.
func (p *parallelTaskProcessorImpl) Submit(task Task) error {
	p.metricsScope.IncCounter(metrics.ParallelTaskSubmitRequest)
	sw := p.metricsScope.StartTimer(metrics.ParallelTaskSubmitLatency)
	defer sw.Stop()

	if p.isStopped() {
		return ErrTaskProcessorClosed
	}

	select {
	case p.tasksCh <- task:
		if p.isStopped() {
			p.drainAndNackTasks()
		}
		return nil
	case <-p.shutdownCh:
		return ErrTaskProcessorClosed
	}
}

// taskWorker processes tasks until its shutdown channel is closed.
func (p *parallelTaskProcessorImpl) taskWorker(shutdownCh chan struct{}) {
	defer p.shutdownWG.Done()

	for {
		select {
		case <-shutdownCh:
			return
		case task := <-p.tasksCh:
			p.executeTask(task, shutdownCh)
		}
	}
}

// executeTask runs a task with retries, recovering from panics;
// it Acks the task on success and Nacks it otherwise.
func (p *parallelTaskProcessorImpl) executeTask(task Task, shutdownCh chan struct{}) {
	sw := p.metricsScope.StartTimer(metrics.ParallelTaskTaskProcessingLatency)
	defer sw.Stop()

	defer func() {
		if r := recover(); r != nil {
			p.logger.Error("recovered panic in task execution", tag.Dynamic("recovered-panic", r))
			task.HandleErr(fmt.Errorf("recovered panic: %v", r))
			task.Nack()
		}
	}()

	op := func() error {
		if err := task.Execute(); err != nil {
			return task.HandleErr(err)
		}
		return nil
	}

	isRetryable := func(err error) bool {
		select {
		case <-shutdownCh:
			return false
		default:
		}

		return task.RetryErr(err)
	}
	throttleRetry := backoff.NewThrottleRetry(
		backoff.WithRetryPolicy(p.options.RetryPolicy),
		backoff.WithRetryableError(isRetryable),
	)

	if err := throttleRetry.Do(context.Background(), op); err != nil {
		// non-retryable error or exhausted all retries or worker shutdown
		task.Nack()
		return
	}

	// no error
	task.Ack()
}

// workerMonitor periodically resizes the worker pool to match the
// configured WorkerCount, and tears down all workers on shutdown.
func (p *parallelTaskProcessorImpl) workerMonitor(tickerDuration time.Duration) {
	defer p.shutdownWG.Done()

	ticker := time.NewTicker(tickerDuration)

	for {
		select {
		case <-p.shutdownCh:
			ticker.Stop()
			p.removeWorker(len(p.workerShutdownCh))
			return
		case <-ticker.C:
			targetWorkerCount := p.options.WorkerCount()
			currentWorkerCount := len(p.workerShutdownCh)
			p.addWorker(targetWorkerCount - currentWorkerCount)
			p.removeWorker(currentWorkerCount - targetWorkerCount)
		}
	}
}

// addWorker spawns count new workers, each with its own shutdown channel.
func (p *parallelTaskProcessorImpl) addWorker(count int) {
	for i := 0; i < count; i++ {
		shutdownCh := make(chan struct{})
		p.workerShutdownCh = append(p.workerShutdownCh, shutdownCh)

		p.shutdownWG.Add(1)
		go p.taskWorker(shutdownCh)
	}
}

// removeWorker stops the last count workers by closing their shutdown channels.
func (p *parallelTaskProcessorImpl) removeWorker(count int) {
	if count <= 0 {
		return
	}

	currentWorkerCount := len(p.workerShutdownCh)
	if count > currentWorkerCount {
		count = currentWorkerCount
	}

	shutdownChToClose := p.workerShutdownCh[currentWorkerCount-count:]
	p.workerShutdownCh = p.workerShutdownCh[:currentWorkerCount-count]

	for _, shutdownCh := range shutdownChToClose {
		close(shutdownCh)
	}
}

// drainAndNackTasks Nacks every task still buffered in tasksCh.
func (p *parallelTaskProcessorImpl) drainAndNackTasks() {
	for {
		select {
		case task := <-p.tasksCh:
			task.Nack()
		default:
			return
		}
	}
}

func (p *parallelTaskProcessorImpl) isStopped() bool {
	return atomic.LoadInt32(&p.status) == common.DaemonStatusStopped
}
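For orientation, below is a minimal usage sketch; it is not part of the covered file. It assumes the package's Processor interface exposes the Start/Stop/Submit methods implemented above, and that a Task only needs the five methods this file exercises (Execute, HandleErr, RetryErr, Ack, Nack); the real Task interface may require more. printTask and runOnce are hypothetical names, and the logger, metrics client, retry policy, and worker-count property are taken as parameters because their constructors live elsewhere in the repo.

package task

import (
	"fmt"

	"github.com/uber/cadence/common/backoff"
	"github.com/uber/cadence/common/dynamicconfig"
	"github.com/uber/cadence/common/log"
	"github.com/uber/cadence/common/metrics"
)

// printTask is a hypothetical Task used only for illustration.
type printTask struct{ msg string }

func (t *printTask) Execute() error            { fmt.Println(t.msg); return nil }
func (t *printTask) HandleErr(err error) error { return err }   // hand errors back to the retry loop
func (t *printTask) RetryErr(err error) bool   { return false } // treat every error as non-retryable
func (t *printTask) Ack()                      {}
func (t *printTask) Nack()                     {}

// runOnce walks the Start/Submit/Stop lifecycle.
func runOnce(
	logger log.Logger,
	metricsClient metrics.Client,
	retryPolicy backoff.RetryPolicy,
	workerCount dynamicconfig.IntPropertyFn,
) error {
	processor := NewParallelTaskProcessor(logger, metricsClient, &ParallelTaskProcessorOptions{
		QueueSize:   100,         // capacity of the buffered tasksCh
		WorkerCount: workerCount, // polled every 5s by workerMonitor to grow/shrink the pool
		RetryPolicy: retryPolicy,
	})
	processor.Start()
	defer processor.Stop() // drains the queue and Nacks any unprocessed tasks

	// Submit blocks while the queue is full and returns
	// ErrTaskProcessorClosed once the processor has shut down.
	return processor.Submit(&printTask{msg: "hello"})
}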