• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

uber / cadence / 0188b6ba-6aaf-4739-8b30-52eaead5800c

13 Jun 2023 09:47PM UTC coverage: 57.204% (-0.01%) from 57.217%
0188b6ba-6aaf-4739-8b30-52eaead5800c

push

buildkite

web-flow
Bugfix/isolation groups domain drains (#5315)

What changed?
Fixes several bugs in the domain isolation-group handling which wasn't tested properly in replication it notably:

Fixes the problem of upserting configuration from the inactive region, which was previously would error
Fixes the problem of replication of configuration, which was entirely not working
Refactors the domain controller by splitting out this functionality into its own, much simpler function rather than continuing to overload the already incomprehensible domain controller.
Why?

How did you test it?

cadence --env docstore-prod11 --proxy_region dca admin isolation-groups get-domain --domain cadence-canary-global
Isolation Groups State
asdf5            Drained
asfd             Drained
-------------------------------------------------------------------------------------------------------------------------------------------------------------
~ » cadence --env an-env --proxy_region dca admin isolation-groups update-domain --domain cadence-canary-global  --remove-all-drains
-------------------------------------------------------------------------------------------------------------------------------------------------------------
~ » cadence --env an-env --proxy_region dca admin isolation-groups get-domain --domain cadence-canary-global
-- No groups found --
-------------------------------------------------------------------------------------------------------------------------------------------------------------
~ » cadence --env an-env --proxy_region phx admin isolation-groups get-domain --domain cadence-canary-global
-- No groups found --
Potential risks

141 of 141 new or added lines in 5 files covered. (100.0%)

86988 of 152065 relevant lines covered (57.2%)

2482.57 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

84.54
/common/task/fifoTaskScheduler.go
1
// Copyright (c) 2020 Uber Technologies, Inc.
2
//
3
// Permission is hereby granted, free of charge, to any person obtaining a copy
4
// of this software and associated documentation files (the "Software"), to deal
5
// in the Software without restriction, including without limitation the rights
6
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
7
// copies of the Software, and to permit persons to whom the Software is
8
// furnished to do so, subject to the following conditions:
9
//
10
// The above copyright notice and this permission notice shall be included in
11
// all copies or substantial portions of the Software.
12
//
13
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
14
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
18
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
19
// THE SOFTWARE.
20

21
package task
22

23
import (
24
        "sync"
25
        "sync/atomic"
26
        "time"
27

28
        "github.com/uber/cadence/common"
29
        "github.com/uber/cadence/common/backoff"
30
        "github.com/uber/cadence/common/dynamicconfig"
31
        "github.com/uber/cadence/common/log"
32
        "github.com/uber/cadence/common/log/tag"
33
        "github.com/uber/cadence/common/metrics"
34
)
35

36
type (
37
        // FIFOTaskSchedulerOptions configs FIFO task scheduler
38
        FIFOTaskSchedulerOptions struct {
39
                QueueSize       int
40
                WorkerCount     dynamicconfig.IntPropertyFn
41
                DispatcherCount int
42
                RetryPolicy     backoff.RetryPolicy
43
        }
44

45
        fifoTaskSchedulerImpl struct {
46
                status       int32
47
                logger       log.Logger
48
                metricsScope metrics.Scope
49
                options      *FIFOTaskSchedulerOptions
50
                dispatcherWG sync.WaitGroup
51
                taskCh       chan PriorityTask
52
                shutdownCh   chan struct{}
53

54
                processor Processor
55
        }
56
)
57

58
// NewFIFOTaskScheduler creates a new FIFO task scheduler
59
// it's an no-op implementation as it simply copy tasks from
60
// one task channel to another task channel.
61
// This scheduler is only for development purpose.
62
func NewFIFOTaskScheduler(
63
        logger log.Logger,
64
        metricsClient metrics.Client,
65
        options *FIFOTaskSchedulerOptions,
66
) Scheduler {
3✔
67
        return &fifoTaskSchedulerImpl{
3✔
68
                status:       common.DaemonStatusInitialized,
3✔
69
                logger:       logger,
3✔
70
                metricsScope: metricsClient.Scope(metrics.TaskSchedulerScope),
3✔
71
                options:      options,
3✔
72
                taskCh:       make(chan PriorityTask, options.QueueSize),
3✔
73
                shutdownCh:   make(chan struct{}),
3✔
74
                processor: NewParallelTaskProcessor(
3✔
75
                        logger,
3✔
76
                        metricsClient,
3✔
77
                        &ParallelTaskProcessorOptions{
3✔
78
                                QueueSize:   options.QueueSize,
3✔
79
                                WorkerCount: options.WorkerCount,
3✔
80
                                RetryPolicy: options.RetryPolicy,
3✔
81
                        },
3✔
82
                ),
3✔
83
        }
3✔
84
}
3✔
85

86
func (f *fifoTaskSchedulerImpl) Start() {
2✔
87
        if !atomic.CompareAndSwapInt32(&f.status, common.DaemonStatusInitialized, common.DaemonStatusStarted) {
2✔
88
                return
×
89
        }
×
90

91
        f.processor.Start()
2✔
92

2✔
93
        f.dispatcherWG.Add(f.options.DispatcherCount)
2✔
94
        for i := 0; i != f.options.DispatcherCount; i++ {
4✔
95
                go f.dispatcher()
2✔
96
        }
2✔
97

98
        f.logger.Info("FIFO task scheduler started.")
2✔
99
}
100

101
func (f *fifoTaskSchedulerImpl) Stop() {
2✔
102
        if !atomic.CompareAndSwapInt32(&f.status, common.DaemonStatusStarted, common.DaemonStatusStopped) {
2✔
103
                return
×
104
        }
×
105

106
        close(f.shutdownCh)
2✔
107

2✔
108
        f.processor.Stop()
2✔
109

2✔
110
        f.drainAndNackTasks()
2✔
111

2✔
112
        if success := common.AwaitWaitGroup(&f.dispatcherWG, time.Minute); !success {
2✔
113
                f.logger.Warn("FIFO task scheduler timedout on shutdown.")
×
114
        }
×
115

116
        f.logger.Info("FIFO task scheduler shutdown.")
2✔
117
}
118

119
func (f *fifoTaskSchedulerImpl) Submit(
120
        task PriorityTask,
121
) error {
4,990✔
122
        f.metricsScope.IncCounter(metrics.ParallelTaskSubmitRequest)
4,990✔
123
        sw := f.metricsScope.StartTimer(metrics.ParallelTaskSubmitLatency)
4,990✔
124
        defer sw.Stop()
4,990✔
125

4,990✔
126
        if f.isStopped() {
9,367✔
127
                return ErrTaskSchedulerClosed
4,377✔
128
        }
4,377✔
129

130
        select {
613✔
131
        case f.taskCh <- task:
612✔
132
                if f.isStopped() {
612✔
133
                        f.drainAndNackTasks()
×
134
                }
×
135
                return nil
612✔
136
        case <-f.shutdownCh:
1✔
137
                return ErrTaskSchedulerClosed
1✔
138
        }
139
}
140

141
func (f *fifoTaskSchedulerImpl) TrySubmit(
142
        task PriorityTask,
143
) (bool, error) {
5,018✔
144
        if f.isStopped() {
9,388✔
145
                return false, ErrTaskSchedulerClosed
4,370✔
146
        }
4,370✔
147

148
        select {
648✔
149
        case f.taskCh <- task:
239✔
150
                f.metricsScope.IncCounter(metrics.ParallelTaskSubmitRequest)
239✔
151
                if f.isStopped() {
239✔
152
                        f.drainAndNackTasks()
×
153
                }
×
154
                return true, nil
239✔
155
        case <-f.shutdownCh:
×
156
                return false, ErrTaskSchedulerClosed
×
157
        default:
409✔
158
                return false, nil
409✔
159
        }
160
}
161

162
func (f *fifoTaskSchedulerImpl) dispatcher() {
2✔
163
        defer f.dispatcherWG.Done()
2✔
164

2✔
165
        for {
851✔
166
                select {
849✔
167
                case task := <-f.taskCh:
847✔
168
                        if err := f.processor.Submit(task); err != nil {
847✔
169
                                f.logger.Error("failed to submit task to processor", tag.Error(err))
×
170
                                task.Nack()
×
171
                        }
×
172
                case <-f.shutdownCh:
2✔
173
                        return
2✔
174
                }
175
        }
176
}
177

178
func (f *fifoTaskSchedulerImpl) isStopped() bool {
10,859✔
179
        return atomic.LoadInt32(&f.status) == common.DaemonStatusStopped
10,859✔
180
}
10,859✔
181

182
func (f *fifoTaskSchedulerImpl) drainAndNackTasks() {
2✔
183
        for {
6✔
184
                select {
4✔
185
                case task := <-f.taskCh:
2✔
186
                        task.Nack()
2✔
187
                default:
2✔
188
                        return
2✔
189
                }
190
        }
191
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc