• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

uber / cadence / 0187fc45-1fa7-4f9b-9c10-2aef2ebb54e0

08 May 2023 05:10PM UTC coverage: 57.153% (-0.07%) from 57.225%
0187fc45-1fa7-4f9b-9c10-2aef2ebb54e0

push

buildkite

GitHub
Update persistence layer to adopt idl update for isolation (#5254)

204 of 204 new or added lines in 15 files covered. (100.0%)

85781 of 150089 relevant lines covered (57.15%)

2419.35 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

84.54
/common/task/fifoTaskScheduler.go
1
// Copyright (c) 2020 Uber Technologies, Inc.
2
//
3
// Permission is hereby granted, free of charge, to any person obtaining a copy
4
// of this software and associated documentation files (the "Software"), to deal
5
// in the Software without restriction, including without limitation the rights
6
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
7
// copies of the Software, and to permit persons to whom the Software is
8
// furnished to do so, subject to the following conditions:
9
//
10
// The above copyright notice and this permission notice shall be included in
11
// all copies or substantial portions of the Software.
12
//
13
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
14
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
18
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
19
// THE SOFTWARE.
20

21
package task
22

23
import (
24
        "sync"
25
        "sync/atomic"
26
        "time"
27

28
        "github.com/uber/cadence/common"
29
        "github.com/uber/cadence/common/backoff"
30
        "github.com/uber/cadence/common/dynamicconfig"
31
        "github.com/uber/cadence/common/log"
32
        "github.com/uber/cadence/common/log/tag"
33
        "github.com/uber/cadence/common/metrics"
34
)
35

36
type (
37
        // FIFOTaskSchedulerOptions configs FIFO task scheduler
38
        FIFOTaskSchedulerOptions struct {
39
                QueueSize       int
40
                WorkerCount     dynamicconfig.IntPropertyFn
41
                DispatcherCount int
42
                RetryPolicy     backoff.RetryPolicy
43
        }
44

45
        fifoTaskSchedulerImpl struct {
46
                status       int32
47
                logger       log.Logger
48
                metricsScope metrics.Scope
49
                options      *FIFOTaskSchedulerOptions
50
                dispatcherWG sync.WaitGroup
51
                taskCh       chan PriorityTask
52
                shutdownCh   chan struct{}
53

54
                processor Processor
55
        }
56
)
57

58
// NewFIFOTaskScheduler creates a new FIFO task scheduler
59
// it's an no-op implementation as it simply copy tasks from
60
// one task channel to another task channel.
61
// This scheduler is only for development purpose.
62
func NewFIFOTaskScheduler(
63
        logger log.Logger,
64
        metricsClient metrics.Client,
65
        options *FIFOTaskSchedulerOptions,
66
) Scheduler {
3✔
67
        return &fifoTaskSchedulerImpl{
3✔
68
                status:       common.DaemonStatusInitialized,
3✔
69
                logger:       logger,
3✔
70
                metricsScope: metricsClient.Scope(metrics.TaskSchedulerScope),
3✔
71
                options:      options,
3✔
72
                taskCh:       make(chan PriorityTask, options.QueueSize),
3✔
73
                shutdownCh:   make(chan struct{}),
3✔
74
                processor: NewParallelTaskProcessor(
3✔
75
                        logger,
3✔
76
                        metricsClient,
3✔
77
                        &ParallelTaskProcessorOptions{
3✔
78
                                QueueSize:   options.QueueSize,
3✔
79
                                WorkerCount: options.WorkerCount,
3✔
80
                                RetryPolicy: options.RetryPolicy,
3✔
81
                        },
3✔
82
                ),
3✔
83
        }
3✔
84
}
3✔
85

86
func (f *fifoTaskSchedulerImpl) Start() {
2✔
87
        if !atomic.CompareAndSwapInt32(&f.status, common.DaemonStatusInitialized, common.DaemonStatusStarted) {
2✔
88
                return
×
89
        }
×
90

91
        f.processor.Start()
2✔
92

2✔
93
        f.dispatcherWG.Add(f.options.DispatcherCount)
2✔
94
        for i := 0; i != f.options.DispatcherCount; i++ {
4✔
95
                go f.dispatcher()
2✔
96
        }
2✔
97

98
        f.logger.Info("FIFO task scheduler started.")
2✔
99
}
100

101
func (f *fifoTaskSchedulerImpl) Stop() {
2✔
102
        if !atomic.CompareAndSwapInt32(&f.status, common.DaemonStatusStarted, common.DaemonStatusStopped) {
2✔
103
                return
×
104
        }
×
105

106
        close(f.shutdownCh)
2✔
107

2✔
108
        f.processor.Stop()
2✔
109

2✔
110
        f.drainAndNackTasks()
2✔
111

2✔
112
        if success := common.AwaitWaitGroup(&f.dispatcherWG, time.Minute); !success {
2✔
113
                f.logger.Warn("FIFO task scheduler timedout on shutdown.")
×
114
        }
×
115

116
        f.logger.Info("FIFO task scheduler shutdown.")
2✔
117
}
118

119
func (f *fifoTaskSchedulerImpl) Submit(
120
        task PriorityTask,
121
) error {
4,990✔
122
        f.metricsScope.IncCounter(metrics.ParallelTaskSubmitRequest)
4,990✔
123
        sw := f.metricsScope.StartTimer(metrics.ParallelTaskSubmitLatency)
4,990✔
124
        defer sw.Stop()
4,990✔
125

4,990✔
126
        if f.isStopped() {
9,835✔
127
                return ErrTaskSchedulerClosed
4,845✔
128
        }
4,845✔
129

130
        select {
145✔
131
        case f.taskCh <- task:
144✔
132
                if f.isStopped() {
144✔
133
                        f.drainAndNackTasks()
×
134
                }
×
135
                return nil
144✔
136
        case <-f.shutdownCh:
1✔
137
                return ErrTaskSchedulerClosed
1✔
138
        }
139
}
140

141
func (f *fifoTaskSchedulerImpl) TrySubmit(
142
        task PriorityTask,
143
) (bool, error) {
5,018✔
144
        if f.isStopped() {
9,894✔
145
                return false, ErrTaskSchedulerClosed
4,876✔
146
        }
4,876✔
147

148
        select {
142✔
149
        case f.taskCh <- task:
60✔
150
                f.metricsScope.IncCounter(metrics.ParallelTaskSubmitRequest)
60✔
151
                if f.isStopped() {
60✔
152
                        f.drainAndNackTasks()
×
153
                }
×
154
                return true, nil
60✔
155
        case <-f.shutdownCh:
×
156
                return false, ErrTaskSchedulerClosed
×
157
        default:
82✔
158
                return false, nil
82✔
159
        }
160
}
161

162
func (f *fifoTaskSchedulerImpl) dispatcher() {
2✔
163
        defer f.dispatcherWG.Done()
2✔
164

2✔
165
        for {
204✔
166
                select {
202✔
167
                case task := <-f.taskCh:
200✔
168
                        if err := f.processor.Submit(task); err != nil {
200✔
169
                                f.logger.Error("failed to submit task to processor", tag.Error(err))
×
170
                                task.Nack()
×
171
                        }
×
172
                case <-f.shutdownCh:
2✔
173
                        return
2✔
174
                }
175
        }
176
}
177

178
func (f *fifoTaskSchedulerImpl) isStopped() bool {
10,212✔
179
        return atomic.LoadInt32(&f.status) == common.DaemonStatusStopped
10,212✔
180
}
10,212✔
181

182
func (f *fifoTaskSchedulerImpl) drainAndNackTasks() {
2✔
183
        for {
6✔
184
                select {
4✔
185
                case task := <-f.taskCh:
2✔
186
                        task.Nack()
2✔
187
                default:
2✔
188
                        return
2✔
189
                }
190
        }
191
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc