• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

uber / cadence / 0187fdd2-f4a4-4c9a-97b4-6604937bf7be

09 May 2023 12:23AM UTC coverage: 57.253% (-0.002%) from 57.255%
0187fdd2-f4a4-4c9a-97b4-6604937bf7be

Pull #5252

buildkite

David Porter
Merge branch 'master' into feature/zonal-partitioning
Pull Request #5252: Feature/zonal partitioning

1460 of 1460 new or added lines in 51 files covered. (100.0%)

86909 of 151799 relevant lines covered (57.25%)

2482.17 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

85.57
/common/resource/resourceImpl.go
1
// Copyright (c) 2019 Uber Technologies, Inc.
2
//
3
// Permission is hereby granted, free of charge, to any person obtaining a copy
4
// of this software and associated documentation files (the "Software"), to deal
5
// in the Software without restriction, including without limitation the rights
6
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
7
// copies of the Software, and to permit persons to whom the Software is
8
// furnished to do so, subject to the following conditions:
9
//
10
// The above copyright notice and this permission notice shall be included in
11
// all copies or substantial portions of the Software.
12
//
13
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
14
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
18
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
19
// THE SOFTWARE.
20

21
package resource
22

23
import (
24
        "math/rand"
25
        "sync/atomic"
26
        "time"
27

28
        "github.com/uber/cadence/common/dynamicconfig/configstore"
29
        csc "github.com/uber/cadence/common/dynamicconfig/configstore/config"
30

31
        "github.com/uber/cadence/common/isolationgroup/defaultisolationgroupstate"
32

33
        "github.com/uber/cadence/common/isolationgroup"
34
        "github.com/uber/cadence/common/partition"
35

36
        "github.com/uber-go/tally"
37
        "go.uber.org/cadence/.gen/go/cadence/workflowserviceclient"
38
        "go.uber.org/yarpc"
39

40
        "github.com/uber/cadence/client"
41
        "github.com/uber/cadence/client/admin"
42
        "github.com/uber/cadence/client/frontend"
43
        "github.com/uber/cadence/client/history"
44
        "github.com/uber/cadence/client/matching"
45
        "github.com/uber/cadence/common"
46
        "github.com/uber/cadence/common/archiver"
47
        "github.com/uber/cadence/common/archiver/provider"
48
        "github.com/uber/cadence/common/blobstore"
49
        "github.com/uber/cadence/common/cache"
50
        "github.com/uber/cadence/common/clock"
51
        "github.com/uber/cadence/common/cluster"
52
        "github.com/uber/cadence/common/domain"
53
        "github.com/uber/cadence/common/dynamicconfig"
54
        "github.com/uber/cadence/common/log"
55
        "github.com/uber/cadence/common/log/loggerimpl"
56
        "github.com/uber/cadence/common/log/tag"
57
        "github.com/uber/cadence/common/membership"
58
        "github.com/uber/cadence/common/messaging"
59
        "github.com/uber/cadence/common/metrics"
60
        "github.com/uber/cadence/common/persistence"
61
        persistenceClient "github.com/uber/cadence/common/persistence/client"
62
        "github.com/uber/cadence/common/quotas"
63
        "github.com/uber/cadence/common/service"
64
)
65

66
type (
67

68
        // VisibilityManagerInitializer is the function each service should implement
69
        // for visibility manager initialization
70
        VisibilityManagerInitializer func(
71
                persistenceBean persistenceClient.Bean,
72
                logger log.Logger,
73
        ) (persistence.VisibilityManager, error)
74

75
        // Impl contains all common resources shared across frontend / matching / history / worker
76
        Impl struct {
77
                status int32
78

79
                // static infos
80
                numShards       int
81
                serviceName     string
82
                hostInfo        membership.HostInfo
83
                metricsScope    tally.Scope
84
                clusterMetadata cluster.Metadata
85

86
                // other common resources
87

88
                domainCache             cache.DomainCache
89
                domainMetricsScopeCache cache.DomainMetricsScopeCache
90
                timeSource              clock.TimeSource
91
                payloadSerializer       persistence.PayloadSerializer
92
                metricsClient           metrics.Client
93
                messagingClient         messaging.Client
94
                blobstoreClient         blobstore.Client
95
                archivalMetadata        archiver.ArchivalMetadata
96
                archiverProvider        provider.ArchiverProvider
97
                domainReplicationQueue  domain.ReplicationQueue
98

99
                // membership infos
100

101
                membershipResolver membership.Resolver
102

103
                // internal services clients
104

105
                sdkClient         workflowserviceclient.Interface
106
                frontendRawClient frontend.Client
107
                frontendClient    frontend.Client
108
                matchingRawClient matching.Client
109
                matchingClient    matching.Client
110
                historyRawClient  history.Client
111
                historyClient     history.Client
112
                clientBean        client.Bean
113

114
                // persistence clients
115
                persistenceBean persistenceClient.Bean
116

117
                // hostName
118
                hostName string
119

120
                // loggers
121
                logger          log.Logger
122
                throttledLogger log.Logger
123

124
                // for registering handlers
125
                dispatcher *yarpc.Dispatcher
126

127
                // internal vars
128

129
                pprofInitializer       common.PProfInitializer
130
                runtimeMetricsReporter *metrics.RuntimeMetricsReporter
131
                rpcFactory             common.RPCFactory
132

133
                isolationGroups           isolationgroup.State
134
                isolationGroupConfigStore configstore.Client
135
                partitioner               partition.Partitioner
136
        }
137
)
138

139
var _ Resource = (*Impl)(nil)
140

141
// New create a new resource containing common dependencies
142
func New(
143
        params *Params,
144
        serviceName string,
145
        serviceConfig *service.Config,
146
) (impl *Impl, retError error) {
39✔
147

39✔
148
        hostname := params.HostName
39✔
149

39✔
150
        logger := params.Logger
39✔
151
        throttledLogger := loggerimpl.NewThrottledLogger(logger, serviceConfig.ThrottledLoggerMaxRPS)
39✔
152

39✔
153
        numShards := params.PersistenceConfig.NumHistoryShards
39✔
154
        dispatcher := params.RPCFactory.GetDispatcher()
39✔
155
        membershipResolver := params.MembershipResolver
39✔
156

39✔
157
        dynamicCollection := dynamicconfig.NewCollection(
39✔
158
                params.DynamicConfig,
39✔
159
                logger,
39✔
160
                dynamicconfig.ClusterNameFilter(params.ClusterMetadata.GetCurrentClusterName()),
39✔
161
        )
39✔
162
        clientBean, err := client.NewClientBean(
39✔
163
                client.NewRPCClientFactory(
39✔
164
                        params.RPCFactory,
39✔
165
                        membershipResolver,
39✔
166
                        params.MetricsClient,
39✔
167
                        dynamicCollection,
39✔
168
                        numShards,
39✔
169
                        logger,
39✔
170
                ),
39✔
171
                params.RPCFactory.GetDispatcher(),
39✔
172
                params.ClusterMetadata,
39✔
173
        )
39✔
174
        if err != nil {
39✔
175
                return nil, err
×
176
        }
×
177

178
        persistenceBean, err := persistenceClient.NewBeanFromFactory(persistenceClient.NewFactory(
39✔
179
                &params.PersistenceConfig,
39✔
180
                quotas.PerMemberDynamic(
39✔
181
                        serviceName,
39✔
182
                        serviceConfig.PersistenceGlobalMaxQPS.AsFloat64(),
39✔
183
                        serviceConfig.PersistenceMaxQPS.AsFloat64(),
39✔
184
                        membershipResolver,
39✔
185
                ),
39✔
186
                params.ClusterMetadata.GetCurrentClusterName(),
39✔
187
                params.MetricsClient,
39✔
188
                logger,
39✔
189
                persistence.NewDynamicConfiguration(dynamicCollection),
39✔
190
        ), &persistenceClient.Params{
39✔
191
                PersistenceConfig: params.PersistenceConfig,
39✔
192
                MetricsClient:     params.MetricsClient,
39✔
193
                MessagingClient:   params.MessagingClient,
39✔
194
                ESClient:          params.ESClient,
39✔
195
                ESConfig:          params.ESConfig,
39✔
196
        }, serviceConfig)
39✔
197
        if err != nil {
39✔
198
                return nil, err
×
199
        }
×
200

201
        domainCache := cache.NewDomainCache(
39✔
202
                persistenceBean.GetDomainManager(),
39✔
203
                params.ClusterMetadata,
39✔
204
                params.MetricsClient,
39✔
205
                logger,
39✔
206
        )
39✔
207

39✔
208
        domainMetricsScopeCache := cache.NewDomainMetricsScopeCache()
39✔
209
        domainReplicationQueue := domain.NewReplicationQueue(
39✔
210
                persistenceBean.GetDomainReplicationQueueManager(),
39✔
211
                params.ClusterMetadata.GetCurrentClusterName(),
39✔
212
                params.MetricsClient,
39✔
213
                logger,
39✔
214
        )
39✔
215

39✔
216
        frontendRawClient := clientBean.GetFrontendClient()
39✔
217
        frontendClient := frontend.NewRetryableClient(
39✔
218
                frontendRawClient,
39✔
219
                common.CreateFrontendServiceRetryPolicy(),
39✔
220
                common.IsServiceTransientError,
39✔
221
        )
39✔
222

39✔
223
        matchingRawClient, err := clientBean.GetMatchingClient(domainCache.GetDomainName)
39✔
224
        if err != nil {
39✔
225
                return nil, err
×
226
        }
×
227
        matchingClient := matching.NewRetryableClient(
39✔
228
                matchingRawClient,
39✔
229
                common.CreateMatchingServiceRetryPolicy(),
39✔
230
                common.IsServiceTransientError,
39✔
231
        )
39✔
232

39✔
233
        historyRawClient := clientBean.GetHistoryClient()
39✔
234
        historyClient := history.NewRetryableClient(
39✔
235
                historyRawClient,
39✔
236
                common.CreateHistoryServiceRetryPolicy(),
39✔
237
                common.IsServiceTransientError,
39✔
238
        )
39✔
239

39✔
240
        historyArchiverBootstrapContainer := &archiver.HistoryBootstrapContainer{
39✔
241
                HistoryV2Manager: persistenceBean.GetHistoryManager(),
39✔
242
                Logger:           logger,
39✔
243
                MetricsClient:    params.MetricsClient,
39✔
244
                ClusterMetadata:  params.ClusterMetadata,
39✔
245
                DomainCache:      domainCache,
39✔
246
        }
39✔
247
        visibilityArchiverBootstrapContainer := &archiver.VisibilityBootstrapContainer{
39✔
248
                Logger:          logger,
39✔
249
                MetricsClient:   params.MetricsClient,
39✔
250
                ClusterMetadata: params.ClusterMetadata,
39✔
251
                DomainCache:     domainCache,
39✔
252
        }
39✔
253
        if err := params.ArchiverProvider.RegisterBootstrapContainer(
39✔
254
                serviceName,
39✔
255
                historyArchiverBootstrapContainer,
39✔
256
                visibilityArchiverBootstrapContainer,
39✔
257
        ); err != nil {
39✔
258
                return nil, err
×
259
        }
×
260

261
        isolationGroupStore := createConfigStoreOrDefault(params, dynamicCollection)
39✔
262

39✔
263
        isolationGroupState := createIsolationGroupStateHandlerOrDefault(
39✔
264
                params,
39✔
265
                dynamicCollection,
39✔
266
                domainCache,
39✔
267
                isolationGroupStore,
39✔
268
        )
39✔
269
        partitioner := createPartitionerOrDefault(params, dynamicCollection, isolationGroupState)
39✔
270

39✔
271
        impl = &Impl{
39✔
272
                status: common.DaemonStatusInitialized,
39✔
273

39✔
274
                // static infos
39✔
275

39✔
276
                numShards:       numShards,
39✔
277
                serviceName:     params.Name,
39✔
278
                metricsScope:    params.MetricScope,
39✔
279
                clusterMetadata: params.ClusterMetadata,
39✔
280

39✔
281
                // other common resources
39✔
282

39✔
283
                domainCache:             domainCache,
39✔
284
                domainMetricsScopeCache: domainMetricsScopeCache,
39✔
285
                timeSource:              clock.NewRealTimeSource(),
39✔
286
                payloadSerializer:       persistence.NewPayloadSerializer(),
39✔
287
                metricsClient:           params.MetricsClient,
39✔
288
                messagingClient:         params.MessagingClient,
39✔
289
                blobstoreClient:         params.BlobstoreClient,
39✔
290
                archivalMetadata:        params.ArchivalMetadata,
39✔
291
                archiverProvider:        params.ArchiverProvider,
39✔
292
                domainReplicationQueue:  domainReplicationQueue,
39✔
293

39✔
294
                // membership infos
39✔
295
                membershipResolver: membershipResolver,
39✔
296

39✔
297
                // internal services clients
39✔
298

39✔
299
                sdkClient:         params.PublicClient,
39✔
300
                frontendRawClient: frontendRawClient,
39✔
301
                frontendClient:    frontendClient,
39✔
302
                matchingRawClient: matchingRawClient,
39✔
303
                matchingClient:    matchingClient,
39✔
304
                historyRawClient:  historyRawClient,
39✔
305
                historyClient:     historyClient,
39✔
306
                clientBean:        clientBean,
39✔
307

39✔
308
                // persistence clients
39✔
309
                persistenceBean: persistenceBean,
39✔
310

39✔
311
                // hostname
39✔
312
                hostName: hostname,
39✔
313

39✔
314
                // loggers
39✔
315

39✔
316
                logger:          logger,
39✔
317
                throttledLogger: throttledLogger,
39✔
318

39✔
319
                // for registering handlers
39✔
320
                dispatcher: dispatcher,
39✔
321

39✔
322
                // internal vars
39✔
323
                pprofInitializer: params.PProfInitializer,
39✔
324
                runtimeMetricsReporter: metrics.NewRuntimeMetricsReporter(
39✔
325
                        params.MetricScope,
39✔
326
                        time.Minute,
39✔
327
                        logger,
39✔
328
                        params.InstanceID,
39✔
329
                ),
39✔
330
                rpcFactory:                params.RPCFactory,
39✔
331
                isolationGroups:           isolationGroupState, // can be nil where persistence is not available
39✔
332
                isolationGroupConfigStore: isolationGroupStore, // can be nil where persistence is not available
39✔
333
                partitioner:               partitioner,         // can be nil where persistence is not available
39✔
334
        }
39✔
335
        return impl, nil
39✔
336
}
337

338
// Start start all resources
339
func (h *Impl) Start() {
39✔
340

39✔
341
        if !atomic.CompareAndSwapInt32(
39✔
342
                &h.status,
39✔
343
                common.DaemonStatusInitialized,
39✔
344
                common.DaemonStatusStarted,
39✔
345
        ) {
39✔
346
                return
×
347
        }
×
348

349
        h.metricsScope.Counter(metrics.RestartCount).Inc(1)
39✔
350
        h.runtimeMetricsReporter.Start()
39✔
351

39✔
352
        if err := h.pprofInitializer.Start(); err != nil {
39✔
353
                h.logger.WithTags(tag.Error(err)).Fatal("fail to start PProf")
×
354
        }
×
355
        if err := h.dispatcher.Start(); err != nil {
39✔
356
                h.logger.WithTags(tag.Error(err)).Fatal("fail to start dispatcher")
×
357
        }
×
358
        h.membershipResolver.Start()
39✔
359
        h.domainCache.Start()
39✔
360
        h.domainMetricsScopeCache.Start()
39✔
361

39✔
362
        hostInfo, err := h.membershipResolver.WhoAmI()
39✔
363
        if err != nil {
39✔
364
                h.logger.WithTags(tag.Error(err)).Fatal("fail to get host info from membership monitor")
×
365
        }
×
366
        h.hostInfo = hostInfo
39✔
367

39✔
368
        if h.isolationGroupConfigStore != nil {
52✔
369
                h.isolationGroupConfigStore.Start()
13✔
370
        }
13✔
371
        // The service is now started up
372
        h.logger.Info("service started")
39✔
373
        // seed the random generator once for this service
39✔
374
        rand.Seed(time.Now().UTC().UnixNano())
39✔
375
}
376

377
// Stop stops all resources
378
func (h *Impl) Stop() {
40✔
379

40✔
380
        if !atomic.CompareAndSwapInt32(
40✔
381
                &h.status,
40✔
382
                common.DaemonStatusStarted,
40✔
383
                common.DaemonStatusStopped,
40✔
384
        ) {
41✔
385
                return
1✔
386
        }
1✔
387

388
        h.domainCache.Stop()
39✔
389
        h.domainMetricsScopeCache.Stop()
39✔
390
        h.membershipResolver.Stop()
39✔
391
        if err := h.dispatcher.Stop(); err != nil {
39✔
392
                h.logger.WithTags(tag.Error(err)).Error("failed to stop dispatcher")
×
393
        }
×
394
        h.runtimeMetricsReporter.Stop()
39✔
395
        h.persistenceBean.Close()
39✔
396
        if h.isolationGroupConfigStore != nil {
52✔
397
                h.isolationGroupConfigStore.Stop()
13✔
398
        }
13✔
399
        if h.isolationGroups != nil {
52✔
400
                h.isolationGroups.Stop()
13✔
401
        }
13✔
402
}
403

404
// GetServiceName return service name
405
func (h *Impl) GetServiceName() string {
×
406
        return h.serviceName
×
407
}
×
408

409
// GetHostInfo return host info
410
func (h *Impl) GetHostInfo() membership.HostInfo {
375✔
411
        return h.hostInfo
375✔
412
}
375✔
413

414
// GetClusterMetadata return cluster metadata
415
func (h *Impl) GetClusterMetadata() cluster.Metadata {
25,706✔
416
        return h.clusterMetadata
25,706✔
417
}
25,706✔
418

419
// other common resources
420

421
// GetDomainCache return domain cache
422
func (h *Impl) GetDomainCache() cache.DomainCache {
94,854✔
423
        return h.domainCache
94,854✔
424
}
94,854✔
425

426
// GetDomainMetricsScopeCache return domainMetricsScope cache
427
func (h *Impl) GetDomainMetricsScopeCache() cache.DomainMetricsScopeCache {
7,988✔
428
        return h.domainMetricsScopeCache
7,988✔
429
}
7,988✔
430

431
// GetTimeSource return time source
432
func (h *Impl) GetTimeSource() clock.TimeSource {
57,873✔
433
        return h.timeSource
57,873✔
434
}
57,873✔
435

436
// GetPayloadSerializer return binary payload serializer
437
func (h *Impl) GetPayloadSerializer() persistence.PayloadSerializer {
51✔
438
        return h.payloadSerializer
51✔
439
}
51✔
440

441
// GetMetricsClient return metrics client
442
func (h *Impl) GetMetricsClient() metrics.Client {
34,695✔
443
        return h.metricsClient
34,695✔
444
}
34,695✔
445

446
// GetMessagingClient return messaging client
447
func (h *Impl) GetMessagingClient() messaging.Client {
×
448
        return h.messagingClient
×
449
}
×
450

451
// GetBlobstoreClient returns blobstore client
452
func (h *Impl) GetBlobstoreClient() blobstore.Client {
×
453
        return h.blobstoreClient
×
454
}
×
455

456
// GetArchivalMetadata return archival metadata
457
func (h *Impl) GetArchivalMetadata() archiver.ArchivalMetadata {
1,013✔
458
        return h.archivalMetadata
1,013✔
459
}
1,013✔
460

461
// GetArchiverProvider return archival provider
462
func (h *Impl) GetArchiverProvider() provider.ArchiverProvider {
96✔
463
        return h.archiverProvider
96✔
464
}
96✔
465

466
// GetDomainReplicationQueue return domain replication queue
467
func (h *Impl) GetDomainReplicationQueue() domain.ReplicationQueue {
27✔
468
        return h.domainReplicationQueue
27✔
469
}
27✔
470

471
// GetMembershipResolver return the membership resolver
472
func (h *Impl) GetMembershipResolver() membership.Resolver {
432✔
473
        return h.membershipResolver
432✔
474
}
432✔
475

476
// internal services clients
477

478
// GetSDKClient return sdk client
479
func (h *Impl) GetSDKClient() workflowserviceclient.Interface {
147✔
480
        return h.sdkClient
147✔
481
}
147✔
482

483
// GetFrontendRawClient return frontend client without retry policy
484
func (h *Impl) GetFrontendRawClient() frontend.Client {
×
485
        return h.frontendRawClient
×
486
}
×
487

488
// GetFrontendClient return frontend client with retry policy
489
func (h *Impl) GetFrontendClient() frontend.Client {
×
490
        return h.frontendClient
×
491
}
×
492

493
// GetMatchingRawClient return matching client without retry policy
494
func (h *Impl) GetMatchingRawClient() matching.Client {
63✔
495
        return h.matchingRawClient
63✔
496
}
63✔
497

498
// GetMatchingClient return matching client with retry policy
499
func (h *Impl) GetMatchingClient() matching.Client {
2,014✔
500
        return h.matchingClient
2,014✔
501
}
2,014✔
502

503
// GetHistoryRawClient return history client without retry policy
504
func (h *Impl) GetHistoryRawClient() history.Client {
×
505
        return h.historyRawClient
×
506
}
×
507

508
// GetHistoryClient return history client with retry policy
509
func (h *Impl) GetHistoryClient() history.Client {
4,114✔
510
        return h.historyClient
4,114✔
511
}
4,114✔
512

513
// GetRemoteAdminClient return remote admin client for given cluster name
514
func (h *Impl) GetRemoteAdminClient(
515
        cluster string,
516
) admin.Client {
×
517

×
518
        return h.clientBean.GetRemoteAdminClient(cluster)
×
519
}
×
520

521
// GetRemoteFrontendClient return remote frontend client for given cluster name
522
func (h *Impl) GetRemoteFrontendClient(
523
        cluster string,
524
) frontend.Client {
×
525

×
526
        return h.clientBean.GetRemoteFrontendClient(cluster)
×
527
}
×
528

529
// GetClientBean return RPC client bean
530
func (h *Impl) GetClientBean() client.Bean {
267✔
531
        return h.clientBean
267✔
532
}
267✔
533

534
// persistence clients
535

536
// GetMetadataManager return metadata manager
537
func (h *Impl) GetDomainManager() persistence.DomainManager {
51✔
538
        return h.persistenceBean.GetDomainManager()
51✔
539
}
51✔
540

541
// GetTaskManager return task manager
542
func (h *Impl) GetTaskManager() persistence.TaskManager {
15✔
543
        return h.persistenceBean.GetTaskManager()
15✔
544
}
15✔
545

546
// GetVisibilityManager return visibility manager
547
func (h *Impl) GetVisibilityManager() persistence.VisibilityManager {
504✔
548
        return h.persistenceBean.GetVisibilityManager()
504✔
549
}
504✔
550

551
// GetShardManager return shard manager
552
func (h *Impl) GetShardManager() persistence.ShardManager {
167✔
553
        return h.persistenceBean.GetShardManager()
167✔
554
}
167✔
555

556
// GetHistoryManager return history manager
557
func (h *Impl) GetHistoryManager() persistence.HistoryManager {
5,675✔
558
        return h.persistenceBean.GetHistoryManager()
5,675✔
559
}
5,675✔
560

561
// GetExecutionManager return execution manager for given shard ID
562
func (h *Impl) GetExecutionManager(
563
        shardID int,
564
) (persistence.ExecutionManager, error) {
51✔
565

51✔
566
        return h.persistenceBean.GetExecutionManager(shardID)
51✔
567
}
51✔
568

569
// GetPersistenceBean return persistence bean
570
func (h *Impl) GetPersistenceBean() persistenceClient.Bean {
×
571
        return h.persistenceBean
×
572
}
×
573

574
func (h *Impl) GetHostName() string {
×
575
        return h.hostName
×
576
}
×
577

578
// loggers
579

580
// GetLogger return logger
581
func (h *Impl) GetLogger() log.Logger {
39,673✔
582
        return h.logger
39,673✔
583
}
39,673✔
584

585
// GetThrottledLogger return throttled logger
586
func (h *Impl) GetThrottledLogger() log.Logger {
6,217✔
587
        return h.throttledLogger
6,217✔
588
}
6,217✔
589

590
// GetDispatcher return YARPC dispatcher, used for registering handlers
591
func (h *Impl) GetDispatcher() *yarpc.Dispatcher {
135✔
592
        return h.dispatcher
135✔
593
}
135✔
594

595
// GetIsolationGroupState returns the isolationGroupState or nil
596
func (h *Impl) GetIsolationGroupState() isolationgroup.State {
2,878✔
597
        return h.isolationGroups
2,878✔
598
}
2,878✔
599

600
// GetPartitioner returns the partitioner or nil
601
func (h *Impl) GetPartitioner() partition.Partitioner {
15✔
602
        return h.partitioner
15✔
603
}
15✔
604

605
// GetIsolationGroupStore returns the isolation group configuration store or nil
606
func (h *Impl) GetIsolationGroupStore() configstore.Client {
15✔
607
        return h.isolationGroupConfigStore
15✔
608
}
15✔
609

610
// due to the config store being only available for some
611
// persistence layers, *both* the configStoreClient and IsolationGroupState
612
// will be optionally available
613
func createConfigStoreOrDefault(
614
        params *Params,
615
        dc *dynamicconfig.Collection,
616
) configstore.Client {
39✔
617

39✔
618
        if params.IsolationGroupStore != nil {
39✔
619
                return params.IsolationGroupStore
×
620
        }
×
621
        cscConfig := &csc.ClientConfig{
39✔
622
                PollInterval:        dc.GetDurationProperty(dynamicconfig.IsolationGroupStateRefreshInterval)(),
39✔
623
                UpdateRetryAttempts: dc.GetIntProperty(dynamicconfig.IsolationGroupStateUpdateRetryAttempts)(),
39✔
624
                FetchTimeout:        dc.GetDurationProperty(dynamicconfig.IsolationGroupStateFetchTimeout)(),
39✔
625
                UpdateTimeout:       dc.GetDurationProperty(dynamicconfig.IsolationGroupStateUpdateTimeout)(),
39✔
626
        }
39✔
627
        cfgStoreClient, err := configstore.NewConfigStoreClient(cscConfig, &params.PersistenceConfig, params.Logger, persistence.GlobalIsolationGroupConfig)
39✔
628
        if err != nil {
65✔
629
                // not possible to create the client under some persistence configurations, so this is expected
26✔
630
                params.Logger.Warn("not instantiating Isolation group config store, this feature will not be enabled", tag.Error(err))
26✔
631
                return nil
26✔
632
        }
26✔
633
        return cfgStoreClient
13✔
634
}
635

636
// Use the provided IsolationGroupStateHandler or the default one
637
// due to the config store being only available for some
638
// persistence layers, *both* the configStoreClient and IsolationGroupState
639
// will be optionally available
640
func createIsolationGroupStateHandlerOrDefault(
641
        params *Params,
642
        dc *dynamicconfig.Collection,
643
        domainCache cache.DomainCache,
644
        isolationGroupStore dynamicconfig.Client,
645
) isolationgroup.State {
39✔
646

39✔
647
        if params.IsolationGroupState != nil {
39✔
648
                return params.IsolationGroupState
×
649
        }
×
650

651
        if isolationGroupStore == nil {
65✔
652
                return nil
26✔
653
        }
26✔
654

655
        ig, err := defaultisolationgroupstate.NewDefaultIsolationGroupStateWatcherWithConfigStoreClient(
13✔
656
                params.Logger,
13✔
657
                dc,
13✔
658
                domainCache,
13✔
659
                isolationGroupStore,
13✔
660
        )
13✔
661
        if err != nil {
13✔
662
                params.Logger.Error("failed to load up isolation-group", tag.Error(err))
×
663
                return nil
×
664
        }
×
665
        return ig
13✔
666
}
667

668
// Use the provided partitioner or the default one
669
func createPartitionerOrDefault(params *Params, dc *dynamicconfig.Collection, state isolationgroup.State) partition.Partitioner {
39✔
670
        if params.Partitioner != nil {
39✔
671
                return params.Partitioner
×
672
        }
×
673
        if state == nil {
65✔
674
                return nil
26✔
675
        }
26✔
676
        cfg := partition.Config{
13✔
677
                IsolationGroupEnabled: dc.GetBoolPropertyFilteredByDomain(dynamicconfig.EnableTasklistIsolation),
13✔
678
        }
13✔
679
        return partition.NewDefaultPartitioner(params.Logger, state, cfg)
13✔
680
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc