• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

uber / cadence / 0187fdd2-f4a4-4c9a-97b4-6604937bf7be

09 May 2023 12:23AM UTC coverage: 57.253% (-0.002%) from 57.255%
0187fdd2-f4a4-4c9a-97b4-6604937bf7be

Pull #5252

buildkite

David Porter
Merge branch 'master' into feature/zonal-partitioning
Pull Request #5252: Feature/zonal partitioning

1460 of 1460 new or added lines in 51 files covered. (100.0%)

86909 of 151799 relevant lines covered (57.25%)

2482.17 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

67.54
/common/isolationgroup/defaultisolationgroupstate/state.go
1
// The MIT License (MIT)
2

3
// Copyright (c) 2017-2020 Uber Technologies Inc.
4

5
// Permission is hereby granted, free of charge, to any person obtaining a copy
6
// of this software and associated documentation files (the "Software"), to deal
7
// in the Software without restriction, including without limitation the rights
8
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
9
// copies of the Software, and to permit persons to whom the Software is
10
// furnished to do so, subject to the following conditions:
11
//
12
// The above copyright notice and this permission notice shall be included in all
13
// copies or substantial portions of the Software.
14
//
15
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
21
// SOFTWARE.
22

23
package defaultisolationgroupstate
24

25
import (
26
        "context"
27
        "fmt"
28
        "sync/atomic"
29

30
        "github.com/uber/cadence/common/isolationgroup/isolationgroupapi"
31

32
        "github.com/uber/cadence/common"
33
        "github.com/uber/cadence/common/cache"
34
        "github.com/uber/cadence/common/dynamicconfig"
35
        "github.com/uber/cadence/common/isolationgroup"
36
        "github.com/uber/cadence/common/log"
37
        "github.com/uber/cadence/common/types"
38
)
39

40
type defaultIsolationGroupStateHandler struct {
41
        status                     int32
42
        done                       chan struct{}
43
        log                        log.Logger
44
        domainCache                cache.DomainCache
45
        globalIsolationGroupDrains dynamicconfig.Client
46
        config                     defaultConfig
47
        lastSeen                   *isolationGroups
48
        updateCB                   func()
49
        // subscriptions is a map of domains->subscription-keys-> subscription channels
50
        // for notifying when there's a state change
51
        subscriptions map[string]map[string]chan<- isolationgroup.ChangeEvent
52
}
53

54
// NewDefaultIsolationGroupStateWatcherWithConfigStoreClient Is a constructor which allows passing in the dynamic config client
55
func NewDefaultIsolationGroupStateWatcherWithConfigStoreClient(
56
        logger log.Logger,
57
        dc *dynamicconfig.Collection,
58
        domainCache cache.DomainCache,
59
        cfgStoreClient dynamicconfig.Client,
60
) (isolationgroup.State, error) {
13✔
61
        stopChan := make(chan struct{})
13✔
62

13✔
63
        allIGs := dc.GetListProperty(dynamicconfig.AllIsolationGroups)()
13✔
64
        allIsolationGroups, err := isolationgroupapi.MapAllIsolationGroupsResponse(allIGs)
13✔
65
        if err != nil {
13✔
66
                return nil, fmt.Errorf("could not get all isolation groups fron dynamic config: %w", err)
×
67
        }
×
68

69
        config := defaultConfig{
13✔
70
                IsolationGroupEnabled: dc.GetBoolPropertyFilteredByDomain(dynamicconfig.EnableTasklistIsolation),
13✔
71
                AllIsolationGroups:    allIsolationGroups,
13✔
72
        }
13✔
73

13✔
74
        return &defaultIsolationGroupStateHandler{
13✔
75
                done:                       stopChan,
13✔
76
                domainCache:                domainCache,
13✔
77
                globalIsolationGroupDrains: cfgStoreClient,
13✔
78
                status:                     common.DaemonStatusInitialized,
13✔
79
                log:                        logger,
13✔
80
                config:                     config,
13✔
81
        }, nil
13✔
82
}
83

84
func (z *defaultIsolationGroupStateHandler) AvailableIsolationGroupsByDomainID(ctx context.Context, domainID string, availableIsolationGroups []string) (types.IsolationGroupConfiguration, error) {
×
85
        state, err := z.getByDomainID(ctx, domainID)
×
86
        if err != nil {
×
87
                return nil, fmt.Errorf("unable to get isolation group state: %w", err)
×
88
        }
×
89
        isolationGroups := common.IntersectionStringSlice(z.config.AllIsolationGroups, availableIsolationGroups)
×
90
        return availableIG(isolationGroups, state.Global, state.Domain), nil
×
91
}
92

93
func (z *defaultIsolationGroupStateHandler) IsDrained(ctx context.Context, domain string, isolationGroup string) (bool, error) {
2✔
94
        state, err := z.get(ctx, domain)
2✔
95
        if err != nil {
2✔
96
                return false, fmt.Errorf("could not determine if drained: %w", err)
×
97
        }
×
98
        return isDrained(isolationGroup, state.Global, state.Domain), nil
2✔
99
}
100

101
func (z *defaultIsolationGroupStateHandler) IsDrainedByDomainID(ctx context.Context, domainID string, isolationGroup string) (bool, error) {
2✔
102
        domain, err := z.domainCache.GetDomainByID(domainID)
2✔
103
        if err != nil {
2✔
104
                return false, fmt.Errorf("could not determine if drained: %w", err)
×
105
        }
×
106
        return z.IsDrained(ctx, domain.GetInfo().Name, isolationGroup)
2✔
107
}
108

109
// Start the state handler
110
func (z *defaultIsolationGroupStateHandler) Start() {
×
111
        if !atomic.CompareAndSwapInt32(&z.status, common.DaemonStatusInitialized, common.DaemonStatusStarted) {
×
112
                return
×
113
        }
×
114
        go z.updateCB()
×
115
}
116

117
func (z *defaultIsolationGroupStateHandler) Stop() {
14✔
118
        if z == nil {
14✔
119
                return
×
120
        }
×
121
        if !atomic.CompareAndSwapInt32(&z.status, common.DaemonStatusStarted, common.DaemonStatusStopped) {
28✔
122
                return
14✔
123
        }
14✔
124
        close(z.done)
×
125
}
126

127
func (z *defaultIsolationGroupStateHandler) getByDomainID(ctx context.Context, domainID string) (*isolationGroups, error) {
×
128
        domain, err := z.domainCache.GetDomainByID(domainID)
×
129
        if err != nil {
×
130
                return nil, fmt.Errorf("could not resolve domain in isolationGroup handler: %w", err)
×
131
        }
×
132
        return z.get(ctx, domain.GetInfo().Name)
×
133
}
134

135
// Get the statue of a isolationGroup, with respect to both domain and global drains. Domain-specific drains override global config
136
// will return nil, nil when it is not enabled
137
func (z *defaultIsolationGroupStateHandler) get(ctx context.Context, domain string) (*isolationGroups, error) {
2✔
138
        if !z.config.IsolationGroupEnabled(domain) {
2✔
139
                return nil, nil
×
140
        }
×
141

142
        domainData, err := z.domainCache.GetDomain(domain)
2✔
143
        if err != nil {
2✔
144
                return nil, fmt.Errorf("could not resolve domain in isolationGroup handler: %w", err)
×
145
        }
×
146

147
        domainCfg := domainData.GetConfig()
2✔
148
        var domainState types.IsolationGroupConfiguration
2✔
149
        if domainCfg != nil && domainCfg.IsolationGroups != nil {
2✔
150
                domainState = domainCfg.IsolationGroups
×
151
        }
×
152

153
        globalCfg, err := z.globalIsolationGroupDrains.GetListValue(dynamicconfig.DefaultIsolationGroupConfigStoreManagerGlobalMapping, nil)
2✔
154
        if err != nil {
2✔
155
                return nil, fmt.Errorf("could not resolve global drains in %w", err)
×
156
        }
×
157

158
        globalState, err := isolationgroupapi.MapDynamicConfigResponse(globalCfg)
2✔
159
        if err != nil {
2✔
160
                return nil, fmt.Errorf("could not resolve global drains in isolationGroup handler: %w", err)
×
161
        }
×
162

163
        ig := &isolationGroups{
2✔
164
                Global: globalState,
2✔
165
                Domain: domainState,
2✔
166
        }
2✔
167

2✔
168
        return ig, nil
2✔
169
}
170

171
// A simple explicit deny-based isolation group implementation
172
func availableIG(allIsolationGroups []string, global types.IsolationGroupConfiguration, domain types.IsolationGroupConfiguration) types.IsolationGroupConfiguration {
4✔
173
        out := types.IsolationGroupConfiguration{}
4✔
174
        for _, isolationGroup := range allIsolationGroups {
16✔
175
                globalCfg, hasGlobalConfig := global[isolationGroup]
12✔
176
                domainCfg, hasDomainConfig := domain[isolationGroup]
12✔
177
                if hasGlobalConfig {
14✔
178
                        if globalCfg.State == types.IsolationGroupStateDrained {
4✔
179
                                continue
2✔
180
                        }
181
                }
182
                if hasDomainConfig {
13✔
183
                        if domainCfg.State == types.IsolationGroupStateDrained {
6✔
184
                                continue
3✔
185
                        }
186
                }
187
                out[isolationGroup] = types.IsolationGroupPartition{
7✔
188
                        Name:  isolationGroup,
7✔
189
                        State: types.IsolationGroupStateHealthy,
7✔
190
                }
7✔
191
        }
192
        return out
4✔
193
}
194

195
func isDrained(isolationGroup string, global types.IsolationGroupConfiguration, domain types.IsolationGroupConfiguration) bool {
7✔
196
        globalCfg, hasGlobalConfig := global[isolationGroup]
7✔
197
        domainCfg, hasDomainConfig := domain[isolationGroup]
7✔
198
        if hasGlobalConfig {
12✔
199
                if globalCfg.State == types.IsolationGroupStateDrained {
8✔
200
                        return true
3✔
201
                }
3✔
202
        }
203
        if hasDomainConfig {
6✔
204
                if domainCfg.State == types.IsolationGroupStateDrained {
3✔
205
                        return true
1✔
206
                }
1✔
207
        }
208
        return false
3✔
209
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc