• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

uber / cadence / 01880345-b8b6-4ce5-a8af-9d71e4931c2a

10 May 2023 01:47AM UTC coverage: 57.193% (-0.02%) from 57.212%
01880345-b8b6-4ce5-a8af-9d71e4931c2a

push

buildkite

GitHub
Add helper function to store/retrieve partition config from context (#5195) (#5271)

103 of 103 new or added lines in 4 files covered. (100.0%)

86187 of 150696 relevant lines covered (57.19%)

2409.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

88.18
/service/frontend/clusterRedirectionHandler.go
1
// Copyright (c) 2017 Uber Technologies, Inc.
2
//
3
// Permission is hereby granted, free of charge, to any person obtaining a copy
4
// of this software and associated documentation files (the "Software"), to deal
5
// in the Software without restriction, including without limitation the rights
6
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
7
// copies of the Software, and to permit persons to whom the Software is
8
// furnished to do so, subject to the following conditions:
9
//
10
// The above copyright notice and this permission notice shall be included in
11
// all copies or substantial portions of the Software.
12
//
13
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
14
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
18
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
19
// THE SOFTWARE.
20

21
package frontend
22

23
import (
24
        "context"
25
        "time"
26

27
        "go.uber.org/yarpc"
28

29
        "github.com/uber/cadence/common"
30
        "github.com/uber/cadence/common/config"
31
        "github.com/uber/cadence/common/log"
32
        "github.com/uber/cadence/common/metrics"
33
        "github.com/uber/cadence/common/resource"
34
        "github.com/uber/cadence/common/types"
35
)
36

37
var _ Handler = (*ClusterRedirectionHandlerImpl)(nil)
38

39
type (
40
        // ClusterRedirectionHandlerImpl is simple wrapper over frontend service, doing redirection based on policy for global domains not being active in current cluster
41
        ClusterRedirectionHandlerImpl struct {
42
                resource.Resource
43

44
                currentClusterName string
45
                redirectionPolicy  ClusterRedirectionPolicy
46
                tokenSerializer    common.TaskTokenSerializer
47
                frontendHandler    Handler
48
                callOptions        []yarpc.CallOption
49
        }
50
)
51

52
// NewClusterRedirectionHandler creates a frontend handler to handle cluster redirection for global domains not being active in current cluster
53
func NewClusterRedirectionHandler(
54
        wfHandler Handler,
55
        resource resource.Resource,
56
        config *Config,
57
        policy config.ClusterRedirectionPolicy,
58
) *ClusterRedirectionHandlerImpl {
48✔
59
        dcRedirectionPolicy := RedirectionPolicyGenerator(
48✔
60
                resource.GetClusterMetadata(),
48✔
61
                config,
48✔
62
                resource.GetDomainCache(),
48✔
63
                policy,
48✔
64
        )
48✔
65

48✔
66
        return &ClusterRedirectionHandlerImpl{
48✔
67
                Resource:           resource,
48✔
68
                currentClusterName: resource.GetClusterMetadata().GetCurrentClusterName(),
48✔
69
                redirectionPolicy:  dcRedirectionPolicy,
48✔
70
                tokenSerializer:    common.NewJSONTaskTokenSerializer(),
48✔
71
                frontendHandler:    wfHandler,
48✔
72
                callOptions:        []yarpc.CallOption{yarpc.WithHeader(common.AutoforwardingClusterHeaderName, resource.GetClusterMetadata().GetCurrentClusterName())},
48✔
73
        }
48✔
74
}
48✔
75

76
// Health is for health check
77
func (handler *ClusterRedirectionHandlerImpl) Health(ctx context.Context) (*types.HealthStatus, error) {
×
78
        return handler.frontendHandler.Health(ctx)
×
79
}
×
80

81
// Domain APIs, domain APIs does not require redirection
82

83
// DeprecateDomain API call
84
func (handler *ClusterRedirectionHandlerImpl) DeprecateDomain(
85
        ctx context.Context,
86
        request *types.DeprecateDomainRequest,
87
) (retError error) {
×
88

×
89
        var cluster = handler.currentClusterName
×
90

×
91
        scope, startTime := handler.beforeCall(metrics.DCRedirectionDeprecateDomainScope)
×
92
        defer func() {
×
93
                handler.afterCall(recover(), scope, startTime, cluster, &retError)
×
94
        }()
×
95

96
        return handler.frontendHandler.DeprecateDomain(ctx, request)
×
97
}
98

99
// DescribeDomain API call
100
func (handler *ClusterRedirectionHandlerImpl) DescribeDomain(
101
        ctx context.Context,
102
        request *types.DescribeDomainRequest,
103
) (resp *types.DescribeDomainResponse, retError error) {
132✔
104

132✔
105
        var cluster = handler.currentClusterName
132✔
106

132✔
107
        scope, startTime := handler.beforeCall(metrics.DCRedirectionDescribeDomainScope)
132✔
108
        defer func() {
264✔
109
                handler.afterCall(recover(), scope, startTime, cluster, &retError)
132✔
110
        }()
132✔
111

112
        return handler.frontendHandler.DescribeDomain(ctx, request)
132✔
113
}
114

115
// ListDomains API call
116
func (handler *ClusterRedirectionHandlerImpl) ListDomains(
117
        ctx context.Context,
118
        request *types.ListDomainsRequest,
119
) (resp *types.ListDomainsResponse, retError error) {
×
120

×
121
        var cluster = handler.currentClusterName
×
122

×
123
        scope, startTime := handler.beforeCall(metrics.DCRedirectionListDomainsScope)
×
124
        defer func() {
×
125
                handler.afterCall(recover(), scope, startTime, cluster, &retError)
×
126
        }()
×
127

128
        return handler.frontendHandler.ListDomains(ctx, request)
×
129
}
130

131
// RegisterDomain API call
132
func (handler *ClusterRedirectionHandlerImpl) RegisterDomain(
133
        ctx context.Context,
134
        request *types.RegisterDomainRequest,
135
) (retError error) {
39✔
136

39✔
137
        var cluster = handler.currentClusterName
39✔
138

39✔
139
        scope, startTime := handler.beforeCall(metrics.DCRedirectionRegisterDomainScope)
39✔
140
        defer func() {
78✔
141
                handler.afterCall(recover(), scope, startTime, cluster, &retError)
39✔
142
        }()
39✔
143

144
        return handler.frontendHandler.RegisterDomain(ctx, request)
39✔
145
}
146

147
// UpdateDomain API call
148
func (handler *ClusterRedirectionHandlerImpl) UpdateDomain(
149
        ctx context.Context,
150
        request *types.UpdateDomainRequest,
151
) (resp *types.UpdateDomainResponse, retError error) {
×
152

×
153
        var cluster = handler.currentClusterName
×
154

×
155
        scope, startTime := handler.beforeCall(metrics.DCRedirectionUpdateDomainScope)
×
156
        defer func() {
×
157
                handler.afterCall(recover(), scope, startTime, cluster, &retError)
×
158
        }()
×
159

160
        return handler.frontendHandler.UpdateDomain(ctx, request)
×
161
}
162

163
// Other APIs
164

165
// DescribeTaskList API call
166
func (handler *ClusterRedirectionHandlerImpl) DescribeTaskList(
167
        ctx context.Context,
168
        request *types.DescribeTaskListRequest,
169
) (resp *types.DescribeTaskListResponse, retError error) {
19✔
170

19✔
171
        var apiName = "DescribeTaskList"
19✔
172
        var err error
19✔
173
        var cluster string
19✔
174

19✔
175
        scope, startTime := handler.beforeCall(metrics.DCRedirectionDescribeTaskListScope)
19✔
176
        defer func() {
38✔
177
                handler.afterCall(recover(), scope, startTime, cluster, &retError)
19✔
178
        }()
19✔
179

180
        err = handler.redirectionPolicy.WithDomainNameRedirect(ctx, request.GetDomain(), apiName, func(targetDC string) error {
39✔
181
                cluster = targetDC
20✔
182
                switch {
20✔
183
                case targetDC == handler.currentClusterName:
19✔
184
                        resp, err = handler.frontendHandler.DescribeTaskList(ctx, request)
19✔
185
                default:
1✔
186
                        remoteClient := handler.GetRemoteFrontendClient(targetDC)
1✔
187
                        resp, err = remoteClient.DescribeTaskList(ctx, request, handler.callOptions...)
1✔
188
                }
189
                return err
20✔
190
        })
191

192
        return resp, err
19✔
193
}
194

195
// DescribeWorkflowExecution API call
196
func (handler *ClusterRedirectionHandlerImpl) DescribeWorkflowExecution(
197
        ctx context.Context,
198
        request *types.DescribeWorkflowExecutionRequest,
199
) (resp *types.DescribeWorkflowExecutionResponse, retError error) {
94✔
200

94✔
201
        var apiName = "DescribeWorkflowExecution"
94✔
202
        var err error
94✔
203
        var cluster string
94✔
204

94✔
205
        scope, startTime := handler.beforeCall(metrics.DCRedirectionDescribeWorkflowExecutionScope)
94✔
206
        defer func() {
188✔
207
                handler.afterCall(recover(), scope, startTime, cluster, &retError)
94✔
208
        }()
94✔
209

210
        err = handler.redirectionPolicy.WithDomainNameRedirect(ctx, request.GetDomain(), apiName, func(targetDC string) error {
189✔
211
                cluster = targetDC
95✔
212
                switch {
95✔
213
                case targetDC == handler.currentClusterName:
94✔
214
                        resp, err = handler.frontendHandler.DescribeWorkflowExecution(ctx, request)
94✔
215
                default:
1✔
216
                        remoteClient := handler.GetRemoteFrontendClient(targetDC)
1✔
217
                        resp, err = remoteClient.DescribeWorkflowExecution(ctx, request, handler.callOptions...)
1✔
218
                }
219
                return err
95✔
220
        })
221

222
        return resp, err
94✔
223
}
224

225
// GetWorkflowExecutionHistory API call
226
func (handler *ClusterRedirectionHandlerImpl) GetWorkflowExecutionHistory(
227
        ctx context.Context,
228
        request *types.GetWorkflowExecutionHistoryRequest,
229
) (resp *types.GetWorkflowExecutionHistoryResponse, retError error) {
452✔
230

452✔
231
        var apiName = "GetWorkflowExecutionHistory"
452✔
232
        var err error
452✔
233
        var cluster string
452✔
234

452✔
235
        scope, startTime := handler.beforeCall(metrics.DCRedirectionGetWorkflowExecutionHistoryScope)
452✔
236
        defer func() {
904✔
237
                handler.afterCall(recover(), scope, startTime, cluster, &retError)
452✔
238
        }()
452✔
239

240
        err = handler.redirectionPolicy.WithDomainNameRedirect(ctx, request.GetDomain(), apiName, func(targetDC string) error {
905✔
241
                cluster = targetDC
453✔
242
                switch {
453✔
243
                case targetDC == handler.currentClusterName:
452✔
244
                        resp, err = handler.frontendHandler.GetWorkflowExecutionHistory(ctx, request)
452✔
245
                default:
1✔
246
                        remoteClient := handler.GetRemoteFrontendClient(targetDC)
1✔
247
                        resp, err = remoteClient.GetWorkflowExecutionHistory(ctx, request, handler.callOptions...)
1✔
248
                }
249
                return err
453✔
250
        })
251

252
        return resp, err
452✔
253
}
254

255
// ListArchivedWorkflowExecutions API call
256
func (handler *ClusterRedirectionHandlerImpl) ListArchivedWorkflowExecutions(
257
        ctx context.Context,
258
        request *types.ListArchivedWorkflowExecutionsRequest,
259
) (resp *types.ListArchivedWorkflowExecutionsResponse, retError error) {
10✔
260

10✔
261
        var apiName = "ListArchivedWorkflowExecutions"
10✔
262
        var err error
10✔
263
        var cluster string
10✔
264

10✔
265
        scope, startTime := handler.beforeCall(metrics.DCRedirectionListArchivedWorkflowExecutionsScope)
10✔
266
        defer func() {
20✔
267
                handler.afterCall(recover(), scope, startTime, cluster, &retError)
10✔
268
        }()
10✔
269

270
        err = handler.redirectionPolicy.WithDomainNameRedirect(ctx, request.GetDomain(), apiName, func(targetDC string) error {
21✔
271
                cluster = targetDC
11✔
272
                switch {
11✔
273
                case targetDC == handler.currentClusterName:
10✔
274
                        resp, err = handler.frontendHandler.ListArchivedWorkflowExecutions(ctx, request)
10✔
275
                default:
1✔
276
                        remoteClient := handler.GetRemoteFrontendClient(targetDC)
1✔
277
                        resp, err = remoteClient.ListArchivedWorkflowExecutions(ctx, request, handler.callOptions...)
1✔
278
                }
279
                return err
11✔
280
        })
281

282
        return resp, err
10✔
283
}
284

285
// ListClosedWorkflowExecutions API call
286
func (handler *ClusterRedirectionHandlerImpl) ListClosedWorkflowExecutions(
287
        ctx context.Context,
288
        request *types.ListClosedWorkflowExecutionsRequest,
289
) (resp *types.ListClosedWorkflowExecutionsResponse, retError error) {
25✔
290

25✔
291
        var apiName = "ListClosedWorkflowExecutions"
25✔
292
        var err error
25✔
293
        var cluster string
25✔
294

25✔
295
        scope, startTime := handler.beforeCall(metrics.DCRedirectionListClosedWorkflowExecutionsScope)
25✔
296
        defer func() {
50✔
297
                handler.afterCall(recover(), scope, startTime, cluster, &retError)
25✔
298
        }()
25✔
299

300
        err = handler.redirectionPolicy.WithDomainNameRedirect(ctx, request.GetDomain(), apiName, func(targetDC string) error {
51✔
301
                cluster = targetDC
26✔
302
                switch {
26✔
303
                case targetDC == handler.currentClusterName:
25✔
304
                        resp, err = handler.frontendHandler.ListClosedWorkflowExecutions(ctx, request)
25✔
305
                default:
1✔
306
                        remoteClient := handler.GetRemoteFrontendClient(targetDC)
1✔
307
                        resp, err = remoteClient.ListClosedWorkflowExecutions(ctx, request, handler.callOptions...)
1✔
308
                }
309
                return err
26✔
310
        })
311

312
        return resp, err
25✔
313
}
314

315
// ListOpenWorkflowExecutions API call
316
func (handler *ClusterRedirectionHandlerImpl) ListOpenWorkflowExecutions(
317
        ctx context.Context,
318
        request *types.ListOpenWorkflowExecutionsRequest,
319
) (resp *types.ListOpenWorkflowExecutionsResponse, retError error) {
103✔
320

103✔
321
        var apiName = "ListOpenWorkflowExecutions"
103✔
322
        var err error
103✔
323
        var cluster string
103✔
324

103✔
325
        scope, startTime := handler.beforeCall(metrics.DCRedirectionListOpenWorkflowExecutionsScope)
103✔
326
        defer func() {
206✔
327
                handler.afterCall(recover(), scope, startTime, cluster, &retError)
103✔
328
        }()
103✔
329

330
        err = handler.redirectionPolicy.WithDomainNameRedirect(ctx, request.GetDomain(), apiName, func(targetDC string) error {
207✔
331
                cluster = targetDC
104✔
332
                switch {
104✔
333
                case targetDC == handler.currentClusterName:
103✔
334
                        resp, err = handler.frontendHandler.ListOpenWorkflowExecutions(ctx, request)
103✔
335
                default:
1✔
336
                        remoteClient := handler.GetRemoteFrontendClient(targetDC)
1✔
337
                        resp, err = remoteClient.ListOpenWorkflowExecutions(ctx, request, handler.callOptions...)
1✔
338
                }
339
                return err
104✔
340
        })
341

342
        return resp, err
103✔
343
}
344

345
// ListWorkflowExecutions API call
346
func (handler *ClusterRedirectionHandlerImpl) ListWorkflowExecutions(
347
        ctx context.Context,
348
        request *types.ListWorkflowExecutionsRequest,
349
) (resp *types.ListWorkflowExecutionsResponse, retError error) {
140✔
350

140✔
351
        var apiName = "ListWorkflowExecutions"
140✔
352
        var err error
140✔
353
        var cluster string
140✔
354

140✔
355
        scope, startTime := handler.beforeCall(metrics.DCRedirectionListWorkflowExecutionsScope)
140✔
356
        defer func() {
280✔
357
                handler.afterCall(recover(), scope, startTime, cluster, &retError)
140✔
358
        }()
140✔
359

360
        err = handler.redirectionPolicy.WithDomainNameRedirect(ctx, request.GetDomain(), apiName, func(targetDC string) error {
281✔
361
                cluster = targetDC
141✔
362
                switch {
141✔
363
                case targetDC == handler.currentClusterName:
140✔
364
                        resp, err = handler.frontendHandler.ListWorkflowExecutions(ctx, request)
140✔
365
                default:
1✔
366
                        remoteClient := handler.GetRemoteFrontendClient(targetDC)
1✔
367
                        resp, err = remoteClient.ListWorkflowExecutions(ctx, request, handler.callOptions...)
1✔
368
                }
369
                return err
141✔
370
        })
371

372
        return resp, err
140✔
373
}
374

375
// ScanWorkflowExecutions API call
376
func (handler *ClusterRedirectionHandlerImpl) ScanWorkflowExecutions(
377
        ctx context.Context,
378
        request *types.ListWorkflowExecutionsRequest,
379
) (resp *types.ListWorkflowExecutionsResponse, retError error) {
33✔
380

33✔
381
        var apiName = "ScanWorkflowExecutions"
33✔
382
        var err error
33✔
383
        var cluster string
33✔
384

33✔
385
        scope, startTime := handler.beforeCall(metrics.DCRedirectionScanWorkflowExecutionsScope)
33✔
386
        defer func() {
66✔
387
                handler.afterCall(recover(), scope, startTime, cluster, &retError)
33✔
388
        }()
33✔
389
        err = handler.redirectionPolicy.WithDomainNameRedirect(ctx, request.GetDomain(), apiName, func(targetDC string) error {
67✔
390
                cluster = targetDC
34✔
391
                switch {
34✔
392
                case targetDC == handler.currentClusterName:
33✔
393
                        resp, err = handler.frontendHandler.ScanWorkflowExecutions(ctx, request)
33✔
394
                default:
1✔
395
                        remoteClient := handler.GetRemoteFrontendClient(targetDC)
1✔
396
                        resp, err = remoteClient.ScanWorkflowExecutions(ctx, request, handler.callOptions...)
1✔
397
                }
398
                return err
34✔
399
        })
400

401
        return resp, err
33✔
402
}
403

404
// CountWorkflowExecutions API call
405
func (handler *ClusterRedirectionHandlerImpl) CountWorkflowExecutions(
406
        ctx context.Context,
407
        request *types.CountWorkflowExecutionsRequest,
408
) (resp *types.CountWorkflowExecutionsResponse, retError error) {
13✔
409

13✔
410
        var apiName = "CountWorkflowExecutions"
13✔
411
        var err error
13✔
412
        var cluster string
13✔
413

13✔
414
        scope, startTime := handler.beforeCall(metrics.DCRedirectionCountWorkflowExecutionsScope)
13✔
415
        defer func() {
26✔
416
                handler.afterCall(recover(), scope, startTime, cluster, &retError)
13✔
417
        }()
13✔
418

419
        err = handler.redirectionPolicy.WithDomainNameRedirect(ctx, request.GetDomain(), apiName, func(targetDC string) error {
27✔
420
                cluster = targetDC
14✔
421
                switch {
14✔
422
                case targetDC == handler.currentClusterName:
13✔
423
                        resp, err = handler.frontendHandler.CountWorkflowExecutions(ctx, request)
13✔
424
                default:
1✔
425
                        remoteClient := handler.GetRemoteFrontendClient(targetDC)
1✔
426
                        resp, err = remoteClient.CountWorkflowExecutions(ctx, request, handler.callOptions...)
1✔
427
                }
428
                return err
14✔
429
        })
430

431
        return resp, err
13✔
432
}
433

434
// GetSearchAttributes API call
435
func (handler *ClusterRedirectionHandlerImpl) GetSearchAttributes(
436
        ctx context.Context,
437
) (resp *types.GetSearchAttributesResponse, retError error) {
×
438

×
439
        var cluster = handler.currentClusterName
×
440

×
441
        scope, startTime := handler.beforeCall(metrics.DCRedirectionGetSearchAttributesScope)
×
442
        defer func() {
×
443
                handler.afterCall(recover(), scope, startTime, cluster, &retError)
×
444
        }()
×
445

446
        return handler.frontendHandler.GetSearchAttributes(ctx)
×
447
}
448

449
// PollForActivityTask API call
450
func (handler *ClusterRedirectionHandlerImpl) PollForActivityTask(
451
        ctx context.Context,
452
        request *types.PollForActivityTaskRequest,
453
) (resp *types.PollForActivityTaskResponse, retError error) {
768✔
454

768✔
455
        var apiName = "PollForActivityTask"
768✔
456
        var err error
768✔
457
        var cluster string
768✔
458

768✔
459
        scope, startTime := handler.beforeCall(metrics.DCRedirectionPollForActivityTaskScope)
768✔
460
        defer func() {
1,536✔
461
                handler.afterCall(recover(), scope, startTime, cluster, &retError)
768✔
462
        }()
768✔
463

464
        err = handler.redirectionPolicy.WithDomainNameRedirect(ctx, request.GetDomain(), apiName, func(targetDC string) error {
1,537✔
465
                cluster = targetDC
769✔
466
                switch {
769✔
467
                case targetDC == handler.currentClusterName:
768✔
468
                        resp, err = handler.frontendHandler.PollForActivityTask(ctx, request)
768✔
469
                default:
1✔
470
                        remoteClient := handler.GetRemoteFrontendClient(targetDC)
1✔
471
                        resp, err = remoteClient.PollForActivityTask(ctx, request, handler.callOptions...)
1✔
472
                }
473
                return err
769✔
474
        })
475

476
        return resp, err
768✔
477
}
478

479
// PollForDecisionTask API call
480
func (handler *ClusterRedirectionHandlerImpl) PollForDecisionTask(
481
        ctx context.Context,
482
        request *types.PollForDecisionTaskRequest,
483
) (resp *types.PollForDecisionTaskResponse, retError error) {
1,549✔
484

1,549✔
485
        var apiName = "PollForDecisionTask"
1,549✔
486
        var err error
1,549✔
487
        var cluster string
1,549✔
488

1,549✔
489
        scope, startTime := handler.beforeCall(metrics.DCRedirectionPollForDecisionTaskScope)
1,549✔
490
        defer func() {
3,098✔
491
                handler.afterCall(recover(), scope, startTime, cluster, &retError)
1,549✔
492
        }()
1,549✔
493

494
        err = handler.redirectionPolicy.WithDomainNameRedirect(ctx, request.GetDomain(), apiName, func(targetDC string) error {
3,099✔
495
                cluster = targetDC
1,550✔
496
                switch {
1,550✔
497
                case targetDC == handler.currentClusterName:
1,549✔
498
                        resp, err = handler.frontendHandler.PollForDecisionTask(ctx, request)
1,549✔
499
                default:
1✔
500
                        remoteClient := handler.GetRemoteFrontendClient(targetDC)
1✔
501
                        resp, err = remoteClient.PollForDecisionTask(ctx, request, handler.callOptions...)
1✔
502
                }
503
                return err
1,550✔
504
        })
505

506
        return resp, err
1,549✔
507
}
508

509
// QueryWorkflow API call
510
func (handler *ClusterRedirectionHandlerImpl) QueryWorkflow(
511
        ctx context.Context,
512
        request *types.QueryWorkflowRequest,
513
) (resp *types.QueryWorkflowResponse, retError error) {
47✔
514

47✔
515
        var apiName = "QueryWorkflow"
47✔
516
        var err error
47✔
517
        var cluster string
47✔
518

47✔
519
        // Only autoforward strong consistent queries, this is done for two reasons:
47✔
520
        // 1. Query is meant to be fast, autoforwarding all queries will increase latency.
47✔
521
        // 2. If eventual consistency was requested then the results from running out of local dc will be fine.
47✔
522
        if request.GetQueryConsistencyLevel() == types.QueryConsistencyLevelStrong {
60✔
523
                apiName = "QueryWorkflowStrongConsistency"
13✔
524
        }
13✔
525
        scope, startTime := handler.beforeCall(metrics.DCRedirectionQueryWorkflowScope)
47✔
526
        defer func() {
94✔
527
                handler.afterCall(recover(), scope, startTime, cluster, &retError)
47✔
528
        }()
47✔
529

530
        err = handler.redirectionPolicy.WithDomainNameRedirect(ctx, request.GetDomain(), apiName, func(targetDC string) error {
96✔
531
                cluster = targetDC
49✔
532
                switch {
49✔
533
                case targetDC == handler.currentClusterName:
47✔
534
                        resp, err = handler.frontendHandler.QueryWorkflow(ctx, request)
47✔
535
                default:
2✔
536
                        remoteClient := handler.GetRemoteFrontendClient(targetDC)
2✔
537
                        resp, err = remoteClient.QueryWorkflow(ctx, request, handler.callOptions...)
2✔
538
                }
539
                return err
49✔
540
        })
541

542
        return resp, err
47✔
543
}
544

545
// RecordActivityTaskHeartbeat API call
546
func (handler *ClusterRedirectionHandlerImpl) RecordActivityTaskHeartbeat(
547
        ctx context.Context,
548
        request *types.RecordActivityTaskHeartbeatRequest,
549
) (resp *types.RecordActivityTaskHeartbeatResponse, retError error) {
382✔
550

382✔
551
        var apiName = "RecordActivityTaskHeartbeat"
382✔
552
        var err error
382✔
553
        var cluster string
382✔
554

382✔
555
        scope, startTime := handler.beforeCall(metrics.DCRedirectionRecordActivityTaskHeartbeatScope)
382✔
556
        defer func() {
764✔
557
                handler.afterCall(recover(), scope, startTime, cluster, &retError)
382✔
558
        }()
382✔
559

560
        token, err := handler.tokenSerializer.Deserialize(request.TaskToken)
382✔
561
        if err != nil {
382✔
562
                return nil, err
×
563
        }
×
564

565
        err = handler.redirectionPolicy.WithDomainIDRedirect(ctx, token.DomainID, apiName, func(targetDC string) error {
765✔
566
                cluster = targetDC
383✔
567
                switch {
383✔
568
                case targetDC == handler.currentClusterName:
382✔
569
                        resp, err = handler.frontendHandler.RecordActivityTaskHeartbeat(ctx, request)
382✔
570
                default:
1✔
571
                        remoteClient := handler.GetRemoteFrontendClient(targetDC)
1✔
572
                        resp, err = remoteClient.RecordActivityTaskHeartbeat(ctx, request, handler.callOptions...)
1✔
573
                }
574
                return err
383✔
575
        })
576

577
        return resp, err
382✔
578
}
579

580
// RecordActivityTaskHeartbeatByID API call
581
func (handler *ClusterRedirectionHandlerImpl) RecordActivityTaskHeartbeatByID(
582
        ctx context.Context,
583
        request *types.RecordActivityTaskHeartbeatByIDRequest,
584
) (resp *types.RecordActivityTaskHeartbeatResponse, retError error) {
1✔
585

1✔
586
        var apiName = "RecordActivityTaskHeartbeatByID"
1✔
587
        var err error
1✔
588
        var cluster string
1✔
589

1✔
590
        scope, startTime := handler.beforeCall(metrics.DCRedirectionRecordActivityTaskHeartbeatByIDScope)
1✔
591
        defer func() {
2✔
592
                handler.afterCall(recover(), scope, startTime, cluster, &retError)
1✔
593
        }()
1✔
594

595
        err = handler.redirectionPolicy.WithDomainNameRedirect(ctx, request.GetDomain(), apiName, func(targetDC string) error {
3✔
596
                cluster = targetDC
2✔
597
                switch {
2✔
598
                case targetDC == handler.currentClusterName:
1✔
599
                        resp, err = handler.frontendHandler.RecordActivityTaskHeartbeatByID(ctx, request)
1✔
600
                default:
1✔
601
                        remoteClient := handler.GetRemoteFrontendClient(targetDC)
1✔
602
                        resp, err = remoteClient.RecordActivityTaskHeartbeatByID(ctx, request, handler.callOptions...)
1✔
603
                }
604
                return err
2✔
605
        })
606

607
        return resp, err
1✔
608
}
609

610
// RequestCancelWorkflowExecution API call
611
func (handler *ClusterRedirectionHandlerImpl) RequestCancelWorkflowExecution(
612
        ctx context.Context,
613
        request *types.RequestCancelWorkflowExecutionRequest,
614
) (retError error) {
7✔
615

7✔
616
        var apiName = "RequestCancelWorkflowExecution"
7✔
617
        var err error
7✔
618
        var cluster string
7✔
619

7✔
620
        scope, startTime := handler.beforeCall(metrics.DCRedirectionRequestCancelWorkflowExecutionScope)
7✔
621
        defer func() {
14✔
622
                handler.afterCall(recover(), scope, startTime, cluster, &retError)
7✔
623
        }()
7✔
624

625
        err = handler.redirectionPolicy.WithDomainNameRedirect(ctx, request.GetDomain(), apiName, func(targetDC string) error {
15✔
626
                cluster = targetDC
8✔
627
                switch {
8✔
628
                case targetDC == handler.currentClusterName:
7✔
629
                        err = handler.frontendHandler.RequestCancelWorkflowExecution(ctx, request)
7✔
630
                default:
1✔
631
                        remoteClient := handler.GetRemoteFrontendClient(targetDC)
1✔
632
                        err = remoteClient.RequestCancelWorkflowExecution(ctx, request, handler.callOptions...)
1✔
633
                }
634
                return err
8✔
635
        })
636

637
        return err
7✔
638
}
639

640
// ResetStickyTaskList API call
641
func (handler *ClusterRedirectionHandlerImpl) ResetStickyTaskList(
642
        ctx context.Context,
643
        request *types.ResetStickyTaskListRequest,
644
) (resp *types.ResetStickyTaskListResponse, retError error) {
4✔
645

4✔
646
        var apiName = "ResetStickyTaskList"
4✔
647
        var err error
4✔
648
        var cluster string
4✔
649

4✔
650
        scope, startTime := handler.beforeCall(metrics.DCRedirectionResetStickyTaskListScope)
4✔
651
        defer func() {
8✔
652
                handler.afterCall(recover(), scope, startTime, cluster, &retError)
4✔
653
        }()
4✔
654

655
        err = handler.redirectionPolicy.WithDomainNameRedirect(ctx, request.GetDomain(), apiName, func(targetDC string) error {
9✔
656
                cluster = targetDC
5✔
657
                switch {
5✔
658
                case targetDC == handler.currentClusterName:
4✔
659
                        resp, err = handler.frontendHandler.ResetStickyTaskList(ctx, request)
4✔
660
                default:
1✔
661
                        remoteClient := handler.GetRemoteFrontendClient(targetDC)
1✔
662
                        resp, err = remoteClient.ResetStickyTaskList(ctx, request, handler.callOptions...)
1✔
663
                }
664
                return err
5✔
665
        })
666

667
        return resp, err
4✔
668
}
669

670
// ResetWorkflowExecution API call
671
func (handler *ClusterRedirectionHandlerImpl) ResetWorkflowExecution(
672
        ctx context.Context,
673
        request *types.ResetWorkflowExecutionRequest,
674
) (resp *types.ResetWorkflowExecutionResponse, retError error) {
16✔
675

16✔
676
        var apiName = "ResetWorkflowExecution"
16✔
677
        var err error
16✔
678
        var cluster string
16✔
679

16✔
680
        scope, startTime := handler.beforeCall(metrics.DCRedirectionResetWorkflowExecutionScope)
16✔
681
        defer func() {
32✔
682
                handler.afterCall(recover(), scope, startTime, cluster, &retError)
16✔
683
        }()
16✔
684

685
        err = handler.redirectionPolicy.WithDomainNameRedirect(ctx, request.GetDomain(), apiName, func(targetDC string) error {
33✔
686
                cluster = targetDC
17✔
687
                switch {
17✔
688
                case targetDC == handler.currentClusterName:
16✔
689
                        resp, err = handler.frontendHandler.ResetWorkflowExecution(ctx, request)
16✔
690
                default:
1✔
691
                        remoteClient := handler.GetRemoteFrontendClient(targetDC)
1✔
692
                        resp, err = remoteClient.ResetWorkflowExecution(ctx, request, handler.callOptions...)
1✔
693
                }
694
                return err
17✔
695
        })
696

697
        return resp, err
16✔
698
}
699

700
// RespondActivityTaskCanceled API call
701
func (handler *ClusterRedirectionHandlerImpl) RespondActivityTaskCanceled(
702
        ctx context.Context,
703
        request *types.RespondActivityTaskCanceledRequest,
704
) (retError error) {
1✔
705

1✔
706
        var apiName = "RespondActivityTaskCanceled"
1✔
707
        var err error
1✔
708
        var cluster string
1✔
709

1✔
710
        scope, startTime := handler.beforeCall(metrics.DCRedirectionRespondActivityTaskCanceledScope)
1✔
711
        defer func() {
2✔
712
                handler.afterCall(recover(), scope, startTime, cluster, &retError)
1✔
713
        }()
1✔
714

715
        token, err := handler.tokenSerializer.Deserialize(request.TaskToken)
1✔
716
        if err != nil {
1✔
717
                return err
×
718
        }
×
719

720
        err = handler.redirectionPolicy.WithDomainIDRedirect(ctx, token.DomainID, apiName, func(targetDC string) error {
3✔
721
                cluster = targetDC
2✔
722
                switch {
2✔
723
                case targetDC == handler.currentClusterName:
1✔
724
                        err = handler.frontendHandler.RespondActivityTaskCanceled(ctx, request)
1✔
725
                default:
1✔
726
                        remoteClient := handler.GetRemoteFrontendClient(targetDC)
1✔
727
                        err = remoteClient.RespondActivityTaskCanceled(ctx, request, handler.callOptions...)
1✔
728
                }
729
                return err
2✔
730
        })
731

732
        return err
1✔
733
}
734

735
// RespondActivityTaskCanceledByID API call
736
func (handler *ClusterRedirectionHandlerImpl) RespondActivityTaskCanceledByID(
737
        ctx context.Context,
738
        request *types.RespondActivityTaskCanceledByIDRequest,
739
) (retError error) {
1✔
740

1✔
741
        var apiName = "RespondActivityTaskCanceledByID"
1✔
742
        var err error
1✔
743
        var cluster string
1✔
744

1✔
745
        scope, startTime := handler.beforeCall(metrics.DCRedirectionRespondActivityTaskCanceledByIDScope)
1✔
746
        defer func() {
2✔
747
                handler.afterCall(recover(), scope, startTime, cluster, &retError)
1✔
748
        }()
1✔
749

750
        err = handler.redirectionPolicy.WithDomainNameRedirect(ctx, request.GetDomain(), apiName, func(targetDC string) error {
3✔
751
                cluster = targetDC
2✔
752
                switch {
2✔
753
                case targetDC == handler.currentClusterName:
1✔
754
                        err = handler.frontendHandler.RespondActivityTaskCanceledByID(ctx, request)
1✔
755
                default:
1✔
756
                        remoteClient := handler.GetRemoteFrontendClient(targetDC)
1✔
757
                        err = remoteClient.RespondActivityTaskCanceledByID(ctx, request, handler.callOptions...)
1✔
758
                }
759
                return err
2✔
760
        })
761

762
        return err
1✔
763
}
764

765
// RespondActivityTaskCompleted API call
766
func (handler *ClusterRedirectionHandlerImpl) RespondActivityTaskCompleted(
767
        ctx context.Context,
768
        request *types.RespondActivityTaskCompletedRequest,
769
) (retError error) {
247✔
770

247✔
771
        var apiName = "RespondActivityTaskCompleted"
247✔
772
        var err error
247✔
773
        var cluster string
247✔
774

247✔
775
        scope, startTime := handler.beforeCall(metrics.DCRedirectionRespondActivityTaskCompletedScope)
247✔
776
        defer func() {
494✔
777
                handler.afterCall(recover(), scope, startTime, cluster, &retError)
247✔
778
        }()
247✔
779

780
        token, err := handler.tokenSerializer.Deserialize(request.TaskToken)
247✔
781
        if err != nil {
247✔
782
                return err
×
783
        }
×
784

785
        err = handler.redirectionPolicy.WithDomainIDRedirect(ctx, token.DomainID, apiName, func(targetDC string) error {
495✔
786
                cluster = targetDC
248✔
787
                switch {
248✔
788
                case targetDC == handler.currentClusterName:
247✔
789
                        err = handler.frontendHandler.RespondActivityTaskCompleted(ctx, request)
247✔
790
                default:
1✔
791
                        remoteClient := handler.GetRemoteFrontendClient(targetDC)
1✔
792
                        err = remoteClient.RespondActivityTaskCompleted(ctx, request, handler.callOptions...)
1✔
793
                }
794
                return err
248✔
795
        })
796

797
        return err
247✔
798
}
799

800
// RespondActivityTaskCompletedByID API call
801
func (handler *ClusterRedirectionHandlerImpl) RespondActivityTaskCompletedByID(
802
        ctx context.Context,
803
        request *types.RespondActivityTaskCompletedByIDRequest,
804
) (retError error) {
76✔
805

76✔
806
        var apiName = "RespondActivityTaskCompletedByID"
76✔
807
        var err error
76✔
808
        var cluster string
76✔
809

76✔
810
        scope, startTime := handler.beforeCall(metrics.DCRedirectionRespondActivityTaskCompletedByIDScope)
76✔
811
        defer func() {
152✔
812
                handler.afterCall(recover(), scope, startTime, cluster, &retError)
76✔
813
        }()
76✔
814

815
        err = handler.redirectionPolicy.WithDomainNameRedirect(ctx, request.GetDomain(), apiName, func(targetDC string) error {
153✔
816
                cluster = targetDC
77✔
817
                switch {
77✔
818
                case targetDC == handler.currentClusterName:
76✔
819
                        err = handler.frontendHandler.RespondActivityTaskCompletedByID(ctx, request)
76✔
820
                default:
1✔
821
                        remoteClient := handler.GetRemoteFrontendClient(targetDC)
1✔
822
                        err = remoteClient.RespondActivityTaskCompletedByID(ctx, request, handler.callOptions...)
1✔
823
                }
824
                return err
77✔
825
        })
826

827
        return err
76✔
828
}
829

830
// RespondActivityTaskFailed API call
831
func (handler *ClusterRedirectionHandlerImpl) RespondActivityTaskFailed(
832
        ctx context.Context,
833
        request *types.RespondActivityTaskFailedRequest,
834
) (retError error) {
13✔
835

13✔
836
        var apiName = "RespondActivityTaskFailed"
13✔
837
        var err error
13✔
838
        var cluster string
13✔
839

13✔
840
        scope, startTime := handler.beforeCall(metrics.DCRedirectionRespondActivityTaskFailedScope)
13✔
841
        defer func() {
26✔
842
                handler.afterCall(recover(), scope, startTime, cluster, &retError)
13✔
843
        }()
13✔
844

845
        token, err := handler.tokenSerializer.Deserialize(request.TaskToken)
13✔
846
        if err != nil {
13✔
847
                return err
×
848
        }
×
849

850
        err = handler.redirectionPolicy.WithDomainIDRedirect(ctx, token.DomainID, apiName, func(targetDC string) error {
27✔
851
                cluster = targetDC
14✔
852
                switch {
14✔
853
                case targetDC == handler.currentClusterName:
13✔
854
                        err = handler.frontendHandler.RespondActivityTaskFailed(ctx, request)
13✔
855
                default:
1✔
856
                        remoteClient := handler.GetRemoteFrontendClient(targetDC)
1✔
857
                        err = remoteClient.RespondActivityTaskFailed(ctx, request, handler.callOptions...)
1✔
858
                }
859
                return err
14✔
860
        })
861

862
        return err
13✔
863
}
864

865
// RespondActivityTaskFailedByID API call
866
func (handler *ClusterRedirectionHandlerImpl) RespondActivityTaskFailedByID(
867
        ctx context.Context,
868
        request *types.RespondActivityTaskFailedByIDRequest,
869
) (retError error) {
1✔
870

1✔
871
        var apiName = "RespondActivityTaskFailedByID"
1✔
872
        var err error
1✔
873
        var cluster string
1✔
874

1✔
875
        scope, startTime := handler.beforeCall(metrics.DCRedirectionRespondActivityTaskFailedByIDScope)
1✔
876
        defer func() {
2✔
877
                handler.afterCall(recover(), scope, startTime, cluster, &retError)
1✔
878
        }()
1✔
879

880
        err = handler.redirectionPolicy.WithDomainNameRedirect(ctx, request.GetDomain(), apiName, func(targetDC string) error {
3✔
881
                cluster = targetDC
2✔
882
                switch {
2✔
883
                case targetDC == handler.currentClusterName:
1✔
884
                        err = handler.frontendHandler.RespondActivityTaskFailedByID(ctx, request)
1✔
885
                default:
1✔
886
                        remoteClient := handler.GetRemoteFrontendClient(targetDC)
1✔
887
                        err = remoteClient.RespondActivityTaskFailedByID(ctx, request, handler.callOptions...)
1✔
888
                }
889
                return err
2✔
890
        })
891

892
        return err
1✔
893
}
894

895
// RespondDecisionTaskCompleted API call
896
func (handler *ClusterRedirectionHandlerImpl) RespondDecisionTaskCompleted(
897
        ctx context.Context,
898
        request *types.RespondDecisionTaskCompletedRequest,
899
) (resp *types.RespondDecisionTaskCompletedResponse, retError error) {
933✔
900

933✔
901
        var apiName = "RespondDecisionTaskCompleted"
933✔
902
        var err error
933✔
903
        var cluster string
933✔
904

933✔
905
        scope, startTime := handler.beforeCall(metrics.DCRedirectionRespondDecisionTaskCompletedScope)
933✔
906
        defer func() {
1,866✔
907
                handler.afterCall(recover(), scope, startTime, cluster, &retError)
933✔
908
        }()
933✔
909

910
        token, err := handler.tokenSerializer.Deserialize(request.TaskToken)
933✔
911
        if err != nil {
933✔
912
                return nil, err
×
913
        }
×
914

915
        err = handler.redirectionPolicy.WithDomainIDRedirect(ctx, token.DomainID, apiName, func(targetDC string) error {
1,867✔
916
                cluster = targetDC
934✔
917
                switch {
934✔
918
                case targetDC == handler.currentClusterName:
933✔
919
                        resp, err = handler.frontendHandler.RespondDecisionTaskCompleted(ctx, request)
933✔
920
                default:
1✔
921
                        remoteClient := handler.GetRemoteFrontendClient(targetDC)
1✔
922
                        resp, err = remoteClient.RespondDecisionTaskCompleted(ctx, request, handler.callOptions...)
1✔
923
                }
924
                return err
934✔
925
        })
926

927
        return resp, err
933✔
928
}
929

930
// RespondDecisionTaskFailed API call
931
func (handler *ClusterRedirectionHandlerImpl) RespondDecisionTaskFailed(
932
        ctx context.Context,
933
        request *types.RespondDecisionTaskFailedRequest,
934
) (retError error) {
160✔
935

160✔
936
        var apiName = "RespondDecisionTaskFailed"
160✔
937
        var err error
160✔
938
        var cluster string
160✔
939

160✔
940
        scope, startTime := handler.beforeCall(metrics.DCRedirectionRespondDecisionTaskFailedScope)
160✔
941
        defer func() {
320✔
942
                handler.afterCall(recover(), scope, startTime, cluster, &retError)
160✔
943
        }()
160✔
944

945
        token, err := handler.tokenSerializer.Deserialize(request.TaskToken)
160✔
946
        if err != nil {
160✔
947
                return err
×
948
        }
×
949

950
        err = handler.redirectionPolicy.WithDomainIDRedirect(ctx, token.DomainID, apiName, func(targetDC string) error {
321✔
951
                cluster = targetDC
161✔
952
                switch {
161✔
953
                case targetDC == handler.currentClusterName:
160✔
954
                        err = handler.frontendHandler.RespondDecisionTaskFailed(ctx, request)
160✔
955
                default:
1✔
956
                        remoteClient := handler.GetRemoteFrontendClient(targetDC)
1✔
957
                        err = remoteClient.RespondDecisionTaskFailed(ctx, request, handler.callOptions...)
1✔
958
                }
959
                return err
161✔
960
        })
961

962
        return err
160✔
963
}
964

965
// RespondQueryTaskCompleted API call
966
func (handler *ClusterRedirectionHandlerImpl) RespondQueryTaskCompleted(
967
        ctx context.Context,
968
        request *types.RespondQueryTaskCompletedRequest,
969
) (retError error) {
31✔
970

31✔
971
        var apiName = "RespondQueryTaskCompleted"
31✔
972
        var err error
31✔
973
        var cluster string
31✔
974

31✔
975
        scope, startTime := handler.beforeCall(metrics.DCRedirectionRespondQueryTaskCompletedScope)
31✔
976
        defer func() {
62✔
977
                handler.afterCall(recover(), scope, startTime, cluster, &retError)
31✔
978
        }()
31✔
979

980
        token, err := handler.tokenSerializer.DeserializeQueryTaskToken(request.TaskToken)
31✔
981
        if err != nil {
31✔
982
                return err
×
983
        }
×
984

985
        err = handler.redirectionPolicy.WithDomainIDRedirect(ctx, token.DomainID, apiName, func(targetDC string) error {
63✔
986
                cluster = targetDC
32✔
987
                switch {
32✔
988
                case targetDC == handler.currentClusterName:
31✔
989
                        err = handler.frontendHandler.RespondQueryTaskCompleted(ctx, request)
31✔
990
                default:
1✔
991
                        remoteClient := handler.GetRemoteFrontendClient(targetDC)
1✔
992
                        err = remoteClient.RespondQueryTaskCompleted(ctx, request, handler.callOptions...)
1✔
993
                }
994
                return err
32✔
995
        })
996

997
        return err
31✔
998
}
999

1000
func (handler *ClusterRedirectionHandlerImpl) RestartWorkflowExecution(ctx context.Context, request *types.RestartWorkflowExecutionRequest) (resp *types.RestartWorkflowExecutionResponse, retError error) {
×
1001
        var apiName = "RestartWorkflowExecution"
×
1002
        var err error
×
1003
        var cluster string
×
1004

×
1005
        scope, startTime := handler.beforeCall(metrics.DCRedirectionStartWorkflowExecutionScope)
×
1006
        defer func() {
×
1007
                handler.afterCall(recover(), scope, startTime, cluster, &retError)
×
1008
        }()
×
1009

1010
        err = handler.redirectionPolicy.WithDomainNameRedirect(ctx, request.GetDomain(), apiName, func(targetDC string) error {
×
1011
                cluster = targetDC
×
1012
                switch {
×
1013
                case targetDC == handler.currentClusterName:
×
1014
                        resp, err = handler.frontendHandler.RestartWorkflowExecution(ctx, request)
×
1015
                default:
×
1016
                        remoteClient := handler.GetRemoteFrontendClient(targetDC)
×
1017
                        resp, err = remoteClient.RestartWorkflowExecution(ctx, request, handler.callOptions...)
×
1018
                }
1019
                return err
×
1020
        })
1021

1022
        return resp, err
×
1023
}
1024

1025
// SignalWithStartWorkflowExecution API call
1026
func (handler *ClusterRedirectionHandlerImpl) SignalWithStartWorkflowExecution(
1027
        ctx context.Context,
1028
        request *types.SignalWithStartWorkflowExecutionRequest,
1029
) (resp *types.StartWorkflowExecutionResponse, retError error) {
34✔
1030

34✔
1031
        var apiName = "SignalWithStartWorkflowExecution"
34✔
1032
        var err error
34✔
1033
        var cluster string
34✔
1034

34✔
1035
        scope, startTime := handler.beforeCall(metrics.DCRedirectionSignalWithStartWorkflowExecutionScope)
34✔
1036
        defer func() {
68✔
1037
                handler.afterCall(recover(), scope, startTime, cluster, &retError)
34✔
1038
        }()
34✔
1039

1040
        err = handler.redirectionPolicy.WithDomainNameRedirect(ctx, request.GetDomain(), apiName, func(targetDC string) error {
69✔
1041
                cluster = targetDC
35✔
1042
                switch {
35✔
1043
                case targetDC == handler.currentClusterName:
34✔
1044
                        resp, err = handler.frontendHandler.SignalWithStartWorkflowExecution(ctx, request)
34✔
1045
                default:
1✔
1046
                        remoteClient := handler.GetRemoteFrontendClient(targetDC)
1✔
1047
                        resp, err = remoteClient.SignalWithStartWorkflowExecution(ctx, request, handler.callOptions...)
1✔
1048
                }
1049
                return err
35✔
1050
        })
1051

1052
        return resp, err
34✔
1053
}
1054

1055
// SignalWorkflowExecution API call
1056
func (handler *ClusterRedirectionHandlerImpl) SignalWorkflowExecution(
1057
        ctx context.Context,
1058
        request *types.SignalWorkflowExecutionRequest,
1059
) (retError error) {
724✔
1060

724✔
1061
        var apiName = "SignalWorkflowExecution"
724✔
1062
        var err error
724✔
1063
        var cluster string
724✔
1064

724✔
1065
        scope, startTime := handler.beforeCall(metrics.DCRedirectionSignalWorkflowExecutionScope)
724✔
1066
        defer func() {
1,448✔
1067
                handler.afterCall(recover(), scope, startTime, cluster, &retError)
724✔
1068
        }()
724✔
1069

1070
        err = handler.redirectionPolicy.WithDomainNameRedirect(ctx, request.GetDomain(), apiName, func(targetDC string) error {
1,449✔
1071
                cluster = targetDC
725✔
1072
                switch {
725✔
1073
                case targetDC == handler.currentClusterName:
724✔
1074
                        err = handler.frontendHandler.SignalWorkflowExecution(ctx, request)
724✔
1075
                default:
1✔
1076
                        remoteClient := handler.GetRemoteFrontendClient(targetDC)
1✔
1077
                        err = remoteClient.SignalWorkflowExecution(ctx, request, handler.callOptions...)
1✔
1078
                }
1079
                return err
725✔
1080
        })
1081
        return err
724✔
1082
}
1083

1084
// StartWorkflowExecution API call
1085
func (handler *ClusterRedirectionHandlerImpl) StartWorkflowExecution(
1086
        ctx context.Context,
1087
        request *types.StartWorkflowExecutionRequest,
1088
) (resp *types.StartWorkflowExecutionResponse, retError error) {
445✔
1089

445✔
1090
        var apiName = "StartWorkflowExecution"
445✔
1091
        var err error
445✔
1092
        var cluster string
445✔
1093

445✔
1094
        scope, startTime := handler.beforeCall(metrics.DCRedirectionStartWorkflowExecutionScope)
445✔
1095
        defer func() {
890✔
1096
                handler.afterCall(recover(), scope, startTime, cluster, &retError)
445✔
1097
        }()
445✔
1098

1099
        err = handler.redirectionPolicy.WithDomainNameRedirect(ctx, request.GetDomain(), apiName, func(targetDC string) error {
891✔
1100
                cluster = targetDC
446✔
1101
                switch {
446✔
1102
                case targetDC == handler.currentClusterName:
445✔
1103
                        resp, err = handler.frontendHandler.StartWorkflowExecution(ctx, request)
445✔
1104
                default:
1✔
1105
                        remoteClient := handler.GetRemoteFrontendClient(targetDC)
1✔
1106
                        resp, err = remoteClient.StartWorkflowExecution(ctx, request, handler.callOptions...)
1✔
1107
                }
1108
                return err
446✔
1109
        })
1110

1111
        return resp, err
445✔
1112
}
1113

1114
// TerminateWorkflowExecution API call
1115
func (handler *ClusterRedirectionHandlerImpl) TerminateWorkflowExecution(
1116
        ctx context.Context,
1117
        request *types.TerminateWorkflowExecutionRequest,
1118
) (retError error) {
49✔
1119

49✔
1120
        var apiName = "TerminateWorkflowExecution"
49✔
1121
        var err error
49✔
1122
        var cluster string
49✔
1123

49✔
1124
        scope, startTime := handler.beforeCall(metrics.DCRedirectionTerminateWorkflowExecutionScope)
49✔
1125
        defer func() {
98✔
1126
                handler.afterCall(recover(), scope, startTime, cluster, &retError)
49✔
1127
        }()
49✔
1128

1129
        err = handler.redirectionPolicy.WithDomainNameRedirect(ctx, request.GetDomain(), apiName, func(targetDC string) error {
99✔
1130
                cluster = targetDC
50✔
1131
                switch {
50✔
1132
                case targetDC == handler.currentClusterName:
49✔
1133
                        err = handler.frontendHandler.TerminateWorkflowExecution(ctx, request)
49✔
1134
                default:
1✔
1135
                        remoteClient := handler.GetRemoteFrontendClient(targetDC)
1✔
1136
                        err = remoteClient.TerminateWorkflowExecution(ctx, request, handler.callOptions...)
1✔
1137
                }
1138
                return err
50✔
1139
        })
1140

1141
        return err
49✔
1142
}
1143

1144
// ListTaskListPartitions API call
1145
func (handler *ClusterRedirectionHandlerImpl) ListTaskListPartitions(
1146
        ctx context.Context,
1147
        request *types.ListTaskListPartitionsRequest,
1148
) (resp *types.ListTaskListPartitionsResponse, retError error) {
1✔
1149

1✔
1150
        var apiName = "ListTaskListPartitions"
1✔
1151
        var err error
1✔
1152
        var cluster string
1✔
1153

1✔
1154
        scope, startTime := handler.beforeCall(metrics.DCRedirectionListTaskListPartitionsScope)
1✔
1155
        defer func() {
2✔
1156
                handler.afterCall(recover(), scope, startTime, cluster, &retError)
1✔
1157
        }()
1✔
1158

1159
        err = handler.redirectionPolicy.WithDomainNameRedirect(ctx, request.GetDomain(), apiName, func(targetDC string) error {
3✔
1160
                cluster = targetDC
2✔
1161
                switch {
2✔
1162
                case targetDC == handler.currentClusterName:
1✔
1163
                        resp, err = handler.frontendHandler.ListTaskListPartitions(ctx, request)
1✔
1164
                default:
1✔
1165
                        remoteClient := handler.GetRemoteFrontendClient(targetDC)
1✔
1166
                        resp, err = remoteClient.ListTaskListPartitions(ctx, request, handler.callOptions...)
1✔
1167
                }
1168
                return err
2✔
1169
        })
1170

1171
        return resp, err
1✔
1172
}
1173

1174
// GetTaskListsByDomain API call
1175
func (handler *ClusterRedirectionHandlerImpl) GetTaskListsByDomain(
1176
        ctx context.Context,
1177
        request *types.GetTaskListsByDomainRequest,
1178
) (resp *types.GetTaskListsByDomainResponse, retError error) {
1✔
1179

1✔
1180
        var apiName = "GetTaskListsByDomain"
1✔
1181
        var err error
1✔
1182
        var cluster string
1✔
1183

1✔
1184
        scope, startTime := handler.beforeCall(metrics.DCRedirectionGetTaskListsByDomainScope)
1✔
1185
        defer func() {
2✔
1186
                handler.afterCall(recover(), scope, startTime, cluster, &retError)
1✔
1187
        }()
1✔
1188

1189
        err = handler.redirectionPolicy.WithDomainNameRedirect(ctx, request.GetDomain(), apiName, func(targetDC string) error {
3✔
1190
                cluster = targetDC
2✔
1191
                switch {
2✔
1192
                case targetDC == handler.currentClusterName:
1✔
1193
                        resp, err = handler.frontendHandler.GetTaskListsByDomain(ctx, request)
1✔
1194
                default:
1✔
1195
                        remoteClient := handler.GetRemoteFrontendClient(targetDC)
1✔
1196
                        resp, err = remoteClient.GetTaskListsByDomain(ctx, request, handler.callOptions...)
1✔
1197
                }
1198
                return err
2✔
1199
        })
1200

1201
        return resp, err
1✔
1202
}
1203

1204
// RefreshWorkflowTasks API call
1205
func (handler *ClusterRedirectionHandlerImpl) RefreshWorkflowTasks(
1206
        ctx context.Context,
1207
        request *types.RefreshWorkflowTasksRequest,
1208
) (retError error) {
×
1209

×
1210
        var apiName = "RefreshWorkflowTasks"
×
1211
        var err error
×
1212
        var cluster string
×
1213

×
1214
        scope, startTime := handler.beforeCall(metrics.DCRedirectionRefreshWorkflowTasksScope)
×
1215
        defer func() {
×
1216
                handler.afterCall(recover(), scope, startTime, cluster, &retError)
×
1217
        }()
×
1218

1219
        err = handler.redirectionPolicy.WithDomainNameRedirect(ctx, request.GetDomain(), apiName, func(targetDC string) error {
×
1220
                cluster = targetDC
×
1221
                switch {
×
1222
                case targetDC == handler.currentClusterName:
×
1223
                        err = handler.frontendHandler.RefreshWorkflowTasks(ctx, request)
×
1224
                default:
×
1225
                        remoteClient := handler.GetRemoteFrontendClient(targetDC)
×
1226
                        err = remoteClient.RefreshWorkflowTasks(ctx, request, handler.callOptions...)
×
1227
                }
1228
                return err
×
1229
        })
1230

1231
        return err
×
1232
}
1233

1234
// GetClusterInfo API call
1235
func (handler *ClusterRedirectionHandlerImpl) GetClusterInfo(
1236
        ctx context.Context,
1237
) (*types.ClusterInfo, error) {
×
1238
        return handler.frontendHandler.GetClusterInfo(ctx)
×
1239
}
×
1240

1241
func (handler *ClusterRedirectionHandlerImpl) beforeCall(
1242
        scope int,
1243
) (metrics.Scope, time.Time) {
6,542✔
1244

6,542✔
1245
        return handler.GetMetricsClient().Scope(scope), handler.GetTimeSource().Now()
6,542✔
1246
}
6,542✔
1247

1248
func (handler *ClusterRedirectionHandlerImpl) afterCall(
1249
        recovered interface{},
1250
        scope metrics.Scope,
1251
        startTime time.Time,
1252
        cluster string,
1253
        retError *error,
1254
) {
6,542✔
1255

6,542✔
1256
        log.CapturePanic(recovered, handler.GetLogger(), retError)
6,542✔
1257

6,542✔
1258
        scope = scope.Tagged(metrics.TargetClusterTag(cluster))
6,542✔
1259
        scope.IncCounter(metrics.CadenceDcRedirectionClientRequests)
6,542✔
1260
        scope.RecordTimer(metrics.CadenceDcRedirectionClientLatency, handler.GetTimeSource().Now().Sub(startTime))
6,542✔
1261
        if *retError != nil {
7,285✔
1262
                scope.IncCounter(metrics.CadenceDcRedirectionClientFailures)
743✔
1263
        }
743✔
1264
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc