• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

uber / cadence / 0187fdd2-f4a4-4c9a-97b4-6604937bf7be

09 May 2023 12:23AM UTC coverage: 57.253% (-0.002%) from 57.255%
0187fdd2-f4a4-4c9a-97b4-6604937bf7be

Pull #5252

buildkite

David Porter
Merge branch 'master' into feature/zonal-partitioning
Pull Request #5252: Feature/zonal partitioning

1460 of 1460 new or added lines in 51 files covered. (100.0%)

86909 of 151799 relevant lines covered (57.25%)

2482.17 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/cmd/server/cadence/server.go
1
// Copyright (c) 2017 Uber Technologies, Inc.
2
//
3
// Permission is hereby granted, free of charge, to any person obtaining a copy
4
// of this software and associated documentation files (the "Software"), to deal
5
// in the Software without restriction, including without limitation the rights
6
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
7
// copies of the Software, and to permit persons to whom the Software is
8
// furnished to do so, subject to the following conditions:
9
//
10
// The above copyright notice and this permission notice shall be included in
11
// all copies or substantial portions of the Software.
12
//
13
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
14
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
18
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
19
// THE SOFTWARE.
20

21
package cadence
22

23
import (
24
        "log"
25
        "time"
26

27
        "github.com/uber/cadence/common/persistence"
28

29
        "go.uber.org/cadence/.gen/go/cadence/workflowserviceclient"
30
        "go.uber.org/cadence/compatibility"
31

32
        apiv1 "github.com/uber/cadence-idl/go/proto/api/v1"
33

34
        "github.com/uber/cadence/common"
35
        "github.com/uber/cadence/common/archiver"
36
        "github.com/uber/cadence/common/archiver/provider"
37
        "github.com/uber/cadence/common/blobstore/filestore"
38
        "github.com/uber/cadence/common/cluster"
39
        "github.com/uber/cadence/common/config"
40
        "github.com/uber/cadence/common/dynamicconfig"
41
        "github.com/uber/cadence/common/dynamicconfig/configstore"
42
        "github.com/uber/cadence/common/elasticsearch"
43
        "github.com/uber/cadence/common/log/loggerimpl"
44
        "github.com/uber/cadence/common/log/tag"
45
        "github.com/uber/cadence/common/membership"
46
        "github.com/uber/cadence/common/messaging/kafka"
47
        "github.com/uber/cadence/common/metrics"
48
        "github.com/uber/cadence/common/peerprovider/ringpopprovider"
49
        "github.com/uber/cadence/common/resource"
50
        "github.com/uber/cadence/common/rpc"
51
        "github.com/uber/cadence/common/service"
52
        "github.com/uber/cadence/service/frontend"
53
        "github.com/uber/cadence/service/history"
54
        "github.com/uber/cadence/service/matching"
55
        "github.com/uber/cadence/service/worker"
56
)
57

58
type (
59
        server struct {
60
                name   string
61
                cfg    *config.Config
62
                doneC  chan struct{}
63
                daemon common.Daemon
64
        }
65
)
66

67
// newServer returns a new instance of a daemon
68
// that represents a cadence service
69
func newServer(service string, cfg *config.Config) common.Daemon {
×
70
        return &server{
×
71
                cfg:   cfg,
×
72
                name:  service,
×
73
                doneC: make(chan struct{}),
×
74
        }
×
75
}
×
76

77
// Start starts the server
78
func (s *server) Start() {
×
79
        s.daemon = s.startService()
×
80
}
×
81

82
// Stop stops the server
83
func (s *server) Stop() {
×
84

×
85
        if s.daemon == nil {
×
86
                return
×
87
        }
×
88

89
        select {
×
90
        case <-s.doneC:
×
91
        default:
×
92
                s.daemon.Stop()
×
93
                select {
×
94
                case <-s.doneC:
×
95
                case <-time.After(time.Minute):
×
96
                        log.Printf("timed out waiting for server %v to exit\n", s.name)
×
97
                }
98
        }
99
}
100

101
// startService starts a service with the given name and config
102
func (s *server) startService() common.Daemon {
×
103
        svcCfg, err := s.cfg.GetServiceConfig(s.name)
×
104
        if err != nil {
×
105
                log.Fatal(err.Error())
×
106
        }
×
107

108
        params := resource.Params{}
×
109
        params.Name = service.FullName(s.name)
×
110

×
111
        zapLogger, err := s.cfg.Log.NewZapLogger()
×
112
        if err != nil {
×
113
                log.Fatal("failed to create the zap logger, err: ", err.Error())
×
114
        }
×
115
        params.Logger = loggerimpl.NewLogger(zapLogger).WithTags(tag.Service(params.Name))
×
116

×
117
        params.PersistenceConfig = s.cfg.Persistence
×
118

×
119
        err = nil
×
120
        if s.cfg.DynamicConfig.Client == "" {
×
121
                params.Logger.Warn("falling back to legacy file based dynamicClientConfig")
×
122
                params.DynamicConfig, err = dynamicconfig.NewFileBasedClient(&s.cfg.DynamicConfigClient, params.Logger, s.doneC)
×
123
        } else {
×
124
                switch s.cfg.DynamicConfig.Client {
×
125
                case dynamicconfig.ConfigStoreClient:
×
126
                        params.Logger.Info("initialising ConfigStore dynamic config client")
×
127
                        params.DynamicConfig, err = configstore.NewConfigStoreClient(
×
128
                                &s.cfg.DynamicConfig.ConfigStore,
×
129
                                &s.cfg.Persistence,
×
130
                                params.Logger,
×
131
                                persistence.DynamicConfig,
×
132
                        )
×
133
                case dynamicconfig.FileBasedClient:
×
134
                        params.Logger.Info("initialising File Based dynamic config client")
×
135
                        params.DynamicConfig, err = dynamicconfig.NewFileBasedClient(&s.cfg.DynamicConfig.FileBased, params.Logger, s.doneC)
×
136
                default:
×
137
                        params.Logger.Info("initialising NOP dynamic config client")
×
138
                        params.DynamicConfig = dynamicconfig.NewNopClient()
×
139
                }
140
        }
141

142
        if err != nil {
×
143
                params.Logger.Error("creating dynamic config client failed, using no-op config client instead", tag.Error(err))
×
144
                params.DynamicConfig = dynamicconfig.NewNopClient()
×
145
        }
×
146

147
        clusterGroupMetadata := s.cfg.ClusterGroupMetadata
×
148
        dc := dynamicconfig.NewCollection(
×
149
                params.DynamicConfig,
×
150
                params.Logger,
×
151
                dynamicconfig.ClusterNameFilter(clusterGroupMetadata.CurrentClusterName),
×
152
        )
×
153

×
154
        params.MetricScope = svcCfg.Metrics.NewScope(params.Logger, params.Name)
×
155

×
156
        rpcParams, err := rpc.NewParams(params.Name, s.cfg, dc)
×
157
        if err != nil {
×
158
                log.Fatalf("error creating rpc factory params: %v", err)
×
159
        }
×
160
        rpcParams.OutboundsBuilder = rpc.CombineOutbounds(
×
161
                rpcParams.OutboundsBuilder,
×
162
                rpc.NewCrossDCOutbounds(clusterGroupMetadata.ClusterGroup, rpc.NewDNSPeerChooserFactory(s.cfg.PublicClient.RefreshInterval, params.Logger)),
×
163
        )
×
164
        rpcFactory := rpc.NewFactory(params.Logger, rpcParams)
×
165
        params.RPCFactory = rpcFactory
×
166

×
167
        peerProvider, err := ringpopprovider.New(
×
168
                params.Name,
×
169
                &s.cfg.Ringpop,
×
170
                rpcFactory.GetChannel(),
×
171
                membership.PortMap{
×
172
                        membership.PortGRPC:     svcCfg.RPC.GRPCPort,
×
173
                        membership.PortTchannel: svcCfg.RPC.Port,
×
174
                },
×
175
                params.Logger,
×
176
        )
×
177

×
178
        if err != nil {
×
179
                log.Fatalf("ringpop provider failed: %v", err)
×
180
        }
×
181

182
        params.MembershipResolver, err = membership.NewResolver(
×
183
                peerProvider,
×
184
                params.Logger,
×
185
        )
×
186
        if err != nil {
×
187
                log.Fatalf("error creating membership monitor: %v", err)
×
188
        }
×
189
        params.PProfInitializer = svcCfg.PProf.NewInitializer(params.Logger)
×
190

×
191
        params.ClusterRedirectionPolicy = s.cfg.ClusterGroupMetadata.ClusterRedirectionPolicy
×
192

×
193
        params.MetricsClient = metrics.NewClient(params.MetricScope, service.GetMetricsServiceIdx(params.Name, params.Logger))
×
194

×
195
        params.ClusterMetadata = cluster.NewMetadata(
×
196
                clusterGroupMetadata.FailoverVersionIncrement,
×
197
                clusterGroupMetadata.PrimaryClusterName,
×
198
                clusterGroupMetadata.CurrentClusterName,
×
199
                clusterGroupMetadata.ClusterGroup,
×
200
                dc.GetBoolPropertyFilteredByDomain(dynamicconfig.UseNewInitialFailoverVersion),
×
201
                params.MetricsClient,
×
202
                params.Logger,
×
203
        )
×
204

×
205
        advancedVisMode := dc.GetStringProperty(
×
206
                dynamicconfig.AdvancedVisibilityWritingMode,
×
207
        )()
×
208
        isAdvancedVisEnabled := common.IsAdvancedVisibilityWritingEnabled(advancedVisMode, params.PersistenceConfig.IsAdvancedVisibilityConfigExist())
×
209
        if isAdvancedVisEnabled {
×
210
                params.MessagingClient = kafka.NewKafkaClient(&s.cfg.Kafka, params.MetricsClient, params.Logger, params.MetricScope, isAdvancedVisEnabled)
×
211
        } else {
×
212
                params.MessagingClient = nil
×
213
        }
×
214

215
        if isAdvancedVisEnabled {
×
216
                // verify config of advanced visibility store
×
217
                advancedVisStoreKey := s.cfg.Persistence.AdvancedVisibilityStore
×
218
                advancedVisStore, ok := s.cfg.Persistence.DataStores[advancedVisStoreKey]
×
219
                if !ok {
×
220
                        log.Fatalf("not able to find advanced visibility store in config: %v", advancedVisStoreKey)
×
221
                }
×
222

223
                params.ESConfig = advancedVisStore.ElasticSearch
×
224
                params.ESConfig.SetUsernamePassword()
×
225
                esClient, err := elasticsearch.NewGenericClient(params.ESConfig, params.Logger)
×
226
                if err != nil {
×
227
                        log.Fatalf("error creating elastic search client: %v", err)
×
228
                }
×
229
                params.ESClient = esClient
×
230

×
231
                // verify index name
×
232
                indexName, ok := params.ESConfig.Indices[common.VisibilityAppName]
×
233
                if !ok || len(indexName) == 0 {
×
234
                        log.Fatalf("elastic search config missing visibility index")
×
235
                }
×
236
        }
237

238
        publicClientConfig := params.RPCFactory.GetDispatcher().ClientConfig(rpc.OutboundPublicClient)
×
239
        if rpc.IsGRPCOutbound(publicClientConfig) {
×
240
                params.PublicClient = compatibility.NewThrift2ProtoAdapter(
×
241
                        apiv1.NewDomainAPIYARPCClient(publicClientConfig),
×
242
                        apiv1.NewWorkflowAPIYARPCClient(publicClientConfig),
×
243
                        apiv1.NewWorkerAPIYARPCClient(publicClientConfig),
×
244
                        apiv1.NewVisibilityAPIYARPCClient(publicClientConfig),
×
245
                )
×
246
        } else {
×
247
                params.PublicClient = workflowserviceclient.New(publicClientConfig)
×
248
        }
×
249

250
        params.ArchivalMetadata = archiver.NewArchivalMetadata(
×
251
                dc,
×
252
                s.cfg.Archival.History.Status,
×
253
                s.cfg.Archival.History.EnableRead,
×
254
                s.cfg.Archival.Visibility.Status,
×
255
                s.cfg.Archival.Visibility.EnableRead,
×
256
                &s.cfg.DomainDefaults.Archival,
×
257
        )
×
258

×
259
        params.ArchiverProvider = provider.NewArchiverProvider(s.cfg.Archival.History.Provider, s.cfg.Archival.Visibility.Provider)
×
260
        params.PersistenceConfig.TransactionSizeLimit = dc.GetIntProperty(dynamicconfig.TransactionSizeLimit)
×
261
        params.PersistenceConfig.ErrorInjectionRate = dc.GetFloat64Property(dynamicconfig.PersistenceErrorInjectionRate)
×
262
        params.AuthorizationConfig = s.cfg.Authorization
×
263
        params.BlobstoreClient, err = filestore.NewFilestoreClient(s.cfg.Blobstore.Filestore)
×
264
        if err != nil {
×
265
                log.Printf("failed to create file blobstore client, will continue startup without it: %v", err)
×
266
                params.BlobstoreClient = nil
×
267
        }
×
268

269
        params.Logger.Info("Starting service " + s.name)
×
270

×
271
        var daemon common.Daemon
×
272

×
273
        switch params.Name {
×
274
        case service.Frontend:
×
275
                daemon, err = frontend.NewService(&params)
×
276
        case service.History:
×
277
                daemon, err = history.NewService(&params)
×
278
        case service.Matching:
×
279
                daemon, err = matching.NewService(&params)
×
280
        case service.Worker:
×
281
                daemon, err = worker.NewService(&params)
×
282
        }
283
        if err != nil {
×
284
                params.Logger.Fatal("Fail to start "+s.name+" service ", tag.Error(err))
×
285
        }
×
286

287
        go execute(daemon, s.doneC)
×
288

×
289
        return daemon
×
290
}
291

292
// execute runs the daemon in a separate go routine
293
func execute(d common.Daemon, doneC chan struct{}) {
×
294
        d.Start()
×
295
        close(doneC)
×
296
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc