• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

uber / cadence / 018daedd-c4d7-4e21-b456-76c002abd778

15 Feb 2024 10:23PM UTC coverage: 62.721% (-0.03%) from 62.749%
018daedd-c4d7-4e21-b456-76c002abd778

Pull #5666

buildkite

neil-xie
Update in submodule
Pull Request #5666: Upgrade pinot client version

92590 of 147622 relevant lines covered (62.72%)

2306.34 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

38.1
/client/history/client.go
1
// Copyright (c) 2017 Uber Technologies, Inc.
2
//
3
// Permission is hereby granted, free of charge, to any person obtaining a copy
4
// of this software and associated documentation files (the "Software"), to deal
5
// in the Software without restriction, including without limitation the rights
6
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
7
// copies of the Software, and to permit persons to whom the Software is
8
// furnished to do so, subject to the following conditions:
9
//
10
// The above copyright notice and this permission notice shall be included in
11
// all copies or substantial portions of the Software.
12
//
13
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
14
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
18
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
19
// THE SOFTWARE.
20

21
package history
22

23
import (
24
        "context"
25
        "math/rand"
26
        "sync"
27
        "time"
28

29
        "go.uber.org/yarpc"
30
        "golang.org/x/sync/errgroup"
31

32
        "github.com/uber/cadence/common"
33
        "github.com/uber/cadence/common/future"
34
        "github.com/uber/cadence/common/log"
35
        "github.com/uber/cadence/common/log/tag"
36
        "github.com/uber/cadence/common/rpc"
37
        "github.com/uber/cadence/common/types"
38
)
39

40
var _ Client = (*clientImpl)(nil)
41

42
const (
43
        // DefaultTimeout is the default timeout used to make calls
44
        DefaultTimeout = time.Second * 30
45
)
46

47
type (
48
        clientImpl struct {
49
                numberOfShards    int
50
                rpcMaxSizeInBytes int // This value currently only used in GetReplicationMessage API
51
                tokenSerializer   common.TaskTokenSerializer
52
                timeout           time.Duration
53
                client            Client
54
                peerResolver      PeerResolver
55
                logger            log.Logger
56
        }
57

58
        getReplicationMessagesWithSize struct {
59
                response *types.GetReplicationMessagesResponse
60
                size     int
61
                peer     string
62
        }
63
)
64

65
// NewClient creates a new history service TChannel client
66
func NewClient(
67
        numberOfShards int,
68
        rpcMaxSizeInBytes int,
69
        timeout time.Duration,
70
        client Client,
71
        peerResolver PeerResolver,
72
        logger log.Logger,
73
) Client {
45✔
74
        return &clientImpl{
45✔
75
                numberOfShards:    numberOfShards,
45✔
76
                rpcMaxSizeInBytes: rpcMaxSizeInBytes,
45✔
77
                tokenSerializer:   common.NewJSONTaskTokenSerializer(),
45✔
78
                timeout:           timeout,
45✔
79
                client:            client,
45✔
80
                peerResolver:      peerResolver,
45✔
81
                logger:            logger,
45✔
82
        }
45✔
83
}
45✔
84

85
func (c *clientImpl) StartWorkflowExecution(
86
        ctx context.Context,
87
        request *types.HistoryStartWorkflowExecutionRequest,
88
        opts ...yarpc.CallOption,
89
) (*types.StartWorkflowExecutionResponse, error) {
462✔
90
        peer, err := c.peerResolver.FromWorkflowID(request.StartRequest.WorkflowID)
462✔
91
        if err != nil {
462✔
92
                return nil, err
×
93
        }
×
94
        var response *types.StartWorkflowExecutionResponse
462✔
95
        op := func(ctx context.Context, peer string) error {
924✔
96
                var err error
462✔
97
                ctx, cancel := c.createContext(ctx)
462✔
98
                defer cancel()
462✔
99
                response, err = c.client.StartWorkflowExecution(ctx, request, append(opts, yarpc.WithShardKey(peer))...)
462✔
100
                return err
462✔
101
        }
462✔
102
        err = c.executeWithRedirect(ctx, peer, op)
462✔
103
        if err != nil {
480✔
104
                return nil, err
18✔
105
        }
18✔
106
        return response, nil
444✔
107
}
108

109
func (c *clientImpl) GetMutableState(
110
        ctx context.Context,
111
        request *types.GetMutableStateRequest,
112
        opts ...yarpc.CallOption,
113
) (*types.GetMutableStateResponse, error) {
439✔
114
        peer, err := c.peerResolver.FromWorkflowID(request.Execution.WorkflowID)
439✔
115
        if err != nil {
439✔
116
                return nil, err
×
117
        }
×
118
        var response *types.GetMutableStateResponse
439✔
119
        op := func(ctx context.Context, peer string) error {
878✔
120
                var err error
439✔
121
                ctx, cancel := c.createContext(ctx)
439✔
122
                defer cancel()
439✔
123
                response, err = c.client.GetMutableState(ctx, request, append(opts, yarpc.WithShardKey(peer))...)
439✔
124
                return err
439✔
125
        }
439✔
126
        err = c.executeWithRedirect(ctx, peer, op)
439✔
127
        if err != nil {
462✔
128
                return nil, err
23✔
129
        }
23✔
130
        return response, nil
416✔
131
}
132

133
func (c *clientImpl) PollMutableState(
134
        ctx context.Context,
135
        request *types.PollMutableStateRequest,
136
        opts ...yarpc.CallOption,
137
) (*types.PollMutableStateResponse, error) {
392✔
138
        peer, err := c.peerResolver.FromWorkflowID(request.Execution.WorkflowID)
392✔
139
        if err != nil {
392✔
140
                return nil, err
×
141
        }
×
142
        var response *types.PollMutableStateResponse
392✔
143
        op := func(ctx context.Context, peer string) error {
784✔
144
                var err error
392✔
145
                ctx, cancel := c.createContext(ctx)
392✔
146
                defer cancel()
392✔
147
                response, err = c.client.PollMutableState(ctx, request, append(opts, yarpc.WithShardKey(peer))...)
392✔
148
                return err
392✔
149
        }
392✔
150
        err = c.executeWithRedirect(ctx, peer, op)
392✔
151
        if err != nil {
392✔
152
                return nil, err
×
153
        }
×
154
        return response, nil
392✔
155
}
156

157
func (c *clientImpl) DescribeHistoryHost(
158
        ctx context.Context,
159
        request *types.DescribeHistoryHostRequest,
160
        opts ...yarpc.CallOption,
161
) (*types.DescribeHistoryHostResponse, error) {
×
162

×
163
        var err error
×
164
        var peer string
×
165

×
166
        if request.ShardIDForHost != nil {
×
167
                peer, err = c.peerResolver.FromShardID(int(request.GetShardIDForHost()))
×
168
        } else if request.ExecutionForHost != nil {
×
169
                peer, err = c.peerResolver.FromWorkflowID(request.ExecutionForHost.GetWorkflowID())
×
170
        } else {
×
171
                peer, err = c.peerResolver.FromHostAddress(request.GetHostAddress())
×
172
        }
×
173
        if err != nil {
×
174
                return nil, err
×
175
        }
×
176

177
        var response *types.DescribeHistoryHostResponse
×
178
        op := func(ctx context.Context, peer string) error {
×
179
                var err error
×
180
                ctx, cancel := c.createContext(ctx)
×
181
                defer cancel()
×
182
                response, err = c.client.DescribeHistoryHost(ctx, request, append(opts, yarpc.WithShardKey(peer))...)
×
183
                return err
×
184
        }
×
185
        err = c.executeWithRedirect(ctx, peer, op)
×
186
        if err != nil {
×
187
                return nil, err
×
188
        }
×
189
        return response, nil
×
190
}
191

192
func (c *clientImpl) RemoveTask(
193
        ctx context.Context,
194
        request *types.RemoveTaskRequest,
195
        opts ...yarpc.CallOption,
196
) error {
×
197
        peer, err := c.peerResolver.FromShardID(int(request.GetShardID()))
×
198
        if err != nil {
×
199
                return err
×
200
        }
×
201
        op := func(ctx context.Context, peer string) error {
×
202
                var err error
×
203
                ctx, cancel := c.createContext(ctx)
×
204
                defer cancel()
×
205
                err = c.client.RemoveTask(ctx, request, append(opts, yarpc.WithShardKey(peer))...)
×
206
                return err
×
207
        }
×
208

209
        err = c.executeWithRedirect(ctx, peer, op)
×
210
        return err
×
211
}
212

213
func (c *clientImpl) CloseShard(
214
        ctx context.Context,
215
        request *types.CloseShardRequest,
216
        opts ...yarpc.CallOption,
217
) error {
×
218
        peer, err := c.peerResolver.FromShardID(int(request.GetShardID()))
×
219
        if err != nil {
×
220
                return err
×
221
        }
×
222
        op := func(ctx context.Context, peer string) error {
×
223
                var err error
×
224
                ctx, cancel := c.createContext(ctx)
×
225
                defer cancel()
×
226
                err = c.client.CloseShard(ctx, request, append(opts, yarpc.WithShardKey(peer))...)
×
227
                return err
×
228
        }
×
229

230
        err = c.executeWithRedirect(ctx, peer, op)
×
231
        if err != nil {
×
232
                return err
×
233
        }
×
234
        return nil
×
235
}
236

237
func (c *clientImpl) ResetQueue(
238
        ctx context.Context,
239
        request *types.ResetQueueRequest,
240
        opts ...yarpc.CallOption,
241
) error {
×
242
        peer, err := c.peerResolver.FromShardID(int(request.GetShardID()))
×
243
        if err != nil {
×
244
                return err
×
245
        }
×
246
        op := func(ctx context.Context, peer string) error {
×
247
                var err error
×
248
                ctx, cancel := c.createContext(ctx)
×
249
                defer cancel()
×
250
                err = c.client.ResetQueue(ctx, request, append(opts, yarpc.WithShardKey(peer))...)
×
251
                return err
×
252
        }
×
253

254
        err = c.executeWithRedirect(ctx, peer, op)
×
255
        if err != nil {
×
256
                return err
×
257
        }
×
258
        return nil
×
259
}
260

261
func (c *clientImpl) DescribeQueue(
262
        ctx context.Context,
263
        request *types.DescribeQueueRequest,
264
        opts ...yarpc.CallOption,
265
) (*types.DescribeQueueResponse, error) {
×
266
        peer, err := c.peerResolver.FromShardID(int(request.GetShardID()))
×
267
        if err != nil {
×
268
                return nil, err
×
269
        }
×
270
        var response *types.DescribeQueueResponse
×
271
        op := func(ctx context.Context, peer string) error {
×
272
                var err error
×
273
                ctx, cancel := c.createContext(ctx)
×
274
                defer cancel()
×
275
                response, err = c.client.DescribeQueue(ctx, request, append(opts, yarpc.WithShardKey(peer))...)
×
276
                return err
×
277
        }
×
278

279
        err = c.executeWithRedirect(ctx, peer, op)
×
280
        if err != nil {
×
281
                return nil, err
×
282
        }
×
283
        return response, nil
×
284
}
285

286
func (c *clientImpl) DescribeMutableState(
287
        ctx context.Context,
288
        request *types.DescribeMutableStateRequest,
289
        opts ...yarpc.CallOption,
290
) (*types.DescribeMutableStateResponse, error) {
×
291
        peer, err := c.peerResolver.FromWorkflowID(request.Execution.WorkflowID)
×
292
        if err != nil {
×
293
                return nil, err
×
294
        }
×
295
        var response *types.DescribeMutableStateResponse
×
296
        op := func(ctx context.Context, peer string) error {
×
297
                var err error
×
298
                ctx, cancel := c.createContext(ctx)
×
299
                defer cancel()
×
300
                response, err = c.client.DescribeMutableState(ctx, request, append(opts, yarpc.WithShardKey(peer))...)
×
301
                return err
×
302
        }
×
303
        err = c.executeWithRedirect(ctx, peer, op)
×
304
        if err != nil {
×
305
                return nil, err
×
306
        }
×
307
        return response, nil
×
308
}
309

310
func (c *clientImpl) ResetStickyTaskList(
311
        ctx context.Context,
312
        request *types.HistoryResetStickyTaskListRequest,
313
        opts ...yarpc.CallOption,
314
) (*types.HistoryResetStickyTaskListResponse, error) {
3✔
315
        peer, err := c.peerResolver.FromWorkflowID(request.Execution.WorkflowID)
3✔
316
        if err != nil {
3✔
317
                return nil, err
×
318
        }
×
319
        var response *types.HistoryResetStickyTaskListResponse
3✔
320
        op := func(ctx context.Context, peer string) error {
6✔
321
                var err error
3✔
322
                ctx, cancel := c.createContext(ctx)
3✔
323
                defer cancel()
3✔
324
                response, err = c.client.ResetStickyTaskList(ctx, request, append(opts, yarpc.WithShardKey(peer))...)
3✔
325
                return err
3✔
326
        }
3✔
327
        err = c.executeWithRedirect(ctx, peer, op)
3✔
328
        if err != nil {
3✔
329
                return nil, err
×
330
        }
×
331
        return response, nil
3✔
332
}
333

334
func (c *clientImpl) DescribeWorkflowExecution(
335
        ctx context.Context,
336
        request *types.HistoryDescribeWorkflowExecutionRequest,
337
        opts ...yarpc.CallOption,
338
) (*types.DescribeWorkflowExecutionResponse, error) {
93✔
339
        peer, err := c.peerResolver.FromWorkflowID(request.Request.Execution.WorkflowID)
93✔
340
        if err != nil {
93✔
341
                return nil, err
×
342
        }
×
343
        var response *types.DescribeWorkflowExecutionResponse
93✔
344
        op := func(ctx context.Context, peer string) error {
186✔
345
                var err error
93✔
346
                ctx, cancel := c.createContext(ctx)
93✔
347
                defer cancel()
93✔
348
                response, err = c.client.DescribeWorkflowExecution(ctx, request, append(opts, yarpc.WithShardKey(peer))...)
93✔
349
                return err
93✔
350
        }
93✔
351
        err = c.executeWithRedirect(ctx, peer, op)
93✔
352
        if err != nil {
93✔
353
                return nil, err
×
354
        }
×
355
        return response, nil
93✔
356
}
357

358
func (c *clientImpl) RecordDecisionTaskStarted(
359
        ctx context.Context,
360
        request *types.RecordDecisionTaskStartedRequest,
361
        opts ...yarpc.CallOption,
362
) (*types.RecordDecisionTaskStartedResponse, error) {
1,096✔
363
        peer, err := c.peerResolver.FromWorkflowID(request.WorkflowExecution.WorkflowID)
1,096✔
364
        if err != nil {
1,096✔
365
                return nil, err
×
366
        }
×
367
        var response *types.RecordDecisionTaskStartedResponse
1,096✔
368
        op := func(ctx context.Context, peer string) error {
2,192✔
369
                var err error
1,096✔
370
                ctx, cancel := c.createContext(ctx)
1,096✔
371
                defer cancel()
1,096✔
372
                response, err = c.client.RecordDecisionTaskStarted(ctx, request, append(opts, yarpc.WithShardKey(peer))...)
1,096✔
373
                return err
1,096✔
374
        }
1,096✔
375
        err = c.executeWithRedirect(ctx, peer, op)
1,096✔
376
        if err != nil {
1,117✔
377
                return nil, err
21✔
378
        }
21✔
379
        return response, nil
1,075✔
380
}
381

382
func (c *clientImpl) RecordActivityTaskStarted(
383
        ctx context.Context,
384
        request *types.RecordActivityTaskStartedRequest,
385
        opts ...yarpc.CallOption,
386
) (*types.RecordActivityTaskStartedResponse, error) {
330✔
387
        peer, err := c.peerResolver.FromWorkflowID(request.WorkflowExecution.WorkflowID)
330✔
388
        if err != nil {
330✔
389
                return nil, err
×
390
        }
×
391
        var response *types.RecordActivityTaskStartedResponse
330✔
392
        op := func(ctx context.Context, peer string) error {
660✔
393
                var err error
330✔
394
                ctx, cancel := c.createContext(ctx)
330✔
395
                defer cancel()
330✔
396
                response, err = c.client.RecordActivityTaskStarted(ctx, request, append(opts, yarpc.WithShardKey(peer))...)
330✔
397
                return err
330✔
398
        }
330✔
399
        err = c.executeWithRedirect(ctx, peer, op)
330✔
400
        if err != nil {
336✔
401
                return nil, err
6✔
402
        }
6✔
403
        return response, nil
324✔
404
}
405

406
func (c *clientImpl) RespondDecisionTaskCompleted(
407
        ctx context.Context,
408
        request *types.HistoryRespondDecisionTaskCompletedRequest,
409
        opts ...yarpc.CallOption,
410
) (*types.HistoryRespondDecisionTaskCompletedResponse, error) {
931✔
411
        taskToken, err := c.tokenSerializer.Deserialize(request.CompleteRequest.TaskToken)
931✔
412
        if err != nil {
931✔
413
                return nil, err
×
414
        }
×
415
        peer, err := c.peerResolver.FromWorkflowID(taskToken.WorkflowID)
931✔
416
        if err != nil {
931✔
417
                return nil, err
×
418
        }
×
419
        var response *types.HistoryRespondDecisionTaskCompletedResponse
931✔
420
        op := func(ctx context.Context, peer string) error {
1,862✔
421
                ctx, cancel := c.createContext(ctx)
931✔
422
                defer cancel()
931✔
423
                response, err = c.client.RespondDecisionTaskCompleted(ctx, request, append(opts, yarpc.WithShardKey(peer))...)
931✔
424
                return err
931✔
425
        }
931✔
426
        err = c.executeWithRedirect(ctx, peer, op)
931✔
427
        return response, err
931✔
428
}
429

430
func (c *clientImpl) RespondDecisionTaskFailed(
431
        ctx context.Context,
432
        request *types.HistoryRespondDecisionTaskFailedRequest,
433
        opts ...yarpc.CallOption,
434
) error {
159✔
435
        taskToken, err := c.tokenSerializer.Deserialize(request.FailedRequest.TaskToken)
159✔
436
        if err != nil {
159✔
437
                return err
×
438
        }
×
439
        peer, err := c.peerResolver.FromWorkflowID(taskToken.WorkflowID)
159✔
440
        if err != nil {
159✔
441
                return err
×
442
        }
×
443
        op := func(ctx context.Context, peer string) error {
318✔
444
                ctx, cancel := c.createContext(ctx)
159✔
445
                defer cancel()
159✔
446
                return c.client.RespondDecisionTaskFailed(ctx, request, append(opts, yarpc.WithShardKey(peer))...)
159✔
447
        }
159✔
448
        err = c.executeWithRedirect(ctx, peer, op)
159✔
449
        return err
159✔
450
}
451

452
func (c *clientImpl) RespondActivityTaskCompleted(
453
        ctx context.Context,
454
        request *types.HistoryRespondActivityTaskCompletedRequest,
455
        opts ...yarpc.CallOption,
456
) error {
321✔
457
        taskToken, err := c.tokenSerializer.Deserialize(request.CompleteRequest.TaskToken)
321✔
458
        if err != nil {
321✔
459
                return err
×
460
        }
×
461
        peer, err := c.peerResolver.FromWorkflowID(taskToken.WorkflowID)
321✔
462
        if err != nil {
321✔
463
                return err
×
464
        }
×
465
        op := func(ctx context.Context, peer string) error {
642✔
466
                ctx, cancel := c.createContext(ctx)
321✔
467
                defer cancel()
321✔
468
                return c.client.RespondActivityTaskCompleted(ctx, request, append(opts, yarpc.WithShardKey(peer))...)
321✔
469
        }
321✔
470
        err = c.executeWithRedirect(ctx, peer, op)
321✔
471
        return err
321✔
472
}
473

474
func (c *clientImpl) RespondActivityTaskFailed(
475
        ctx context.Context,
476
        request *types.HistoryRespondActivityTaskFailedRequest,
477
        opts ...yarpc.CallOption,
478
) error {
12✔
479
        taskToken, err := c.tokenSerializer.Deserialize(request.FailedRequest.TaskToken)
12✔
480
        if err != nil {
12✔
481
                return err
×
482
        }
×
483
        peer, err := c.peerResolver.FromWorkflowID(taskToken.WorkflowID)
12✔
484
        if err != nil {
12✔
485
                return err
×
486
        }
×
487
        op := func(ctx context.Context, peer string) error {
24✔
488
                ctx, cancel := c.createContext(ctx)
12✔
489
                defer cancel()
12✔
490
                return c.client.RespondActivityTaskFailed(ctx, request, append(opts, yarpc.WithShardKey(peer))...)
12✔
491
        }
12✔
492
        err = c.executeWithRedirect(ctx, peer, op)
12✔
493
        return err
12✔
494
}
495

496
func (c *clientImpl) RespondActivityTaskCanceled(
497
        ctx context.Context,
498
        request *types.HistoryRespondActivityTaskCanceledRequest,
499
        opts ...yarpc.CallOption,
500
) error {
×
501
        taskToken, err := c.tokenSerializer.Deserialize(request.CancelRequest.TaskToken)
×
502
        if err != nil {
×
503
                return err
×
504
        }
×
505
        peer, err := c.peerResolver.FromWorkflowID(taskToken.WorkflowID)
×
506
        if err != nil {
×
507
                return err
×
508
        }
×
509
        op := func(ctx context.Context, peer string) error {
×
510
                ctx, cancel := c.createContext(ctx)
×
511
                defer cancel()
×
512
                return c.client.RespondActivityTaskCanceled(ctx, request, append(opts, yarpc.WithShardKey(peer))...)
×
513
        }
×
514
        err = c.executeWithRedirect(ctx, peer, op)
×
515
        return err
×
516
}
517

518
func (c *clientImpl) RecordActivityTaskHeartbeat(
519
        ctx context.Context,
520
        request *types.HistoryRecordActivityTaskHeartbeatRequest,
521
        opts ...yarpc.CallOption,
522
) (*types.RecordActivityTaskHeartbeatResponse, error) {
381✔
523
        taskToken, err := c.tokenSerializer.Deserialize(request.HeartbeatRequest.TaskToken)
381✔
524
        if err != nil {
381✔
525
                return nil, err
×
526
        }
×
527
        peer, err := c.peerResolver.FromWorkflowID(taskToken.WorkflowID)
381✔
528
        if err != nil {
381✔
529
                return nil, err
×
530
        }
×
531
        var response *types.RecordActivityTaskHeartbeatResponse
381✔
532
        op := func(ctx context.Context, peer string) error {
762✔
533
                var err error
381✔
534
                ctx, cancel := c.createContext(ctx)
381✔
535
                defer cancel()
381✔
536
                response, err = c.client.RecordActivityTaskHeartbeat(ctx, request, append(opts, yarpc.WithShardKey(peer))...)
381✔
537
                return err
381✔
538
        }
381✔
539
        err = c.executeWithRedirect(ctx, peer, op)
381✔
540
        if err != nil {
381✔
541
                return nil, err
×
542
        }
×
543
        return response, nil
381✔
544
}
545

546
func (c *clientImpl) RequestCancelWorkflowExecution(
547
        ctx context.Context,
548
        request *types.HistoryRequestCancelWorkflowExecutionRequest,
549
        opts ...yarpc.CallOption,
550
) error {
12✔
551
        peer, err := c.peerResolver.FromWorkflowID(request.CancelRequest.WorkflowExecution.WorkflowID)
12✔
552
        if err != nil {
12✔
553
                return err
×
554
        }
×
555
        op := func(ctx context.Context, peer string) error {
24✔
556
                ctx, cancel := c.createContext(ctx)
12✔
557
                defer cancel()
12✔
558
                return c.client.RequestCancelWorkflowExecution(ctx, request, append(opts, yarpc.WithShardKey(peer))...)
12✔
559
        }
12✔
560
        return c.executeWithRedirect(ctx, peer, op)
12✔
561
}
562

563
func (c *clientImpl) SignalWorkflowExecution(
564
        ctx context.Context,
565
        request *types.HistorySignalWorkflowExecutionRequest,
566
        opts ...yarpc.CallOption,
567
) error {
732✔
568
        peer, err := c.peerResolver.FromWorkflowID(request.SignalRequest.WorkflowExecution.WorkflowID)
732✔
569
        if err != nil {
732✔
570
                return err
×
571
        }
×
572
        op := func(ctx context.Context, peer string) error {
1,464✔
573
                ctx, cancel := c.createContext(ctx)
732✔
574
                defer cancel()
732✔
575
                return c.client.SignalWorkflowExecution(ctx, request, append(opts, yarpc.WithShardKey(peer))...)
732✔
576
        }
732✔
577
        err = c.executeWithRedirect(ctx, peer, op)
732✔
578

732✔
579
        return err
732✔
580
}
581

582
func (c *clientImpl) SignalWithStartWorkflowExecution(
583
        ctx context.Context,
584
        request *types.HistorySignalWithStartWorkflowExecutionRequest,
585
        opts ...yarpc.CallOption,
586
) (*types.StartWorkflowExecutionResponse, error) {
33✔
587
        peer, err := c.peerResolver.FromWorkflowID(request.SignalWithStartRequest.WorkflowID)
33✔
588
        if err != nil {
33✔
589
                return nil, err
×
590
        }
×
591
        var response *types.StartWorkflowExecutionResponse
33✔
592
        op := func(ctx context.Context, peer string) error {
66✔
593
                var err error
33✔
594
                ctx, cancel := c.createContext(ctx)
33✔
595
                defer cancel()
33✔
596
                response, err = c.client.SignalWithStartWorkflowExecution(ctx, request, append(opts, yarpc.WithShardKey(peer))...)
33✔
597
                return err
33✔
598
        }
33✔
599
        err = c.executeWithRedirect(ctx, peer, op)
33✔
600
        if err != nil {
39✔
601
                return nil, err
6✔
602
        }
6✔
603

604
        return response, err
27✔
605
}
606

607
func (c *clientImpl) RemoveSignalMutableState(
608
        ctx context.Context,
609
        request *types.RemoveSignalMutableStateRequest,
610
        opts ...yarpc.CallOption,
611
) error {
6✔
612
        peer, err := c.peerResolver.FromWorkflowID(request.WorkflowExecution.WorkflowID)
6✔
613
        if err != nil {
6✔
614
                return err
×
615
        }
×
616
        op := func(ctx context.Context, peer string) error {
12✔
617
                ctx, cancel := c.createContext(ctx)
6✔
618
                defer cancel()
6✔
619
                return c.client.RemoveSignalMutableState(ctx, request, yarpc.WithShardKey(peer))
6✔
620
        }
6✔
621
        err = c.executeWithRedirect(ctx, peer, op)
6✔
622

6✔
623
        return err
6✔
624
}
625

626
func (c *clientImpl) TerminateWorkflowExecution(
627
        ctx context.Context,
628
        request *types.HistoryTerminateWorkflowExecutionRequest,
629
        opts ...yarpc.CallOption,
630
) error {
51✔
631
        peer, err := c.peerResolver.FromWorkflowID(request.TerminateRequest.WorkflowExecution.WorkflowID)
51✔
632
        if err != nil {
51✔
633
                return err
×
634
        }
×
635
        op := func(ctx context.Context, peer string) error {
102✔
636
                ctx, cancel := c.createContext(ctx)
51✔
637
                defer cancel()
51✔
638
                return c.client.TerminateWorkflowExecution(ctx, request, append(opts, yarpc.WithShardKey(peer))...)
51✔
639
        }
51✔
640
        err = c.executeWithRedirect(ctx, peer, op)
51✔
641
        return err
51✔
642
}
643

644
func (c *clientImpl) ResetWorkflowExecution(
645
        ctx context.Context,
646
        request *types.HistoryResetWorkflowExecutionRequest,
647
        opts ...yarpc.CallOption,
648
) (*types.ResetWorkflowExecutionResponse, error) {
15✔
649
        peer, err := c.peerResolver.FromWorkflowID(request.ResetRequest.WorkflowExecution.WorkflowID)
15✔
650
        if err != nil {
15✔
651
                return nil, err
×
652
        }
×
653
        var response *types.ResetWorkflowExecutionResponse
15✔
654
        op := func(ctx context.Context, peer string) error {
30✔
655
                ctx, cancel := c.createContext(ctx)
15✔
656
                defer cancel()
15✔
657
                response, err = c.client.ResetWorkflowExecution(ctx, request, append(opts, yarpc.WithShardKey(peer))...)
15✔
658
                return err
15✔
659
        }
15✔
660
        err = c.executeWithRedirect(ctx, peer, op)
15✔
661
        if err != nil {
15✔
662
                return nil, err
×
663
        }
×
664
        return response, err
15✔
665
}
666

667
func (c *clientImpl) ScheduleDecisionTask(
668
        ctx context.Context,
669
        request *types.ScheduleDecisionTaskRequest,
670
        opts ...yarpc.CallOption,
671
) error {
18✔
672
        peer, err := c.peerResolver.FromWorkflowID(request.WorkflowExecution.WorkflowID)
18✔
673
        if err != nil {
18✔
674
                return err
×
675
        }
×
676
        op := func(ctx context.Context, peer string) error {
36✔
677
                ctx, cancel := c.createContext(ctx)
18✔
678
                defer cancel()
18✔
679
                return c.client.ScheduleDecisionTask(ctx, request, append(opts, yarpc.WithShardKey(peer))...)
18✔
680
        }
18✔
681
        err = c.executeWithRedirect(ctx, peer, op)
18✔
682
        return err
18✔
683
}
684

685
func (c *clientImpl) RecordChildExecutionCompleted(
686
        ctx context.Context,
687
        request *types.RecordChildExecutionCompletedRequest,
688
        opts ...yarpc.CallOption,
689
) error {
18✔
690
        peer, err := c.peerResolver.FromWorkflowID(request.WorkflowExecution.WorkflowID)
18✔
691
        if err != nil {
18✔
692
                return err
×
693
        }
×
694
        op := func(ctx context.Context, peer string) error {
36✔
695
                ctx, cancel := c.createContext(ctx)
18✔
696
                defer cancel()
18✔
697
                return c.client.RecordChildExecutionCompleted(ctx, request, append(opts, yarpc.WithShardKey(peer))...)
18✔
698
        }
18✔
699
        err = c.executeWithRedirect(ctx, peer, op)
18✔
700
        return err
18✔
701
}
702

703
func (c *clientImpl) ReplicateEventsV2(
704
        ctx context.Context,
705
        request *types.ReplicateEventsV2Request,
706
        opts ...yarpc.CallOption,
707
) error {
×
708
        peer, err := c.peerResolver.FromWorkflowID(request.WorkflowExecution.GetWorkflowID())
×
709
        if err != nil {
×
710
                return err
×
711
        }
×
712
        op := func(ctx context.Context, peer string) error {
×
713
                ctx, cancel := c.createContext(ctx)
×
714
                defer cancel()
×
715
                return c.client.ReplicateEventsV2(ctx, request, append(opts, yarpc.WithShardKey(peer))...)
×
716
        }
×
717
        err = c.executeWithRedirect(ctx, peer, op)
×
718
        return err
×
719
}
720

721
func (c *clientImpl) SyncShardStatus(
722
        ctx context.Context,
723
        request *types.SyncShardStatusRequest,
724
        opts ...yarpc.CallOption,
725
) error {
×
726

×
727
        // we do not have a workflow ID here, instead, we have something even better
×
728
        peer, err := c.peerResolver.FromShardID(int(request.GetShardID()))
×
729
        if err != nil {
×
730
                return err
×
731
        }
×
732

733
        op := func(ctx context.Context, peer string) error {
×
734
                ctx, cancel := c.createContext(ctx)
×
735
                defer cancel()
×
736
                return c.client.SyncShardStatus(ctx, request, append(opts, yarpc.WithShardKey(peer))...)
×
737
        }
×
738
        err = c.executeWithRedirect(ctx, peer, op)
×
739
        return err
×
740
}
741

742
func (c *clientImpl) SyncActivity(
743
        ctx context.Context,
744
        request *types.SyncActivityRequest,
745
        opts ...yarpc.CallOption,
746
) error {
×
747

×
748
        peer, err := c.peerResolver.FromWorkflowID(request.GetWorkflowID())
×
749
        if err != nil {
×
750
                return err
×
751
        }
×
752
        op := func(ctx context.Context, peer string) error {
×
753
                ctx, cancel := c.createContext(ctx)
×
754
                defer cancel()
×
755
                return c.client.SyncActivity(ctx, request, append(opts, yarpc.WithShardKey(peer))...)
×
756
        }
×
757
        err = c.executeWithRedirect(ctx, peer, op)
×
758
        return err
×
759
}
760

761
func (c *clientImpl) QueryWorkflow(
762
        ctx context.Context,
763
        request *types.HistoryQueryWorkflowRequest,
764
        opts ...yarpc.CallOption,
765
) (*types.HistoryQueryWorkflowResponse, error) {
45✔
766
        peer, err := c.peerResolver.FromWorkflowID(request.GetRequest().GetExecution().GetWorkflowID())
45✔
767
        if err != nil {
45✔
768
                return nil, err
×
769
        }
×
770
        var response *types.HistoryQueryWorkflowResponse
45✔
771
        op := func(ctx context.Context, peer string) error {
90✔
772
                var err error
45✔
773
                ctx, cancel := c.createContext(ctx)
45✔
774
                defer cancel()
45✔
775
                response, err = c.client.QueryWorkflow(ctx, request, append(opts, yarpc.WithShardKey(peer))...)
45✔
776
                return err
45✔
777
        }
45✔
778
        err = c.executeWithRedirect(ctx, peer, op)
45✔
779
        if err != nil {
57✔
780
                return nil, err
12✔
781
        }
12✔
782
        return response, nil
33✔
783
}
784

785
func (c *clientImpl) GetReplicationMessages(
786
        ctx context.Context,
787
        request *types.GetReplicationMessagesRequest,
788
        opts ...yarpc.CallOption,
789
) (*types.GetReplicationMessagesResponse, error) {
×
790
        requestsByPeer := make(map[string]*types.GetReplicationMessagesRequest)
×
791

×
792
        for _, token := range request.Tokens {
×
793
                peer, err := c.peerResolver.FromShardID(int(token.GetShardID()))
×
794
                if err != nil {
×
795
                        return nil, err
×
796
                }
×
797

798
                if _, ok := requestsByPeer[peer]; !ok {
×
799
                        requestsByPeer[peer] = &types.GetReplicationMessagesRequest{
×
800
                                ClusterName: request.ClusterName,
×
801
                        }
×
802
                }
×
803

804
                req := requestsByPeer[peer]
×
805
                req.Tokens = append(req.Tokens, token)
×
806
        }
807

808
        g := &errgroup.Group{}
×
809
        var responseMutex sync.Mutex
×
810
        peerResponses := make([]*getReplicationMessagesWithSize, 0, len(requestsByPeer))
×
811

×
812
        for peer, req := range requestsByPeer {
×
813
                peer, req := peer, req
×
814
                g.Go(func() (e error) {
×
815
                        defer func() { log.CapturePanic(recover(), c.logger, &e) }()
×
816

817
                        requestContext, cancel := common.CreateChildContext(ctx, 0.05)
×
818
                        defer cancel()
×
819

×
820
                        requestContext, responseInfo := rpc.ContextWithResponseInfo(requestContext)
×
821
                        resp, err := c.client.GetReplicationMessages(requestContext, req, append(opts, yarpc.WithShardKey(peer))...)
×
822
                        if err != nil {
×
823
                                c.logger.Warn("Failed to get replication tasks from client",
×
824
                                        tag.Error(err),
×
825
                                        tag.ShardReplicationToken(req),
×
826
                                )
×
827
                                // Returns service busy error to notify replication
×
828
                                if _, ok := err.(*types.ServiceBusyError); ok {
×
829
                                        return err
×
830
                                }
×
831
                                return nil
×
832
                        }
833
                        responseMutex.Lock()
×
834
                        peerResponses = append(peerResponses, &getReplicationMessagesWithSize{
×
835
                                response: resp,
×
836
                                size:     responseInfo.Size,
×
837
                                peer:     peer,
×
838
                        })
×
839
                        responseMutex.Unlock()
×
840
                        return nil
×
841
                })
842
        }
843

844
        if err := g.Wait(); err != nil {
×
845
                return nil, err
×
846
        }
×
847

848
        // Peers with largest responses can be slowest to return data.
849
        // They end up in the end of array and have a possibility of not fitting in the response message.
850
        // Skipped peers grow their responses even more and next they will be even slower and end up in the end again.
851
        // This can lead to starving peers.
852
        // Shuffle the slice of responses to prevent such scenario. All peer will have equal chance to be pick up first.
853
        r := rand.New(rand.NewSource(time.Now().UnixNano()))
×
854
        for i := range peerResponses {
×
855
                j := r.Intn(i + 1)
×
856
                peerResponses[i], peerResponses[j] = peerResponses[j], peerResponses[i]
×
857
        }
×
858

859
        response := &types.GetReplicationMessagesResponse{MessagesByShard: make(map[int32]*types.ReplicationMessages)}
×
860
        responseTotalSize := 0
×
861
        rpcMaxResponseSize := c.rpcMaxSizeInBytes
×
862
        for _, resp := range peerResponses {
×
863
                if (responseTotalSize + resp.size) >= rpcMaxResponseSize {
×
864
                        // Log shards that did not fit for debugging purposes
×
865
                        for shardID := range resp.response.GetMessagesByShard() {
×
866
                                c.logger.Warn("Replication messages did not fit in the response",
×
867
                                        tag.ShardID(int(shardID)),
×
868
                                        tag.Address(resp.peer),
×
869
                                        tag.ResponseSize(resp.size),
×
870
                                        tag.ResponseTotalSize(responseTotalSize),
×
871
                                        tag.ResponseMaxSize(rpcMaxResponseSize),
×
872
                                )
×
873
                        }
×
874

875
                        // return partial response if the response size exceeded supported max size
876
                        // but continue with next peer response, as it may still fit
877
                        continue
×
878
                }
879

880
                responseTotalSize += resp.size
×
881

×
882
                for shardID, tasks := range resp.response.GetMessagesByShard() {
×
883
                        response.MessagesByShard[shardID] = tasks
×
884
                }
×
885
        }
886
        return response, nil
×
887
}
888

889
func (c *clientImpl) GetDLQReplicationMessages(
890
        ctx context.Context,
891
        request *types.GetDLQReplicationMessagesRequest,
892
        opts ...yarpc.CallOption,
893
) (*types.GetDLQReplicationMessagesResponse, error) {
×
894
        // All workflow IDs are in the same shard per request
×
895
        workflowID := request.GetTaskInfos()[0].GetWorkflowID()
×
896
        peer, err := c.peerResolver.FromWorkflowID(workflowID)
×
897
        if err != nil {
×
898
                return nil, err
×
899
        }
×
900

901
        return c.client.GetDLQReplicationMessages(
×
902
                ctx,
×
903
                request,
×
904
                append(opts, yarpc.WithShardKey(peer))...,
×
905
        )
×
906
}
907

908
func (c *clientImpl) ReapplyEvents(
909
        ctx context.Context,
910
        request *types.HistoryReapplyEventsRequest,
911
        opts ...yarpc.CallOption,
912
) error {
×
913
        peer, err := c.peerResolver.FromWorkflowID(request.GetRequest().GetWorkflowExecution().GetWorkflowID())
×
914
        if err != nil {
×
915
                return err
×
916
        }
×
917
        op := func(ctx context.Context, peer string) error {
×
918
                ctx, cancel := c.createContext(ctx)
×
919
                defer cancel()
×
920
                return c.client.ReapplyEvents(ctx, request, append(opts, yarpc.WithShardKey(peer))...)
×
921
        }
×
922
        err = c.executeWithRedirect(ctx, peer, op)
×
923
        return err
×
924
}
925

926
func (c *clientImpl) CountDLQMessages(
927
        ctx context.Context,
928
        request *types.CountDLQMessagesRequest,
929
        opts ...yarpc.CallOption,
930
) (*types.HistoryCountDLQMessagesResponse, error) {
×
931

×
932
        peers, err := c.peerResolver.GetAllPeers()
×
933
        if err != nil {
×
934
                return nil, err
×
935
        }
×
936

937
        var mu sync.Mutex
×
938
        responses := make([]*types.HistoryCountDLQMessagesResponse, 0, len(peers))
×
939

×
940
        g := &errgroup.Group{}
×
941
        for _, peer := range peers {
×
942
                peer := peer
×
943
                g.Go(func() (e error) {
×
944
                        defer func() { log.CapturePanic(recover(), c.logger, &e) }()
×
945

946
                        response, err := c.client.CountDLQMessages(ctx, request, append(opts, yarpc.WithShardKey(peer))...)
×
947
                        if err == nil {
×
948
                                mu.Lock()
×
949
                                responses = append(responses, response)
×
950
                                mu.Unlock()
×
951
                        }
×
952

953
                        return err
×
954
                })
955
        }
956

957
        err = g.Wait()
×
958

×
959
        entries := map[types.HistoryDLQCountKey]int64{}
×
960
        for _, response := range responses {
×
961
                for key, count := range response.Entries {
×
962
                        entries[key] = count
×
963
                }
×
964
        }
965
        return &types.HistoryCountDLQMessagesResponse{Entries: entries}, err
×
966
}
967

968
func (c *clientImpl) ReadDLQMessages(
969
        ctx context.Context,
970
        request *types.ReadDLQMessagesRequest,
971
        opts ...yarpc.CallOption,
972
) (*types.ReadDLQMessagesResponse, error) {
×
973

×
974
        peer, err := c.peerResolver.FromShardID(int(request.GetShardID()))
×
975
        if err != nil {
×
976
                return nil, err
×
977
        }
×
978
        return c.client.ReadDLQMessages(ctx, request, append(opts, yarpc.WithShardKey(peer))...)
×
979
}
980

981
func (c *clientImpl) PurgeDLQMessages(
982
        ctx context.Context,
983
        request *types.PurgeDLQMessagesRequest,
984
        opts ...yarpc.CallOption,
985
) error {
×
986

×
987
        peer, err := c.peerResolver.FromShardID(int(request.GetShardID()))
×
988
        if err != nil {
×
989
                return err
×
990
        }
×
991
        return c.client.PurgeDLQMessages(ctx, request, append(opts, yarpc.WithShardKey(peer))...)
×
992
}
993

994
func (c *clientImpl) MergeDLQMessages(
995
        ctx context.Context,
996
        request *types.MergeDLQMessagesRequest,
997
        opts ...yarpc.CallOption,
998
) (*types.MergeDLQMessagesResponse, error) {
×
999

×
1000
        peer, err := c.peerResolver.FromShardID(int(request.GetShardID()))
×
1001
        if err != nil {
×
1002
                return nil, err
×
1003
        }
×
1004
        return c.client.MergeDLQMessages(ctx, request, append(opts, yarpc.WithShardKey(peer))...)
×
1005
}
1006

1007
func (c *clientImpl) RefreshWorkflowTasks(
1008
        ctx context.Context,
1009
        request *types.HistoryRefreshWorkflowTasksRequest,
1010
        opts ...yarpc.CallOption,
1011
) error {
×
1012
        peer, err := c.peerResolver.FromWorkflowID(request.GetRequest().GetExecution().GetWorkflowID())
×
1013
        if err != nil {
×
1014
                return err
×
1015
        }
×
1016
        op := func(ctx context.Context, peer string) error {
×
1017
                ctx, cancel := c.createContext(ctx)
×
1018
                defer cancel()
×
1019
                return c.client.RefreshWorkflowTasks(ctx, request, append(opts, yarpc.WithShardKey(peer))...)
×
1020
        }
×
1021
        err = c.executeWithRedirect(ctx, peer, op)
×
1022
        return err
×
1023
}
1024

1025
func (c *clientImpl) NotifyFailoverMarkers(
1026
        ctx context.Context,
1027
        request *types.NotifyFailoverMarkersRequest,
1028
        opts ...yarpc.CallOption,
1029
) error {
×
1030
        requestsByPeer := make(map[string]*types.NotifyFailoverMarkersRequest)
×
1031

×
1032
        for _, token := range request.GetFailoverMarkerTokens() {
×
1033
                marker := token.GetFailoverMarker()
×
1034
                peer, err := c.peerResolver.FromDomainID(marker.GetDomainID())
×
1035
                if err != nil {
×
1036
                        return err
×
1037
                }
×
1038
                if _, ok := requestsByPeer[peer]; !ok {
×
1039
                        requestsByPeer[peer] = &types.NotifyFailoverMarkersRequest{
×
1040
                                FailoverMarkerTokens: []*types.FailoverMarkerToken{},
×
1041
                        }
×
1042
                }
×
1043

1044
                req := requestsByPeer[peer]
×
1045
                req.FailoverMarkerTokens = append(req.FailoverMarkerTokens, token)
×
1046
        }
1047

1048
        g := &errgroup.Group{}
×
1049
        for peer, req := range requestsByPeer {
×
1050
                peer, req := peer, req
×
1051
                g.Go(func() (e error) {
×
1052
                        defer func() { log.CapturePanic(recover(), c.logger, &e) }()
×
1053

1054
                        ctx, cancel := c.createContext(ctx)
×
1055
                        defer cancel()
×
1056

×
1057
                        return c.client.NotifyFailoverMarkers(ctx, req, append(opts, yarpc.WithShardKey(peer))...)
×
1058
                })
1059
        }
1060

1061
        return g.Wait()
×
1062
}
1063

1064
func (c *clientImpl) GetCrossClusterTasks(
1065
        ctx context.Context,
1066
        request *types.GetCrossClusterTasksRequest,
1067
        opts ...yarpc.CallOption,
1068
) (*types.GetCrossClusterTasksResponse, error) {
×
1069
        requestByPeer := make(map[string]*types.GetCrossClusterTasksRequest)
×
1070
        for _, shardID := range request.GetShardIDs() {
×
1071
                peer, err := c.peerResolver.FromShardID(int(shardID))
×
1072
                if err != nil {
×
1073
                        return nil, err
×
1074
                }
×
1075

1076
                if _, ok := requestByPeer[peer]; !ok {
×
1077
                        requestByPeer[peer] = &types.GetCrossClusterTasksRequest{
×
1078
                                TargetCluster: request.TargetCluster,
×
1079
                        }
×
1080
                }
×
1081
                requestByPeer[peer].ShardIDs = append(requestByPeer[peer].ShardIDs, shardID)
×
1082
        }
1083

1084
        // preserve 5% timeout to return partial of the result if context is timing out
1085
        ctx, cancel := common.CreateChildContext(ctx, 0.05)
×
1086
        defer cancel()
×
1087

×
1088
        futureByPeer := make(map[string]future.Future, len(requestByPeer))
×
1089
        for peer, req := range requestByPeer {
×
1090
                future, settable := future.NewFuture()
×
1091
                go func(ctx context.Context, peer string, req *types.GetCrossClusterTasksRequest) {
×
1092
                        settable.Set(c.client.GetCrossClusterTasks(ctx, req, yarpc.WithShardKey(peer)))
×
1093
                }(ctx, peer, req)
×
1094

1095
                futureByPeer[peer] = future
×
1096
        }
1097

1098
        response := &types.GetCrossClusterTasksResponse{
×
1099
                TasksByShard:       make(map[int32][]*types.CrossClusterTaskRequest),
×
1100
                FailedCauseByShard: make(map[int32]types.GetTaskFailedCause),
×
1101
        }
×
1102
        for peer, future := range futureByPeer {
×
1103
                var resp *types.GetCrossClusterTasksResponse
×
1104
                if futureErr := future.Get(ctx, &resp); futureErr != nil {
×
1105
                        c.logger.Error("Failed to get cross cluster tasks", tag.Error(futureErr))
×
1106
                        for _, failedShardID := range requestByPeer[peer].ShardIDs {
×
1107
                                response.FailedCauseByShard[failedShardID] = common.ConvertErrToGetTaskFailedCause(futureErr)
×
1108
                        }
×
1109
                } else {
×
1110
                        for shardID, tasks := range resp.TasksByShard {
×
1111
                                response.TasksByShard[shardID] = tasks
×
1112
                        }
×
1113
                        for shardID, failedCause := range resp.FailedCauseByShard {
×
1114
                                response.FailedCauseByShard[shardID] = failedCause
×
1115
                        }
×
1116
                }
1117
        }
1118
        // not using a waitGroup for created goroutines as once all futures are unblocked,
1119
        // those goroutines will eventually be completed
1120

1121
        return response, nil
×
1122
}
1123

1124
func (c *clientImpl) RespondCrossClusterTasksCompleted(
1125
        ctx context.Context,
1126
        request *types.RespondCrossClusterTasksCompletedRequest,
1127
        opts ...yarpc.CallOption,
1128
) (*types.RespondCrossClusterTasksCompletedResponse, error) {
×
1129
        peer, err := c.peerResolver.FromShardID(int(request.GetShardID()))
×
1130
        if err != nil {
×
1131
                return nil, err
×
1132
        }
×
1133

1134
        var response *types.RespondCrossClusterTasksCompletedResponse
×
1135
        op := func(ctx context.Context, peer string) error {
×
1136
                var err error
×
1137
                ctx, cancel := c.createContext(ctx)
×
1138
                defer cancel()
×
1139
                response, err = c.client.RespondCrossClusterTasksCompleted(ctx, request, append(opts, yarpc.WithShardKey(peer))...)
×
1140
                return err
×
1141
        }
×
1142

1143
        err = c.executeWithRedirect(ctx, peer, op)
×
1144
        if err != nil {
×
1145
                return nil, err
×
1146
        }
×
1147
        return response, nil
×
1148
}
1149

1150
func (c *clientImpl) GetFailoverInfo(
1151
        ctx context.Context,
1152
        request *types.GetFailoverInfoRequest,
1153
        opts ...yarpc.CallOption,
1154
) (*types.GetFailoverInfoResponse, error) {
×
1155
        peer, err := c.peerResolver.FromDomainID(request.GetDomainID())
×
1156
        if err != nil {
×
1157
                return nil, err
×
1158
        }
×
1159
        return c.client.GetFailoverInfo(ctx, request, append(opts, yarpc.WithShardKey(peer))...)
×
1160
}
1161

1162
func (c *clientImpl) createContext(parent context.Context) (context.Context, context.CancelFunc) {
5,543✔
1163
        if parent == nil {
5,543✔
1164
                return context.WithTimeout(context.Background(), c.timeout)
×
1165
        }
×
1166
        return context.WithTimeout(parent, c.timeout)
5,543✔
1167
}
1168

1169
func (c *clientImpl) executeWithRedirect(
1170
        ctx context.Context,
1171
        peer string,
1172
        op func(ctx context.Context, peer string) error,
1173
) error {
5,543✔
1174
        var err error
5,543✔
1175
        if ctx == nil {
5,543✔
1176
                ctx = context.Background()
×
1177
        }
×
1178
redirectLoop:
5,543✔
1179
        for {
11,086✔
1180
                err = common.IsValidContext(ctx)
5,543✔
1181
                if err != nil {
5,543✔
1182
                        break redirectLoop
×
1183
                }
1184
                err = op(ctx, peer)
5,543✔
1185
                if err != nil {
5,704✔
1186
                        if s, ok := err.(*types.ShardOwnershipLostError); ok {
161✔
1187
                                // TODO: consider emitting a metric for number of redirects
×
1188
                                peer, err = c.peerResolver.FromHostAddress(s.GetOwner())
×
1189
                                if err != nil {
×
1190
                                        return err
×
1191
                                }
×
1192
                                continue redirectLoop
×
1193
                        }
1194
                }
1195
                break redirectLoop
5,543✔
1196
        }
1197
        return err
5,543✔
1198
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc