uber / cadence / 018e7d9e-6de5-4965-9d8c-615d282ceaf9

27 Mar 2024 01:55AM UTC coverage: 65.246% (-0.03%) from 65.279%

Pull Request #5822 (bowenxia, via buildkite): check time type in custom search attribute

5 of 7 new or added lines in 1 file covered (71.43%).
77 existing lines in 10 files now uncovered.
95369 of 146169 relevant lines covered (65.25%).
2380.31 hits per line.

Source File: /common/task/parallel_task_processor.go (93.06% covered)

// Copyright (c) 2020 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package task

import (
	"context"
	"errors"
	"fmt"
	"sync"
	"sync/atomic"
	"time"

	"github.com/uber/cadence/common"
	"github.com/uber/cadence/common/backoff"
	"github.com/uber/cadence/common/dynamicconfig"
	"github.com/uber/cadence/common/log"
	"github.com/uber/cadence/common/log/tag"
	"github.com/uber/cadence/common/metrics"
)

type (
	// ParallelTaskProcessorOptions configures the ParallelTaskProcessor
	ParallelTaskProcessorOptions struct {
		QueueSize   int
		WorkerCount dynamicconfig.IntPropertyFn
		RetryPolicy backoff.RetryPolicy
	}

	parallelTaskProcessorImpl struct {
		status           int32
		tasksCh          chan Task
		shutdownCh       chan struct{}
		workerShutdownCh []chan struct{}
		shutdownWG       sync.WaitGroup
		logger           log.Logger
		metricsScope     metrics.Scope
		options          *ParallelTaskProcessorOptions
	}
)

const (
	defaultMonitorTickerDuration = 5 * time.Second
)

var (
	// ErrTaskProcessorClosed is the error returned when submitting a task to a stopped processor
	ErrTaskProcessorClosed = errors.New("task processor has already shutdown")
)

// NewParallelTaskProcessor creates a new ParallelTaskProcessor
func NewParallelTaskProcessor(
	logger log.Logger,
	metricsClient metrics.Client,
	options *ParallelTaskProcessorOptions,
) Processor {
	return &parallelTaskProcessorImpl{
		status:           common.DaemonStatusInitialized,
		tasksCh:          make(chan Task, options.QueueSize),
		shutdownCh:       make(chan struct{}),
		workerShutdownCh: make([]chan struct{}, 0, options.WorkerCount()),
		logger:           logger,
		metricsScope:     metricsClient.Scope(metrics.ParallelTaskProcessingScope),
		options:          options,
	}
}

func (p *parallelTaskProcessorImpl) Start() {
	if !atomic.CompareAndSwapInt32(&p.status, common.DaemonStatusInitialized, common.DaemonStatusStarted) {
		return
	}

	initialWorkerCount := p.options.WorkerCount()

	p.shutdownWG.Add(initialWorkerCount)
	for i := 0; i < initialWorkerCount; i++ {
		shutdownCh := make(chan struct{})
		p.workerShutdownCh = append(p.workerShutdownCh, shutdownCh)
		go p.taskWorker(shutdownCh)
	}

	p.shutdownWG.Add(1)
	go p.workerMonitor(defaultMonitorTickerDuration)

	p.logger.Info("Parallel task processor started.")
}

func (p *parallelTaskProcessorImpl) Stop() {
	if !atomic.CompareAndSwapInt32(&p.status, common.DaemonStatusStarted, common.DaemonStatusStopped) {
		return
	}

	close(p.shutdownCh)

	p.drainAndNackTasks()

	if success := common.AwaitWaitGroup(&p.shutdownWG, time.Minute); !success {
		p.logger.Warn("Parallel task processor timedout on shutdown.")
	}
	p.logger.Info("Parallel task processor shutdown.")
}

func (p *parallelTaskProcessorImpl) Submit(task Task) error {
	p.metricsScope.IncCounter(metrics.ParallelTaskSubmitRequest)
	sw := p.metricsScope.StartTimer(metrics.ParallelTaskSubmitLatency)
	defer sw.Stop()

	if p.isStopped() {
		return ErrTaskProcessorClosed
	}

	select {
	case p.tasksCh <- task:
		if p.isStopped() {
			// this drain-and-nack path is flagged UNCOV (uncovered) in this report
			p.drainAndNackTasks()
		}
		return nil
	case <-p.shutdownCh:
		return ErrTaskProcessorClosed
	}
}

func (p *parallelTaskProcessorImpl) taskWorker(shutdownCh chan struct{}) {
	defer p.shutdownWG.Done()

	for {
		select {
		case <-shutdownCh:
			return
		case task := <-p.tasksCh:
			p.executeTask(task, shutdownCh)
		}
	}
}

func (p *parallelTaskProcessorImpl) executeTask(task Task, shutdownCh chan struct{}) {
	sw := p.metricsScope.StartTimer(metrics.ParallelTaskTaskProcessingLatency)
	defer sw.Stop()

	defer func() {
		if r := recover(); r != nil {
			p.logger.Error("recovered panic in task execution", tag.Dynamic("recovered-panic", r))
			task.HandleErr(fmt.Errorf("recovered panic: %v", r))
			task.Nack()
		}
	}()

	op := func() error {
		if err := task.Execute(); err != nil {
			return task.HandleErr(err)
		}
		return nil
	}

	isRetryable := func(err error) bool {
		select {
		case <-shutdownCh:
			return false
		default:
		}

		return task.RetryErr(err)
	}
	throttleRetry := backoff.NewThrottleRetry(
		backoff.WithRetryPolicy(p.options.RetryPolicy),
		backoff.WithRetryableError(isRetryable),
	)

	if err := throttleRetry.Do(context.Background(), op); err != nil {
		// non-retryable error or exhausted all retries or worker shutdown
		task.Nack()
		return
	}

	// no error
	task.Ack()
}

func (p *parallelTaskProcessorImpl) workerMonitor(tickerDuration time.Duration) {
	defer p.shutdownWG.Done()

	ticker := time.NewTicker(tickerDuration)

	for {
		select {
		case <-p.shutdownCh:
			ticker.Stop()
			p.removeWorker(len(p.workerShutdownCh))
			return
		case <-ticker.C:
			targetWorkerCount := p.options.WorkerCount()
			currentWorkerCount := len(p.workerShutdownCh)
			p.addWorker(targetWorkerCount - currentWorkerCount)
			p.removeWorker(currentWorkerCount - targetWorkerCount)
		}
	}
}

func (p *parallelTaskProcessorImpl) addWorker(count int) {
	for i := 0; i < count; i++ {
		shutdownCh := make(chan struct{})
		p.workerShutdownCh = append(p.workerShutdownCh, shutdownCh)

		p.shutdownWG.Add(1)
		go p.taskWorker(shutdownCh)
	}
}

func (p *parallelTaskProcessorImpl) removeWorker(count int) {
	if count <= 0 {
		return
	}

	currentWorkerCount := len(p.workerShutdownCh)
	if count > currentWorkerCount {
		count = currentWorkerCount
	}

	shutdownChToClose := p.workerShutdownCh[currentWorkerCount-count:]
	p.workerShutdownCh = p.workerShutdownCh[:currentWorkerCount-count]

	for _, shutdownCh := range shutdownChToClose {
		close(shutdownCh)
	}
}

func (p *parallelTaskProcessorImpl) drainAndNackTasks() {
	for {
		select {
		case task := <-p.tasksCh:
			task.Nack()
		default:
			return
		}
	}
}

func (p *parallelTaskProcessorImpl) isStopped() bool {
	return atomic.LoadInt32(&p.status) == common.DaemonStatusStopped
}
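
The file above implements a worker pool whose size tracks a dynamic-config value: Submit feeds a buffered task channel, each worker owns its own shutdown channel, and workerMonitor periodically grows or shrinks the pool via addWorker/removeWorker. The following standalone sketch illustrates that resizing pattern without the Cadence logging, metrics, and retry plumbing; all names in it (resizablePool, resize) are invented for illustration and are not part of the repository.

// Minimal, self-contained sketch of the dynamic worker-pool pattern above,
// under the stated assumptions (no Cadence dependencies, no retry/metrics).
package main

import (
	"fmt"
	"sync"
	"time"
)

type resizablePool struct {
	tasksCh          chan func()     // analogous to tasksCh in the file above
	workerShutdownCh []chan struct{} // one shutdown channel per worker
	wg               sync.WaitGroup
}

func (p *resizablePool) worker(shutdownCh chan struct{}) {
	defer p.wg.Done()
	for {
		select {
		case <-shutdownCh:
			return
		case task := <-p.tasksCh:
			task()
		}
	}
}

// resize adds or removes workers so the pool matches target,
// mirroring what addWorker/removeWorker do on each monitor tick.
func (p *resizablePool) resize(target int) {
	for len(p.workerShutdownCh) < target {
		shutdownCh := make(chan struct{})
		p.workerShutdownCh = append(p.workerShutdownCh, shutdownCh)
		p.wg.Add(1)
		go p.worker(shutdownCh)
	}
	for len(p.workerShutdownCh) > target {
		last := len(p.workerShutdownCh) - 1
		close(p.workerShutdownCh[last]) // signal exactly one worker to exit
		p.workerShutdownCh = p.workerShutdownCh[:last]
	}
}

func main() {
	pool := &resizablePool{tasksCh: make(chan func(), 10)}
	pool.resize(2) // start with two workers

	for i := 0; i < 5; i++ {
		i := i
		pool.tasksCh <- func() { fmt.Println("task", i) }
	}

	time.Sleep(100 * time.Millisecond)
	pool.resize(0) // shrink to zero workers
	pool.wg.Wait()
}

In the real processor the resize decision is driven by options.WorkerCount(), a dynamicconfig property re-read on every monitor tick, which is why the pool can follow configuration changes at runtime without a restart.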