
uber / cadence / build 0187fdd2-f4a4-4c9a-97b4-6604937bf7be

09 May 2023 12:23AM UTC coverage: 57.253% (-0.002% from 57.255%)

Pull Request #5252: Feature/zonal partitioning (CI: buildkite)
David Porter: Merge branch 'master' into feature/zonal-partitioning

1460 of 1460 new or added lines in 51 files covered (100.0%)
86909 of 151799 relevant lines covered (57.25%)
2482.17 hits per line

Source File: /service/history/task/fetcher.go (91.24% covered)
// Copyright (c) 2021 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package task

import (
        "context"
        "errors"
        "sync"
        "sync/atomic"
        "time"

        "github.com/uber/cadence/client"
        "github.com/uber/cadence/common"
        "github.com/uber/cadence/common/backoff"
        "github.com/uber/cadence/common/cluster"
        "github.com/uber/cadence/common/dynamicconfig"
        "github.com/uber/cadence/common/future"
        "github.com/uber/cadence/common/log"
        "github.com/uber/cadence/common/log/tag"
        "github.com/uber/cadence/common/metrics"
        "github.com/uber/cadence/common/types"
)

type (
        // FetcherOptions configures a Fetcher
        FetcherOptions struct {
                Parallelism                dynamicconfig.IntPropertyFn
                AggregationInterval        dynamicconfig.DurationPropertyFn
                ServiceBusyBackoffInterval dynamicconfig.DurationPropertyFn
                ErrorRetryInterval         dynamicconfig.DurationPropertyFn
                TimerJitterCoefficient     dynamicconfig.FloatPropertyFn
        }

        fetchRequest struct {
                shardID  int32
                params   []interface{}
                settable future.Settable
        }

        fetchTaskFunc func(
                ctx context.Context,
                clientBean client.Bean,
                sourceCluster string,
                currentCluster string,
                requestByShard map[int32]fetchRequest,
        ) (map[int32]interface{}, error)

        fetcherImpl struct {
                status         int32
                currentCluster string
                sourceCluster  string
                clientBean     client.Bean

                options      *FetcherOptions
                metricsScope metrics.Scope
                logger       log.Logger

                shutdownWG  sync.WaitGroup
                shutdownCh  chan struct{}
                requestChan chan fetchRequest

                fetchCtx       context.Context
                fetchCtxCancel context.CancelFunc
                fetchTaskFunc  fetchTaskFunc
        }
)

const (
        defaultFetchTimeout          = 30 * time.Second
        defaultRequestChanBufferSize = 1000
)

var (
        errTaskFetcherShutdown    = errors.New("task fetcher has already shutdown")
        errDuplicatedFetchRequest = errors.New("duplicated task fetch request")
)

// NewCrossClusterTaskFetchers creates a set of task fetchers,
// one for each source cluster.
// The future returned by Fetcher.Fetch() will have value type []*types.CrossClusterTaskRequest.
func NewCrossClusterTaskFetchers(
        clusterMetadata cluster.Metadata,
        clientBean client.Bean,
        options *FetcherOptions,
        metricsClient metrics.Client,
        logger log.Logger,
) Fetchers {
        return newTaskFetchers(
                clusterMetadata,
                clientBean,
                crossClusterTaskFetchFn,
                options,
                metricsClient,
                logger,
        )
}

func crossClusterTaskFetchFn(
        ctx context.Context,
        clientBean client.Bean,
        sourceCluster string,
        currentCluster string,
        requestByShard map[int32]fetchRequest,
) (map[int32]interface{}, error) {
        adminClient := clientBean.GetRemoteAdminClient(sourceCluster)
        shardIDs := make([]int32, 0, len(requestByShard))
        for shardID := range requestByShard {
                shardIDs = append(shardIDs, shardID)
        }
        // number of tasks returned will be controlled by source cluster.
        // if there are lots of tasks in the source cluster, they will be
        // returned in batches.
        request := &types.GetCrossClusterTasksRequest{
                ShardIDs:      shardIDs,
                TargetCluster: currentCluster,
        }
        ctx, cancel := context.WithTimeout(ctx, defaultFetchTimeout)
        defer cancel()
        resp, err := adminClient.GetCrossClusterTasks(ctx, request)
        if err != nil {
                return nil, err
        }

        responseByShard := make(map[int32]interface{}, len(resp.TasksByShard))
        for shardID, tasks := range resp.TasksByShard {
                responseByShard[shardID] = tasks
        }
        for shardID, failedCause := range resp.FailedCauseByShard {
                responseByShard[shardID] = common.ConvertGetTaskFailedCauseToErr(failedCause)
        }
        return responseByShard, nil
}

func newTaskFetchers(
        clusterMetadata cluster.Metadata,
        clientBean client.Bean,
        fetchTaskFunc fetchTaskFunc,
        options *FetcherOptions,
        metricsClient metrics.Client,
        logger log.Logger,
) Fetchers {
        currentClusterName := clusterMetadata.GetCurrentClusterName()
        remoteClusters := clusterMetadata.GetRemoteClusterInfo()

        fetchers := make([]Fetcher, 0, len(remoteClusters))
        for clusterName := range remoteClusters {
                fetchers = append(fetchers, newTaskFetcher(
                        currentClusterName,
                        clusterName,
                        clientBean,
                        fetchTaskFunc,
                        options,
                        metricsClient,
                        logger,
                ))
        }

        return fetchers
}

// Start is a utility method for starting a group of fetchers
func (fetchers Fetchers) Start() {
        for _, fetcher := range fetchers {
                fetcher.Start()
        }
}

// Stop is a utility method for stopping a group of fetchers
func (fetchers Fetchers) Stop() {
        for _, fetcher := range fetchers {
                fetcher.Stop()
        }
}

func newTaskFetcher(
        currentCluster string,
        sourceCluster string,
        clientBean client.Bean,
        fetchTaskFunc fetchTaskFunc,
        options *FetcherOptions,
        metricsClient metrics.Client,
        logger log.Logger,
) *fetcherImpl {
        fetchCtx, fetchCtxCancel := context.WithCancel(context.Background())
        return &fetcherImpl{
                status:         common.DaemonStatusInitialized,
                currentCluster: currentCluster,
                sourceCluster:  sourceCluster,
                clientBean:     clientBean,
                options:        options,
                metricsScope: metricsClient.Scope(
                        metrics.CrossClusterTaskFetcherScope,
                        metrics.ActiveClusterTag(sourceCluster),
                ),
                logger: logger.WithTags(
                        tag.ComponentCrossClusterTaskFetcher,
                        tag.SourceCluster(sourceCluster),
                ),
                shutdownCh:     make(chan struct{}),
                requestChan:    make(chan fetchRequest, defaultRequestChanBufferSize),
                fetchCtx:       fetchCtx,
                fetchCtxCancel: fetchCtxCancel,
                fetchTaskFunc:  fetchTaskFunc,
        }
}

func (f *fetcherImpl) Start() {
        if !atomic.CompareAndSwapInt32(&f.status, common.DaemonStatusInitialized, common.DaemonStatusStarted) {
                return
        }

        parallelism := f.options.Parallelism()
        f.shutdownWG.Add(parallelism)
        for i := 0; i != parallelism; i++ {
                go f.aggregator()
        }

        f.logger.Info("Task fetcher started.", tag.LifeCycleStarted, tag.Counter(parallelism))
}

func (f *fetcherImpl) Stop() {
        if !atomic.CompareAndSwapInt32(&f.status, common.DaemonStatusStarted, common.DaemonStatusStopped) {
                return
        }

        close(f.shutdownCh)
        f.fetchCtxCancel()
        if success := common.AwaitWaitGroup(&f.shutdownWG, time.Minute); !success {
                f.logger.Warn("Task fetcher timed out on shutdown.", tag.LifeCycleStopTimedout)
        }

        f.logger.Info("Task fetcher stopped.", tag.LifeCycleStopped)
}

func (f *fetcherImpl) GetSourceCluster() string {
        return f.sourceCluster
}

func (f *fetcherImpl) Fetch(
        shardID int,
        fetchParams ...interface{},
) future.Future {
        future, settable := future.NewFuture()

        select {
        case f.requestChan <- fetchRequest{
                shardID:  int32(shardID),
                params:   fetchParams,
                settable: settable,
        }:
                // no-op
        case <-f.shutdownCh:
                settable.Set(nil, errTaskFetcherShutdown)
                return future
        }

        // we need to check again here
        // since even if shutdownCh is closed, the above select may still
        // push the request to requestChan
        select {
        case <-f.shutdownCh:
                f.drainRequestCh()
        default:
        }

        return future
}

func (f *fetcherImpl) aggregator() {
        defer f.shutdownWG.Done()

        fetchTimer := time.NewTimer(backoff.JitDuration(
                f.options.AggregationInterval(),
                f.options.TimerJitterCoefficient(),
        ))

        outstandingRequests := make(map[int32]fetchRequest)

        for {
                select {
                case <-f.shutdownCh:
                        fetchTimer.Stop()
                        f.drainRequestCh()
                        for _, request := range outstandingRequests {
                                request.settable.Set(nil, errTaskFetcherShutdown)
                        }
                        return
                case request := <-f.requestChan:
                        if existingRequest, ok := outstandingRequests[request.shardID]; ok {
                                existingRequest.settable.Set(nil, errDuplicatedFetchRequest)
                        }
                        outstandingRequests[request.shardID] = request
                case <-fetchTimer.C:
                        var nextFetchInterval time.Duration
                        if err := f.fetch(outstandingRequests); err != nil {
                                f.logger.Error("Failed to fetch cross cluster tasks", tag.Error(err))
                                if common.IsServiceBusyError(err) {
                                        nextFetchInterval = f.options.ServiceBusyBackoffInterval()
                                        f.metricsScope.IncCounter(metrics.CrossClusterFetchServiceBusyFailures)
                                } else {
                                        nextFetchInterval = f.options.ErrorRetryInterval()
                                        f.metricsScope.IncCounter(metrics.CrossClusterFetchFailures)
                                }
                        } else {
                                nextFetchInterval = f.options.AggregationInterval()
                        }

                        fetchTimer.Reset(backoff.JitDuration(
                                nextFetchInterval,
                                f.options.TimerJitterCoefficient(),
                        ))
                }
        }
}

func (f *fetcherImpl) fetch(
        outstandingRequests map[int32]fetchRequest,
) error {
        if len(outstandingRequests) == 0 {
                return nil
        }

        f.metricsScope.IncCounter(metrics.CrossClusterFetchRequests)
        sw := f.metricsScope.StartTimer(metrics.CrossClusterFetchLatency)
        defer sw.Stop()

        responseByShard, err := f.fetchTaskFunc(
                f.fetchCtx,
                f.clientBean,
                f.sourceCluster,
                f.currentCluster,
                outstandingRequests,
        )
        if err != nil {
                return err
        }

        for shardID, response := range responseByShard {
                if request, ok := outstandingRequests[shardID]; ok {
                        if fetchErr, ok := response.(error); ok {
                                request.settable.Set(nil, fetchErr)
                        } else {
                                request.settable.Set(response, nil)
                        }

                        delete(outstandingRequests, shardID)
                }
        }

        return nil
}

func (f *fetcherImpl) drainRequestCh() {
        for {
                select {
                case request := <-f.requestChan:
                        request.settable.Set(nil, errTaskFetcherShutdown)
                default:
                        return
                }
        }
}
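
The sketch below shows how the pieces in this file might be wired together end to end. It is illustrative, not part of fetcher.go: it assumes it sits in the same task package (reusing the imports above), that the Fetcher interface exposes the Fetch method implemented here, that future.Future offers a Get(ctx, valuePtr) accessor as used elsewhere in the codebase, and that the dynamicconfig.GetIntPropertyFn / GetDurationPropertyFn / GetFloatPropertyFn constant helpers and log.NewNoop() are available as in cadence tests; the clusterMetadata, clientBean, and metricsClient values are left to the service's bootstrap wiring.

// exampleFetcherUsage is a hypothetical illustration, not part of fetcher.go.
// It creates one fetcher per remote cluster, requests the cross-cluster
// tasks for a single shard, and waits for the batched result.
func exampleFetcherUsage(
        clusterMetadata cluster.Metadata, // assumed: provided by service bootstrap
        clientBean client.Bean, // assumed: provided by service bootstrap
        metricsClient metrics.Client, // assumed: provided by service bootstrap
) error {
        options := &FetcherOptions{
                // constant-value dynamic config helpers; the numbers are illustrative
                Parallelism:                dynamicconfig.GetIntPropertyFn(2),
                AggregationInterval:        dynamicconfig.GetDurationPropertyFn(100 * time.Millisecond),
                ServiceBusyBackoffInterval: dynamicconfig.GetDurationPropertyFn(5 * time.Second),
                ErrorRetryInterval:         dynamicconfig.GetDurationPropertyFn(time.Second),
                TimerJitterCoefficient:     dynamicconfig.GetFloatPropertyFn(0.15),
        }

        fetchers := NewCrossClusterTaskFetchers(
                clusterMetadata,
                clientBean,
                options,
                metricsClient,
                log.NewNoop(),
        )
        fetchers.Start() // each fetcher spawns Parallelism() aggregator goroutines
        defer fetchers.Stop()

        // Request tasks for shard 1. The request is buffered on requestChan and
        // batched with other shards' requests on the next aggregation tick.
        taskFuture := fetchers[0].Fetch(1)

        ctx, cancel := context.WithTimeout(context.Background(), defaultFetchTimeout)
        defer cancel()

        // Per the doc comment on NewCrossClusterTaskFetchers, the future's value
        // type is []*types.CrossClusterTaskRequest.
        var tasks []*types.CrossClusterTaskRequest
        if err := taskFuture.Get(ctx, &tasks); err != nil {
                return err
        }
        // process tasks...
        return nil
}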