uber / cadence / 01907562-f5f0-40b6-8908-451d758b6138

02 Jul 2024 09:37PM UTC coverage: 71.512% (+0.003%) from 71.509%

Pull Request #6155: Stop the ratelimiter collections when stopping the service
Build: buildkite
Author: Groxx

9 of 17 new or added lines in 1 file covered. (52.94%)

22 existing lines in 7 files now uncovered.

105315 of 147269 relevant lines covered (71.51%)

2600.5 hits per line

Source File: /service/history/shard/context.go (78.13%)
1
// Copyright (c) 2020 Uber Technologies, Inc.
2
//
3
// Permission is hereby granted, free of charge, to any person obtaining a copy
4
// of this software and associated documentation files (the "Software"), to deal
5
// in the Software without restriction, including without limitation the rights
6
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
7
// copies of the Software, and to permit persons to whom the Software is
8
// furnished to do so, subject to the following conditions:
9
//
10
// The above copyright notice and this permission notice shall be included in
11
// all copies or substantial portions of the Software.
12
//
13
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
14
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
18
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
19
// THE SOFTWARE.
20

21
//go:generate mockgen -package $GOPACKAGE -source $GOFILE -destination context_mock.go -package shard github.com/uber/cadence/history/shard/context Context
22

23
package shard
24

25
import (
26
        "context"
27
        "errors"
28
        "fmt"
29
        "math"
30
        "sync"
31
        "sync/atomic"
32
        "time"
33

34
        "github.com/uber/cadence/common"
35
        "github.com/uber/cadence/common/backoff"
36
        "github.com/uber/cadence/common/cache"
37
        "github.com/uber/cadence/common/clock"
38
        "github.com/uber/cadence/common/cluster"
39
        "github.com/uber/cadence/common/log"
40
        "github.com/uber/cadence/common/log/tag"
41
        "github.com/uber/cadence/common/metrics"
42
        "github.com/uber/cadence/common/persistence"
43
        "github.com/uber/cadence/common/types"
44
        "github.com/uber/cadence/service/history/config"
45
        "github.com/uber/cadence/service/history/engine"
46
        "github.com/uber/cadence/service/history/events"
47
        "github.com/uber/cadence/service/history/resource"
48
)
49

50
type (
51
        // Context represents a history engine shard
52
        Context interface {
53
                GetShardID() int
54
                GetService() resource.Resource
55
                GetExecutionManager() persistence.ExecutionManager
56
                GetHistoryManager() persistence.HistoryManager
57
                GetDomainCache() cache.DomainCache
58
                GetClusterMetadata() cluster.Metadata
59
                GetConfig() *config.Config
60
                GetEventsCache() events.Cache
61
                GetLogger() log.Logger
62
                GetThrottledLogger() log.Logger
63
                GetMetricsClient() metrics.Client
64
                GetTimeSource() clock.TimeSource
65
                PreviousShardOwnerWasDifferent() bool
66

67
                GetEngine() engine.Engine
68
                SetEngine(engine.Engine)
69

70
                GenerateTransferTaskID() (int64, error)
71
                GenerateTransferTaskIDs(number int) ([]int64, error)
72

73
                GetTransferMaxReadLevel() int64
74
                UpdateTimerMaxReadLevel(cluster string) time.Time
75

76
                SetCurrentTime(cluster string, currentTime time.Time)
77
                GetCurrentTime(cluster string) time.Time
78
                GetLastUpdatedTime() time.Time
79
                GetTimerMaxReadLevel(cluster string) time.Time
80

81
                GetTransferAckLevel() int64
82
                UpdateTransferAckLevel(ackLevel int64) error
83
                GetTransferClusterAckLevel(cluster string) int64
84
                UpdateTransferClusterAckLevel(cluster string, ackLevel int64) error
85
                GetTransferProcessingQueueStates(cluster string) []*types.ProcessingQueueState
86
                UpdateTransferProcessingQueueStates(cluster string, states []*types.ProcessingQueueState) error
87

88
                GetClusterReplicationLevel(cluster string) int64
89
                UpdateClusterReplicationLevel(cluster string, lastTaskID int64) error
90

91
                GetTimerAckLevel() time.Time
92
                UpdateTimerAckLevel(ackLevel time.Time) error
93
                GetTimerClusterAckLevel(cluster string) time.Time
94
                UpdateTimerClusterAckLevel(cluster string, ackLevel time.Time) error
95
                GetTimerProcessingQueueStates(cluster string) []*types.ProcessingQueueState
96
                UpdateTimerProcessingQueueStates(cluster string, states []*types.ProcessingQueueState) error
97

98
                UpdateTransferFailoverLevel(failoverID string, level TransferFailoverLevel) error
99
                DeleteTransferFailoverLevel(failoverID string) error
100
                GetAllTransferFailoverLevels() map[string]TransferFailoverLevel
101

102
                UpdateTimerFailoverLevel(failoverID string, level TimerFailoverLevel) error
103
                DeleteTimerFailoverLevel(failoverID string) error
104
                GetAllTimerFailoverLevels() map[string]TimerFailoverLevel
105

106
                GetDomainNotificationVersion() int64
107
                UpdateDomainNotificationVersion(domainNotificationVersion int64) error
108

109
                GetWorkflowExecution(ctx context.Context, request *persistence.GetWorkflowExecutionRequest) (*persistence.GetWorkflowExecutionResponse, error)
110
                CreateWorkflowExecution(ctx context.Context, request *persistence.CreateWorkflowExecutionRequest) (*persistence.CreateWorkflowExecutionResponse, error)
111
                UpdateWorkflowExecution(ctx context.Context, request *persistence.UpdateWorkflowExecutionRequest) (*persistence.UpdateWorkflowExecutionResponse, error)
112
                ConflictResolveWorkflowExecution(ctx context.Context, request *persistence.ConflictResolveWorkflowExecutionRequest) (*persistence.ConflictResolveWorkflowExecutionResponse, error)
113
                AppendHistoryV2Events(ctx context.Context, request *persistence.AppendHistoryNodesRequest, domainID string, execution types.WorkflowExecution) (*persistence.AppendHistoryNodesResponse, error)
114

115
                ReplicateFailoverMarkers(ctx context.Context, markers []*persistence.FailoverMarkerTask) error
116
                AddingPendingFailoverMarker(*types.FailoverMarkerAttributes) error
117
                ValidateAndUpdateFailoverMarkers() ([]*types.FailoverMarkerAttributes, error)
118
        }
119

120
        // TransferFailoverLevel contains corresponding start / end level
121
        TransferFailoverLevel struct {
122
                StartTime    time.Time
123
                MinLevel     int64
124
                CurrentLevel int64
125
                MaxLevel     int64
126
                DomainIDs    map[string]struct{}
127
        }
128

129
        // TimerFailoverLevel contains domain IDs and corresponding start / end level
130
        TimerFailoverLevel struct {
131
                StartTime    time.Time
132
                MinLevel     time.Time
133
                CurrentLevel time.Time
134
                MaxLevel     time.Time
135
                DomainIDs    map[string]struct{}
136
        }
137

138
        contextImpl struct {
139
                resource.Resource
140

141
                shardItem        *historyShardsItem
142
                shardID          int
143
                rangeID          int64
144
                executionManager persistence.ExecutionManager
145
                eventsCache      events.Cache
146
                closeCallback    func(int, *historyShardsItem)
147
                closedAt         atomic.Pointer[time.Time]
148
                config           *config.Config
149
                logger           log.Logger
150
                throttledLogger  log.Logger
151
                engine           engine.Engine
152

153
                sync.RWMutex
154
                lastUpdated               time.Time
155
                shardInfo                 *persistence.ShardInfo
156
                transferSequenceNumber    int64
157
                maxTransferSequenceNumber int64
158
                transferMaxReadLevel      int64
159
                timerMaxReadLevelMap      map[string]time.Time             // cluster -> timerMaxReadLevel
160
                transferFailoverLevels    map[string]TransferFailoverLevel // uuid -> TransferFailoverLevel
161
                timerFailoverLevels       map[string]TimerFailoverLevel    // uuid -> TimerFailoverLevel
162

163
                // exist only in memory
164
                remoteClusterCurrentTime map[string]time.Time
165

166
                // true if previous owner was different from the acquirer's identity.
167
                previousShardOwnerWasDifferent bool
168
        }
169
)
170

171
var _ Context = (*contextImpl)(nil)
172

173
type ErrShardClosed struct {
174
        Msg      string
175
        ClosedAt time.Time
176
}
177

178
var _ error = (*ErrShardClosed)(nil)
179

180
func (e *ErrShardClosed) Error() string {
8✔
181
        return e.Msg
8✔
182
}
8✔
183

184
const (
185
        TimeBeforeShardClosedIsError = 10 * time.Second
186
)
187
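A minimal sketch of how the two definitions above fit together (not part of context.go; the helper name and policy are illustrative): a caller in this package could treat a shard that closed within TimeBeforeShardClosedIsError as a transient condition and anything older as a hard error.

// isRecentlyClosedShardErr reports whether err is an ErrShardClosed whose
// ClosedAt timestamp is still within TimeBeforeShardClosedIsError.
func isRecentlyClosedShardErr(err error) bool {
        var closedErr *ErrShardClosed
        if !errors.As(err, &closedErr) {
                return false
        }
        return time.Since(closedErr.ClosedAt) < TimeBeforeShardClosedIsError
}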

188
const (
189
        // transfer/cross cluster diff/lag is in terms of taskID, which is calculated based on shard rangeID
190
        // on shard movement, taskID will increase by around 1 million
191
        logWarnTransferLevelDiff    = 3000000 // 3 million
192
        logWarnCrossClusterLevelLag = 3000000 // 3 million
193
        logWarnTimerLevelDiff       = time.Duration(30 * time.Minute)
194
        historySizeLogThreshold     = 10 * 1024 * 1024
195
        minContextTimeout           = 1 * time.Second
196
)
197
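The comment above notes that task IDs are derived from the shard rangeID and jump by roughly one million on each shard movement; the arithmetic appears later in renewRangeLocked (rangeID << config.RangeSizeBits). A minimal standalone sketch, assuming RangeSizeBits is 20 (1 << 20 = 1,048,576; the real value comes from config):

package main

import "fmt"

// rangeSizeBits is an assumed value; in Cadence it comes from config.RangeSizeBits.
const rangeSizeBits = 20

// taskIDBoundsForRange mirrors the arithmetic in renewRangeLocked: the first task ID
// for a range is rangeID << rangeSizeBits, and the next range starts at (rangeID+1) << rangeSizeBits.
func taskIDBoundsForRange(rangeID int64) (first, next int64) {
        return rangeID << rangeSizeBits, (rangeID + 1) << rangeSizeBits
}

func main() {
        first, next := taskIDBoundsForRange(42)
        fmt.Println(first, next, next-first) // 44040192 45088768 1048576 (~1 million task IDs per range)
}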

198
func (s *contextImpl) GetShardID() int {
20,956✔
199
        return s.shardID
20,956✔
200
}
20,956✔
201

202
func (s *contextImpl) GetService() resource.Resource {
10,750✔
203
        return s.Resource
10,750✔
204
}
10,750✔
205

206
func (s *contextImpl) GetExecutionManager() persistence.ExecutionManager {
6,374✔
207
        return s.executionManager
6,374✔
208
}
6,374✔
209

210
func (s *contextImpl) GetEngine() engine.Engine {
13,795✔
211
        return s.engine
13,795✔
212
}
13,795✔
213

214
func (s *contextImpl) SetEngine(engine engine.Engine) {
76✔
215
        s.engine = engine
76✔
216
}
76✔
217

218
func (s *contextImpl) GenerateTransferTaskID() (int64, error) {
3,743✔
219
        s.Lock()
3,743✔
220
        defer s.Unlock()
3,743✔
221

3,743✔
222
        return s.generateTransferTaskIDLocked()
3,743✔
223
}
3,743✔
224

225
func (s *contextImpl) GenerateTransferTaskIDs(number int) ([]int64, error) {
4,292✔
226
        s.Lock()
4,292✔
227
        defer s.Unlock()
4,292✔
228

4,292✔
229
        result := []int64{}
4,292✔
230
        for i := 0; i < number; i++ {
12,243✔
231
                id, err := s.generateTransferTaskIDLocked()
7,951✔
232
                if err != nil {
7,951✔
233
                        return nil, err
×
234
                }
×
235
                result = append(result, id)
7,951✔
236
        }
237
        return result, nil
4,292✔
238
}
239

240
func (s *contextImpl) GetTransferMaxReadLevel() int64 {
2,975✔
241
        s.RLock()
2,975✔
242
        defer s.RUnlock()
2,975✔
243
        return s.transferMaxReadLevel
2,975✔
244
}
2,975✔
245

246
func (s *contextImpl) GetTransferAckLevel() int64 {
77✔
247
        s.RLock()
77✔
248
        defer s.RUnlock()
77✔
249

77✔
250
        return s.shardInfo.TransferAckLevel
77✔
251
}
77✔
252

253
func (s *contextImpl) UpdateTransferAckLevel(ackLevel int64) error {
103✔
254
        s.Lock()
103✔
255
        defer s.Unlock()
103✔
256

103✔
257
        s.shardInfo.TransferAckLevel = ackLevel
103✔
258
        s.shardInfo.StolenSinceRenew = 0
103✔
259
        return s.updateShardInfoLocked()
103✔
260
}
103✔
261

262
func (s *contextImpl) GetTransferClusterAckLevel(cluster string) int64 {
150✔
263
        s.RLock()
150✔
264
        defer s.RUnlock()
150✔
265

150✔
266
        // if we can find corresponding ack level
150✔
267
        if ackLevel, ok := s.shardInfo.ClusterTransferAckLevel[cluster]; ok {
206✔
268
                return ackLevel
56✔
269
        }
56✔
270
        // otherwise, default to existing ack level, which belongs to local cluster
271
        // this can happen if you add more clusters
272
        return s.shardInfo.TransferAckLevel
94✔
273
}
274

275
func (s *contextImpl) UpdateTransferClusterAckLevel(cluster string, ackLevel int64) error {
1✔
276
        s.Lock()
1✔
277
        defer s.Unlock()
1✔
278

1✔
279
        s.shardInfo.ClusterTransferAckLevel[cluster] = ackLevel
1✔
280
        s.shardInfo.StolenSinceRenew = 0
1✔
281
        return s.updateShardInfoLocked()
1✔
282
}
1✔
283

284
func (s *contextImpl) GetTransferProcessingQueueStates(cluster string) []*types.ProcessingQueueState {
149✔
285
        s.RLock()
149✔
286
        defer s.RUnlock()
149✔
287

149✔
288
        // if we can find corresponding processing queue states
149✔
289
        if states, ok := s.shardInfo.TransferProcessingQueueStates.StatesByCluster[cluster]; ok {
150✔
290
                return states
1✔
291
        }
1✔
292

293
        // check if we can find corresponding ack level
294
        var ackLevel int64
148✔
295
        var ok bool
148✔
296
        if ackLevel, ok = s.shardInfo.ClusterTransferAckLevel[cluster]; !ok {
242✔
297
                // otherwise, default to existing ack level, which belongs to local cluster
94✔
298
                // this can happen if you add more clusters
94✔
299
                ackLevel = s.shardInfo.TransferAckLevel
94✔
300
        }
94✔
301

302
        // otherwise, create default queue state based on existing ack level,
303
        // which belongs to local cluster. this can happen if you add more clusters
304
        return []*types.ProcessingQueueState{
148✔
305
                {
148✔
306
                        Level:    common.Int32Ptr(0),
148✔
307
                        AckLevel: common.Int64Ptr(ackLevel),
148✔
308
                        MaxLevel: common.Int64Ptr(math.MaxInt64),
148✔
309
                        DomainFilter: &types.DomainFilter{
148✔
310
                                ReverseMatch: true,
148✔
311
                        },
148✔
312
                },
148✔
313
        }
148✔
314
}
315

316
func (s *contextImpl) UpdateTransferProcessingQueueStates(cluster string, states []*types.ProcessingQueueState) error {
454✔
317
        s.Lock()
454✔
318
        defer s.Unlock()
454✔
319

454✔
320
        if len(states) == 0 {
454✔
321
                return errors.New("empty transfer processing queue states")
×
322
        }
×
323

324
        if s.shardInfo.TransferProcessingQueueStates.StatesByCluster == nil {
454✔
325
                s.shardInfo.TransferProcessingQueueStates.StatesByCluster = make(map[string][]*types.ProcessingQueueState)
×
326
        }
×
327
        s.shardInfo.TransferProcessingQueueStates.StatesByCluster[cluster] = states
454✔
328

454✔
329
        // for backward compatibility
454✔
330
        ackLevel := states[0].GetAckLevel()
454✔
331
        for _, state := range states {
908✔
332
                ackLevel = common.MinInt64(ackLevel, state.GetAckLevel())
454✔
333
        }
454✔
334
        s.shardInfo.ClusterTransferAckLevel[cluster] = ackLevel
454✔
335

454✔
336
        s.shardInfo.StolenSinceRenew = 0
454✔
337
        return s.updateShardInfoLocked()
454✔
338
}
339

340
func (s *contextImpl) GetClusterReplicationLevel(cluster string) int64 {
218✔
341
        s.RLock()
218✔
342
        defer s.RUnlock()
218✔
343

218✔
344
        // if we can find corresponding replication level
218✔
345
        if replicationLevel, ok := s.shardInfo.ClusterReplicationLevel[cluster]; ok {
219✔
346
                return replicationLevel
1✔
347
        }
1✔
348

349
        // New cluster always starts from -1
350
        return -1
217✔
351
}
352

353
func (s *contextImpl) UpdateClusterReplicationLevel(cluster string, lastTaskID int64) error {
2✔
354
        s.Lock()
2✔
355
        defer s.Unlock()
2✔
356

2✔
357
        s.shardInfo.ClusterReplicationLevel[cluster] = lastTaskID
2✔
358
        s.shardInfo.StolenSinceRenew = 0
2✔
359
        return s.updateShardInfoLocked()
2✔
360
}
2✔
361

362
func (s *contextImpl) GetTimerAckLevel() time.Time {
77✔
363
        s.RLock()
77✔
364
        defer s.RUnlock()
77✔
365

77✔
366
        return s.shardInfo.TimerAckLevel
77✔
367
}
77✔
368

369
func (s *contextImpl) UpdateTimerAckLevel(ackLevel time.Time) error {
38✔
370
        s.Lock()
38✔
371
        defer s.Unlock()
38✔
372

38✔
373
        s.shardInfo.TimerAckLevel = ackLevel
38✔
374
        s.shardInfo.StolenSinceRenew = 0
38✔
375
        return s.updateShardInfoLocked()
38✔
376
}
38✔
377

378
func (s *contextImpl) GetTimerClusterAckLevel(cluster string) time.Time {
150✔
379
        s.RLock()
150✔
380
        defer s.RUnlock()
150✔
381

150✔
382
        // if we can find corresponding ack level
150✔
383
        if ackLevel, ok := s.shardInfo.ClusterTimerAckLevel[cluster]; ok {
206✔
384
                return ackLevel
56✔
385
        }
56✔
386
        // otherwise, default to existing ack level, which belongs to local cluster
387
        // this can happen if you add more clusters
388
        return s.shardInfo.TimerAckLevel
94✔
389
}
390

391
func (s *contextImpl) UpdateTimerClusterAckLevel(cluster string, ackLevel time.Time) error {
1✔
392
        s.Lock()
1✔
393
        defer s.Unlock()
1✔
394

1✔
395
        s.shardInfo.ClusterTimerAckLevel[cluster] = ackLevel
1✔
396
        s.shardInfo.StolenSinceRenew = 0
1✔
397
        return s.updateShardInfoLocked()
1✔
398
}
1✔
399

400
func (s *contextImpl) GetTimerProcessingQueueStates(cluster string) []*types.ProcessingQueueState {
149✔
401
        s.RLock()
149✔
402
        defer s.RUnlock()
149✔
403

149✔
404
        // if we can find corresponding processing queue states
149✔
405
        if states, ok := s.shardInfo.TimerProcessingQueueStates.StatesByCluster[cluster]; ok {
150✔
406
                return states
1✔
407
        }
1✔
408

409
        // check if we can find corresponding ack level
410
        var ackLevel time.Time
148✔
411
        var ok bool
148✔
412
        if ackLevel, ok = s.shardInfo.ClusterTimerAckLevel[cluster]; !ok {
242✔
413
                // otherwise, default to existing ack level, which belongs to local cluster
94✔
414
                // this can happen if you add more clusters
94✔
415
                ackLevel = s.shardInfo.TimerAckLevel
94✔
416
        }
94✔
417

418
        // otherwise, create default queue state based on existing ack level,
419
        // which belongs to local cluster. this can happen if you add more clusters
420
        return []*types.ProcessingQueueState{
148✔
421
                {
148✔
422
                        Level:    common.Int32Ptr(0),
148✔
423
                        AckLevel: common.Int64Ptr(ackLevel.UnixNano()),
148✔
424
                        MaxLevel: common.Int64Ptr(math.MaxInt64),
148✔
425
                        DomainFilter: &types.DomainFilter{
148✔
426
                                ReverseMatch: true,
148✔
427
                        },
148✔
428
                },
148✔
429
        }
148✔
430
}
431

432
func (s *contextImpl) UpdateTimerProcessingQueueStates(cluster string, states []*types.ProcessingQueueState) error {
455✔
433
        s.Lock()
455✔
434
        defer s.Unlock()
455✔
435

455✔
436
        if len(states) == 0 {
455✔
437
                return errors.New("empty timer processing queue states")
×
438
        }
×
439

440
        if s.shardInfo.TimerProcessingQueueStates.StatesByCluster == nil {
455✔
441
                s.shardInfo.TimerProcessingQueueStates.StatesByCluster = make(map[string][]*types.ProcessingQueueState)
×
442
        }
×
443
        s.shardInfo.TimerProcessingQueueStates.StatesByCluster[cluster] = states
455✔
444

455✔
445
        // for backward compatibility
455✔
446
        ackLevel := states[0].GetAckLevel()
455✔
447
        for _, state := range states {
910✔
448
                ackLevel = common.MinInt64(ackLevel, state.GetAckLevel())
455✔
449
        }
455✔
450
        s.shardInfo.ClusterTimerAckLevel[cluster] = time.Unix(0, ackLevel)
455✔
451

455✔
452
        s.shardInfo.StolenSinceRenew = 0
455✔
453
        return s.updateShardInfoLocked()
455✔
454
}
455

456
func (s *contextImpl) UpdateTransferFailoverLevel(failoverID string, level TransferFailoverLevel) error {
2✔
457
        s.Lock()
2✔
458
        defer s.Unlock()
2✔
459

2✔
460
        s.transferFailoverLevels[failoverID] = level
2✔
461
        return nil
2✔
462
}
2✔
463

464
func (s *contextImpl) DeleteTransferFailoverLevel(failoverID string) error {
1✔
465
        s.Lock()
1✔
466
        defer s.Unlock()
1✔
467

1✔
468
        if level, ok := s.transferFailoverLevels[failoverID]; ok {
2✔
469
                s.GetMetricsClient().RecordTimer(metrics.ShardInfoScope, metrics.ShardInfoTransferFailoverLatencyTimer, time.Since(level.StartTime))
1✔
470
                delete(s.transferFailoverLevels, failoverID)
1✔
471
        }
1✔
472
        return nil
1✔
473
}
474

475
func (s *contextImpl) GetAllTransferFailoverLevels() map[string]TransferFailoverLevel {
111✔
476
        s.RLock()
111✔
477
        defer s.RUnlock()
111✔
478

111✔
479
        ret := map[string]TransferFailoverLevel{}
111✔
480
        for k, v := range s.transferFailoverLevels {
114✔
481
                ret[k] = v
3✔
482
        }
3✔
483
        return ret
111✔
484
}
485

486
func (s *contextImpl) UpdateTimerFailoverLevel(failoverID string, level TimerFailoverLevel) error {
2✔
487
        s.Lock()
2✔
488
        defer s.Unlock()
2✔
489

2✔
490
        s.timerFailoverLevels[failoverID] = level
2✔
491
        return nil
2✔
492
}
2✔
493

494
func (s *contextImpl) DeleteTimerFailoverLevel(failoverID string) error {
1✔
495
        s.Lock()
1✔
496
        defer s.Unlock()
1✔
497

1✔
498
        if level, ok := s.timerFailoverLevels[failoverID]; ok {
2✔
499
                s.GetMetricsClient().RecordTimer(metrics.ShardInfoScope, metrics.ShardInfoTimerFailoverLatencyTimer, time.Since(level.StartTime))
1✔
500
                delete(s.timerFailoverLevels, failoverID)
1✔
501
        }
1✔
502
        return nil
1✔
503
}
504

505
func (s *contextImpl) GetAllTimerFailoverLevels() map[string]TimerFailoverLevel {
111✔
506
        s.RLock()
111✔
507
        defer s.RUnlock()
111✔
508

111✔
509
        ret := map[string]TimerFailoverLevel{}
111✔
510
        for k, v := range s.timerFailoverLevels {
114✔
511
                ret[k] = v
3✔
512
        }
3✔
513
        return ret
111✔
514
}
515

516
func (s *contextImpl) GetDomainNotificationVersion() int64 {
125✔
517
        s.RLock()
125✔
518
        defer s.RUnlock()
125✔
519

125✔
520
        return s.shardInfo.DomainNotificationVersion
125✔
521
}
125✔
522

523
func (s *contextImpl) UpdateDomainNotificationVersion(domainNotificationVersion int64) error {
51✔
524
        s.Lock()
51✔
525
        defer s.Unlock()
51✔
526

51✔
527
        s.shardInfo.DomainNotificationVersion = domainNotificationVersion
51✔
528
        return s.updateShardInfoLocked()
51✔
529
}
51✔
530

531
func (s *contextImpl) GetTimerMaxReadLevel(cluster string) time.Time {
2✔
532
        s.RLock()
2✔
533
        defer s.RUnlock()
2✔
534

2✔
535
        return s.timerMaxReadLevelMap[cluster]
2✔
536
}
2✔
537

538
func (s *contextImpl) UpdateTimerMaxReadLevel(cluster string) time.Time {
2,470✔
539
        s.Lock()
2,470✔
540
        defer s.Unlock()
2,470✔
541

2,470✔
542
        currentTime := s.GetTimeSource().Now()
2,470✔
543
        if cluster != "" && cluster != s.GetClusterMetadata().GetCurrentClusterName() {
2,546✔
544
                currentTime = s.remoteClusterCurrentTime[cluster]
76✔
545
        }
76✔
546

547
        s.timerMaxReadLevelMap[cluster] = currentTime.Add(s.config.TimerProcessorMaxTimeShift()).Truncate(time.Millisecond)
2,470✔
548
        return s.timerMaxReadLevelMap[cluster]
2,470✔
549
}
550

551
func (s *contextImpl) GetWorkflowExecution(
552
        ctx context.Context,
553
        request *persistence.GetWorkflowExecutionRequest,
554
) (*persistence.GetWorkflowExecutionResponse, error) {
1,141✔
555
        request.RangeID = atomic.LoadInt64(&s.rangeID) // This is to make sure read is not blocked by write, s.rangeID is synced with s.shardInfo.RangeID
1,141✔
556
        if err := s.closedError(); err != nil {
1,142✔
557
                return nil, err
1✔
558
        }
1✔
559
        return s.executionManager.GetWorkflowExecution(ctx, request)
1,140✔
560
}
561

562
func (s *contextImpl) CreateWorkflowExecution(
563
        ctx context.Context,
564
        request *persistence.CreateWorkflowExecutionRequest,
565
) (*persistence.CreateWorkflowExecutionResponse, error) {
527✔
566
        if err := s.closedError(); err != nil {
528✔
567
                return nil, err
1✔
568
        }
1✔
569

570
        ctx, cancel, err := s.ensureMinContextTimeout(ctx)
526✔
571
        if err != nil {
526✔
572
                return nil, err
×
573
        }
×
574
        if cancel != nil {
529✔
575
                defer cancel()
3✔
576
        }
3✔
577

578
        domainID := request.NewWorkflowSnapshot.ExecutionInfo.DomainID
526✔
579
        workflowID := request.NewWorkflowSnapshot.ExecutionInfo.WorkflowID
526✔
580

526✔
581
        // do not try to get domain cache within shard lock
526✔
582
        domainEntry, err := s.GetDomainCache().GetDomainByID(domainID)
526✔
583
        if err != nil {
526✔
584
                return nil, err
×
585
        }
×
586

587
        s.Lock()
526✔
588
        defer s.Unlock()
526✔
589

526✔
590
        transferMaxReadLevel := int64(0)
526✔
591
        if err := s.allocateTaskIDsLocked(
526✔
592
                domainEntry,
526✔
593
                workflowID,
526✔
594
                request.NewWorkflowSnapshot.TransferTasks,
526✔
595
                request.NewWorkflowSnapshot.CrossClusterTasks,
526✔
596
                request.NewWorkflowSnapshot.ReplicationTasks,
526✔
597
                request.NewWorkflowSnapshot.TimerTasks,
526✔
598
                &transferMaxReadLevel,
526✔
599
        ); err != nil {
526✔
600
                return nil, err
×
601
        }
×
602

603
        if err := s.closedError(); err != nil {
526✔
604
                return nil, err
×
605
        }
×
606
        currentRangeID := s.getRangeID()
526✔
607
        request.RangeID = currentRangeID
526✔
608

526✔
609
        response, err := s.executionManager.CreateWorkflowExecution(ctx, request)
526✔
610
        switch err.(type) {
526✔
611
        case nil:
469✔
612
                // Update MaxReadLevel if write to DB succeeds
469✔
613
                s.updateMaxReadLevelLocked(transferMaxReadLevel)
469✔
614
                return response, nil
469✔
615
        case *types.WorkflowExecutionAlreadyStartedError,
616
                *persistence.WorkflowExecutionAlreadyStartedError,
617
                *persistence.CurrentWorkflowConditionFailedError,
618
                *persistence.DuplicateRequestError,
619
                *types.ServiceBusyError:
60✔
620
                // No special handling required for these errors
60✔
621
                // We know write to DB fails if these errors are returned
60✔
622
                return nil, err
60✔
623
        case *persistence.ShardOwnershipLostError:
×
624
                {
×
625
                        // Shard is stolen, trigger shutdown of history engine
×
626
                        s.logger.Warn(
×
627
                                "Closing shard: CreateWorkflowExecution failed due to stolen shard.",
×
628
                                tag.Error(err),
×
629
                        )
×
630
                        s.closeShard()
×
631
                        return nil, err
×
632
                }
×
633
        default:
×
634
                {
×
635
                        // We have no idea if the write failed or will eventually make it to
×
636
                        // persistence. Increment RangeID to guarantee that subsequent reads
×
637
                        // will either see that write, or know for certain that it failed.
×
638
                        // This allows the callers to reliably check the outcome by performing
×
639
                        // a read.
×
640
                        err1 := s.renewRangeLocked(false)
×
641
                        if err1 != nil {
×
642
                                // At this point we have no choice but to unload the shard, so that it
×
643
                                // gets a new RangeID when it's reloaded.
×
644
                                s.logger.Warn(
×
645
                                        "Closing shard: CreateWorkflowExecution failed due to unknown error.",
×
646
                                        tag.Error(err),
×
647
                                )
×
648
                                s.closeShard()
×
649
                        }
×
650
                        return nil, err
×
651
                }
652
        }
653
}
654

655
func (s *contextImpl) getDefaultEncoding(domainName string) common.EncodingType {
8,182✔
656
        return common.EncodingType(s.config.EventEncodingType(domainName))
8,182✔
657
}
8,182✔
658

659
func (s *contextImpl) UpdateWorkflowExecution(
660
        ctx context.Context,
661
        request *persistence.UpdateWorkflowExecutionRequest,
662
) (*persistence.UpdateWorkflowExecutionResponse, error) {
4,446✔
663
        if err := s.closedError(); err != nil {
4,447✔
664
                return nil, err
1✔
665
        }
1✔
666
        ctx, cancel, err := s.ensureMinContextTimeout(ctx)
4,445✔
667
        if err != nil {
4,445✔
668
                return nil, err
×
669
        }
×
670
        if cancel != nil {
4,445✔
671
                defer cancel()
×
672
        }
×
673

674
        domainID := request.UpdateWorkflowMutation.ExecutionInfo.DomainID
4,445✔
675
        workflowID := request.UpdateWorkflowMutation.ExecutionInfo.WorkflowID
4,445✔
676

4,445✔
677
        // do not try to get domain cache within shard lock
4,445✔
678
        domainEntry, err := s.GetDomainCache().GetDomainByID(domainID)
4,445✔
679
        if err != nil {
4,445✔
680
                return nil, err
×
681
        }
×
682
        request.Encoding = s.getDefaultEncoding(domainEntry.GetInfo().Name)
4,445✔
683

4,445✔
684
        s.Lock()
4,445✔
685
        defer s.Unlock()
4,445✔
686

4,445✔
687
        transferMaxReadLevel := int64(0)
4,445✔
688
        if err := s.allocateTaskIDsLocked(
4,445✔
689
                domainEntry,
4,445✔
690
                workflowID,
4,445✔
691
                request.UpdateWorkflowMutation.TransferTasks,
4,445✔
692
                request.UpdateWorkflowMutation.CrossClusterTasks,
4,445✔
693
                request.UpdateWorkflowMutation.ReplicationTasks,
4,445✔
694
                request.UpdateWorkflowMutation.TimerTasks,
4,445✔
695
                &transferMaxReadLevel,
4,445✔
696
        ); err != nil {
4,445✔
697
                return nil, err
×
698
        }
×
699
        if request.NewWorkflowSnapshot != nil {
4,619✔
700
                if err := s.allocateTaskIDsLocked(
174✔
701
                        domainEntry,
174✔
702
                        workflowID,
174✔
703
                        request.NewWorkflowSnapshot.TransferTasks,
174✔
704
                        request.NewWorkflowSnapshot.CrossClusterTasks,
174✔
705
                        request.NewWorkflowSnapshot.ReplicationTasks,
174✔
706
                        request.NewWorkflowSnapshot.TimerTasks,
174✔
707
                        &transferMaxReadLevel,
174✔
708
                ); err != nil {
174✔
709
                        return nil, err
×
710
                }
×
711
        }
712

713
        if err := s.closedError(); err != nil {
4,445✔
714
                return nil, err
×
715
        }
×
716
        currentRangeID := s.getRangeID()
4,445✔
717
        request.RangeID = currentRangeID
4,445✔
718

4,445✔
719
        resp, err := s.executionManager.UpdateWorkflowExecution(ctx, request)
4,445✔
720
        switch err.(type) {
4,445✔
721
        case nil:
4,445✔
722
                // Update MaxReadLevel if write to DB succeeds
4,445✔
723
                s.updateMaxReadLevelLocked(transferMaxReadLevel)
4,445✔
724
                return resp, nil
4,445✔
725
        case *persistence.ConditionFailedError,
726
                *persistence.DuplicateRequestError,
727
                *types.ServiceBusyError:
×
728
                // No special handling required for these errors
×
729
                // We know write to DB fails if these errors are returned
×
730
                return nil, err
×
731
        case *persistence.ShardOwnershipLostError:
×
732
                {
×
733
                        // Shard is stolen, trigger shutdown of history engine
×
734
                        s.logger.Warn(
×
735
                                "Closing shard: UpdateWorkflowExecution failed due to stolen shard.",
×
736
                                tag.Error(err),
×
737
                        )
×
738
                        s.closeShard()
×
739
                        return nil, err
×
740
                }
×
741
        default:
×
742
                {
×
743
                        // We have no idea if the write failed or will eventually make it to
×
744
                        // persistence. Increment RangeID to guarantee that subsequent reads
×
745
                        // will either see that write, or know for certain that it failed.
×
746
                        // This allows the callers to reliably check the outcome by performing
×
747
                        // a read.
×
748
                        err1 := s.renewRangeLocked(false)
×
749
                        if err1 != nil {
×
750
                                // At this point we have no choice but to unload the shard, so that it
×
751
                                // gets a new RangeID when it's reloaded.
×
752
                                s.logger.Warn(
×
753
                                        "Closing shard: UpdateWorkflowExecution failed due to unknown error.",
×
754
                                        tag.Error(err),
×
755
                                )
×
756
                                s.closeShard()
×
757
                        }
×
758
                        return nil, err
×
759
                }
760
        }
761
}
762

763
func (s *contextImpl) ConflictResolveWorkflowExecution(
764
        ctx context.Context,
765
        request *persistence.ConflictResolveWorkflowExecutionRequest,
766
) (*persistence.ConflictResolveWorkflowExecutionResponse, error) {
4✔
767
        if err := s.closedError(); err != nil {
5✔
768
                return nil, err
1✔
769
        }
1✔
770

771
        ctx, cancel, err := s.ensureMinContextTimeout(ctx)
3✔
772
        if err != nil {
3✔
773
                return nil, err
×
774
        }
×
775
        if cancel != nil {
3✔
776
                defer cancel()
×
777
        }
×
778

779
        domainID := request.ResetWorkflowSnapshot.ExecutionInfo.DomainID
3✔
780
        workflowID := request.ResetWorkflowSnapshot.ExecutionInfo.WorkflowID
3✔
781

3✔
782
        // do not try to get domain cache within shard lock
3✔
783
        domainEntry, err := s.GetDomainCache().GetDomainByID(domainID)
3✔
784
        if err != nil {
3✔
785
                return nil, err
×
786
        }
×
787
        request.Encoding = s.getDefaultEncoding(domainEntry.GetInfo().Name)
3✔
788

3✔
789
        s.Lock()
3✔
790
        defer s.Unlock()
3✔
791

3✔
792
        transferMaxReadLevel := int64(0)
3✔
793
        if request.CurrentWorkflowMutation != nil {
3✔
UNCOV
794
                if err := s.allocateTaskIDsLocked(
×
UNCOV
795
                        domainEntry,
×
UNCOV
796
                        workflowID,
×
UNCOV
797
                        request.CurrentWorkflowMutation.TransferTasks,
×
UNCOV
798
                        request.CurrentWorkflowMutation.CrossClusterTasks,
×
UNCOV
799
                        request.CurrentWorkflowMutation.ReplicationTasks,
×
UNCOV
800
                        request.CurrentWorkflowMutation.TimerTasks,
×
UNCOV
801
                        &transferMaxReadLevel,
×
UNCOV
802
                ); err != nil {
×
803
                        return nil, err
×
804
                }
×
805
        }
806
        if err := s.allocateTaskIDsLocked(
3✔
807
                domainEntry,
3✔
808
                workflowID,
3✔
809
                request.ResetWorkflowSnapshot.TransferTasks,
3✔
810
                request.ResetWorkflowSnapshot.CrossClusterTasks,
3✔
811
                request.ResetWorkflowSnapshot.ReplicationTasks,
3✔
812
                request.ResetWorkflowSnapshot.TimerTasks,
3✔
813
                &transferMaxReadLevel,
3✔
814
        ); err != nil {
3✔
815
                return nil, err
×
816
        }
×
817
        if request.NewWorkflowSnapshot != nil {
3✔
818
                if err := s.allocateTaskIDsLocked(
×
819
                        domainEntry,
×
820
                        workflowID,
×
821
                        request.NewWorkflowSnapshot.TransferTasks,
×
822
                        request.NewWorkflowSnapshot.CrossClusterTasks,
×
823
                        request.NewWorkflowSnapshot.ReplicationTasks,
×
824
                        request.NewWorkflowSnapshot.TimerTasks,
×
825
                        &transferMaxReadLevel,
×
826
                ); err != nil {
×
827
                        return nil, err
×
828
                }
×
829
        }
830

831
        if err := s.closedError(); err != nil {
3✔
832
                return nil, err
×
833
        }
×
834
        currentRangeID := s.getRangeID()
3✔
835
        request.RangeID = currentRangeID
3✔
836
        resp, err := s.executionManager.ConflictResolveWorkflowExecution(ctx, request)
3✔
837
        switch err.(type) {
3✔
838
        case nil:
3✔
839
                // Update MaxReadLevel if write to DB succeeds
3✔
840
                s.updateMaxReadLevelLocked(transferMaxReadLevel)
3✔
841
                return resp, nil
3✔
842
        case *persistence.ConditionFailedError,
843
                *types.ServiceBusyError:
×
844
                // No special handling required for these errors
×
845
                // We know write to DB fails if these errors are returned
×
846
                return nil, err
×
847
        case *persistence.ShardOwnershipLostError:
×
848
                {
×
849
                        // RangeID might have been renewed by the same host while this update was in flight
×
850
                        // Retry the operation if we still have the shard ownership
×
851
                        // Shard is stolen, trigger shutdown of history engine
×
852
                        s.logger.Warn(
×
853
                                "Closing shard: ConflictResolveWorkflowExecution failed due to stolen shard.",
×
854
                                tag.Error(err),
×
855
                        )
×
856
                        s.closeShard()
×
857
                        return nil, err
×
858
                }
×
859
        default:
×
860
                {
×
861
                        // We have no idea if the write failed or will eventually make it to
×
862
                        // persistence. Increment RangeID to guarantee that subsequent reads
×
863
                        // will either see that write, or know for certain that it failed.
×
864
                        // This allows the callers to reliably check the outcome by performing
×
865
                        // a read.
×
866
                        err1 := s.renewRangeLocked(false)
×
867
                        if err1 != nil {
×
868
                                // At this point we have no choice but to unload the shard, so that it
×
869
                                // gets a new RangeID when it's reloaded.
×
870
                                s.logger.Warn(
×
871
                                        "Closing shard: ConflictResolveWorkflowExecution failed due to unknown error.",
×
872
                                        tag.Error(err),
×
873
                                )
×
874
                                s.closeShard()
×
875
                        }
×
876
                        return nil, err
×
877
                }
878
        }
879
}
880

881
func (s *contextImpl) ensureMinContextTimeout(
882
        parent context.Context,
883
) (context.Context, context.CancelFunc, error) {
4,968✔
884
        if err := parent.Err(); err != nil {
4,968✔
885
                return nil, nil, err
×
886
        }
×
887

888
        deadline, ok := parent.Deadline()
4,968✔
889
        if !ok || deadline.Sub(s.GetTimeSource().Now()) >= minContextTimeout {
9,933✔
890
                return parent, nil, nil
4,965✔
891
        }
4,965✔
892

893
        childCtx, cancel := context.WithTimeout(context.Background(), minContextTimeout)
3✔
894
        return childCtx, cancel, nil
3✔
895
}
896

897
func (s *contextImpl) AppendHistoryV2Events(
898
        ctx context.Context,
899
        request *persistence.AppendHistoryNodesRequest,
900
        domainID string,
901
        execution types.WorkflowExecution,
902
) (*persistence.AppendHistoryNodesResponse, error) {
3,741✔
903
        if err := s.closedError(); err != nil {
3,742✔
904
                return nil, err
1✔
905
        }
1✔
906

907
        domainName, err := s.GetDomainCache().GetDomainName(domainID)
3,740✔
908
        if err != nil {
3,740✔
909
                return nil, err
×
910
        }
×
911

912
        // NOTE: do not use generateTransferTaskIDLocked directly since
913
        // generateTransferTaskIDLocked is not guarded by the lock here
914
        transactionID, err := s.GenerateTransferTaskID()
3,740✔
915
        if err != nil {
3,740✔
916
                return nil, err
×
917
        }
×
918

919
        request.Encoding = s.getDefaultEncoding(domainName)
3,740✔
920
        request.ShardID = common.IntPtr(s.shardID)
3,740✔
921
        request.TransactionID = transactionID
3,740✔
922

3,740✔
923
        size := 0
3,740✔
924
        defer func() {
7,480✔
925
                s.GetMetricsClient().Scope(metrics.SessionSizeStatsScope, metrics.DomainTag(domainName)).
3,740✔
926
                        RecordTimer(metrics.HistorySize, time.Duration(size))
3,740✔
927
                if size >= historySizeLogThreshold {
3,740✔
928
                        s.throttledLogger.Warn("history size threshold breached",
×
929
                                tag.WorkflowID(execution.GetWorkflowID()),
×
930
                                tag.WorkflowRunID(execution.GetRunID()),
×
931
                                tag.WorkflowDomainID(domainID),
×
932
                                tag.WorkflowHistorySizeBytes(size))
×
933
                }
×
934
        }()
935
        resp, err0 := s.GetHistoryManager().AppendHistoryNodes(ctx, request)
3,740✔
936
        if resp != nil {
7,480✔
937
                size = len(resp.DataBlob.Data)
3,740✔
938
        }
3,740✔
939
        return resp, err0
3,740✔
940
}
941

942
func (s *contextImpl) GetConfig() *config.Config {
53,704✔
943
        return s.config
53,704✔
944
}
53,704✔
945

946
func (s *contextImpl) PreviousShardOwnerWasDifferent() bool {
93✔
947
        return s.previousShardOwnerWasDifferent
93✔
948
}
93✔
949

950
func (s *contextImpl) GetEventsCache() events.Cache {
1,441✔
951
        // the shard needs to be restarted to release the shard cache once global mode is on.
1,441✔
952
        if s.config.EventsCacheGlobalEnable() {
1,441✔
953
                return s.GetEventCache()
×
954
        }
×
955
        return s.eventsCache
1,441✔
956
}
957

958
func (s *contextImpl) GetLogger() log.Logger {
1,372✔
959
        return s.logger
1,372✔
960
}
1,372✔
961

962
func (s *contextImpl) GetThrottledLogger() log.Logger {
148✔
963
        return s.throttledLogger
148✔
964
}
148✔
965

966
func (s *contextImpl) getRangeID() int64 {
4,970✔
967
        return s.shardInfo.RangeID
4,970✔
968
}
4,970✔
969

970
func (s *contextImpl) closedError() error {
16,006✔
971
        closedAt := s.closedAt.Load()
16,006✔
972
        if closedAt == nil {
32,004✔
973
                return nil
15,998✔
974
        }
15,998✔
975

976
        return &ErrShardClosed{
8✔
977
                Msg:      "shard closed",
8✔
978
                ClosedAt: *closedAt,
8✔
979
        }
8✔
980
}
981

982
func (s *contextImpl) closeShard() {
3✔
983
        if !s.closedAt.CompareAndSwap(nil, common.TimePtr(time.Now())) {
4✔
984
                return
1✔
985
        }
1✔
986

987
        go func() {
4✔
988
                s.closeCallback(s.shardID, s.shardItem)
2✔
989
        }()
2✔
990

991
        // fails any writes that may start after this point.
992
        s.shardInfo.RangeID = -1
2✔
993
        atomic.StoreInt64(&s.rangeID, s.shardInfo.RangeID)
2✔
994
}
995

996
func (s *contextImpl) generateTransferTaskIDLocked() (int64, error) {
17,990✔
997
        if err := s.updateRangeIfNeededLocked(); err != nil {
17,990✔
998
                return -1, err
×
999
        }
×
1000

1001
        taskID := s.transferSequenceNumber
17,990✔
1002
        s.transferSequenceNumber++
17,990✔
1003

17,990✔
1004
        return taskID, nil
17,990✔
1005
}
1006

1007
func (s *contextImpl) updateRangeIfNeededLocked() error {
17,990✔
1008
        if s.transferSequenceNumber < s.maxTransferSequenceNumber {
35,979✔
1009
                return nil
17,989✔
1010
        }
17,989✔
1011

1012
        return s.renewRangeLocked(false)
1✔
1013
}
1014

1015
func (s *contextImpl) renewRangeLocked(isStealing bool) error {
96✔
1016
        updatedShardInfo := s.shardInfo.Copy()
96✔
1017
        updatedShardInfo.RangeID++
96✔
1018
        if isStealing {
189✔
1019
                updatedShardInfo.StolenSinceRenew++
93✔
1020
        }
93✔
1021

1022
        var err error
96✔
1023
        if err := s.closedError(); err != nil {
97✔
1024
                return err
1✔
1025
        }
1✔
1026
        err = s.GetShardManager().UpdateShard(context.Background(), &persistence.UpdateShardRequest{
95✔
1027
                ShardInfo:       updatedShardInfo,
95✔
1028
                PreviousRangeID: s.shardInfo.RangeID})
95✔
1029
        switch err.(type) {
95✔
1030
        case nil:
94✔
1031
        case *persistence.ShardOwnershipLostError:
×
1032
                // Shard is stolen, trigger history engine shutdown
×
1033
                s.logger.Warn(
×
1034
                        "Closing shard: renewRangeLocked failed due to stolen shard.",
×
1035
                        tag.Error(err),
×
1036
                )
×
1037
                s.closeShard()
×
1038
        default:
1✔
1039
                s.logger.Warn("UpdateShard failed with an unknown error.",
1✔
1040
                        tag.Error(err),
1✔
1041
                        tag.ShardRangeID(updatedShardInfo.RangeID),
1✔
1042
                        tag.PreviousShardRangeID(s.shardInfo.RangeID))
1✔
1043
        }
1044
        if err != nil {
96✔
1045
                // Failure in updating shard to grab new RangeID
1✔
1046
                s.logger.Error("renewRangeLocked failed.",
1✔
1047
                        tag.StoreOperationUpdateShard,
1✔
1048
                        tag.Error(err),
1✔
1049
                        tag.ShardRangeID(updatedShardInfo.RangeID),
1✔
1050
                        tag.PreviousShardRangeID(s.shardInfo.RangeID))
1✔
1051
                return err
1✔
1052
        }
1✔
1053

1054
        // Range was successfully updated in Cassandra; now update the shard context to reflect the new range
1055
        s.transferSequenceNumber = updatedShardInfo.RangeID << s.config.RangeSizeBits
94✔
1056
        s.maxTransferSequenceNumber = (updatedShardInfo.RangeID + 1) << s.config.RangeSizeBits
94✔
1057
        s.transferMaxReadLevel = s.transferSequenceNumber - 1
94✔
1058
        atomic.StoreInt64(&s.rangeID, updatedShardInfo.RangeID)
94✔
1059
        s.shardInfo = updatedShardInfo
94✔
1060

94✔
1061
        s.logger.Info("Range updated for shardID",
94✔
1062
                tag.ShardRangeID(s.shardInfo.RangeID),
94✔
1063
                tag.Number(s.transferSequenceNumber),
94✔
1064
                tag.NextNumber(s.maxTransferSequenceNumber))
94✔
1065
        return nil
94✔
1066
}
1067

1068
func (s *contextImpl) updateMaxReadLevelLocked(rl int64) {
4,912✔
1069
        if rl > s.transferMaxReadLevel {
7,035✔
1070
                s.logger.Debug(fmt.Sprintf("Updating MaxReadLevel: %v", rl))
2,123✔
1071
                s.transferMaxReadLevel = rl
2,123✔
1072
        }
2,123✔
1073
}
1074

1075
func (s *contextImpl) updateShardInfoLocked() error {
1,099✔
1076
        return s.persistShardInfoLocked(false)
1,099✔
1077
}
1,099✔
1078

1079
func (s *contextImpl) forceUpdateShardInfoLocked() error {
1✔
1080
        return s.persistShardInfoLocked(true)
1✔
1081
}
1✔
1082

1083
func (s *contextImpl) persistShardInfoLocked(
1084
        isForced bool,
1085
) error {
1,101✔
1086

1,101✔
1087
        if err := s.closedError(); err != nil {
1,102✔
1088
                return err
1✔
1089
        }
1✔
1090

1091
        var err error
1,100✔
1092
        now := clock.NewRealTimeSource().Now()
1,100✔
1093
        if !isForced && s.lastUpdated.Add(s.config.ShardUpdateMinInterval()).After(now) {
2,119✔
1094
                return nil
1,019✔
1095
        }
1,019✔
1096
        updatedShardInfo := s.shardInfo.Copy()
84✔
1097
        s.emitShardInfoMetricsLogsLocked()
84✔
1098

84✔
1099
        err = s.GetShardManager().UpdateShard(context.Background(), &persistence.UpdateShardRequest{
84✔
1100
                ShardInfo:       updatedShardInfo,
84✔
1101
                PreviousRangeID: s.shardInfo.RangeID,
84✔
1102
        })
84✔
1103

84✔
1104
        if err != nil {
85✔
1105
                // Shard is stolen, trigger history engine shutdown
1✔
1106
                if _, ok := err.(*persistence.ShardOwnershipLostError); ok {
2✔
1107
                        s.logger.Warn(
1✔
1108
                                "Closing shard: updateShardInfoLocked failed due to stolen shard.",
1✔
1109
                                tag.Error(err),
1✔
1110
                        )
1✔
1111
                        s.closeShard()
1✔
1112
                }
1✔
1113
        } else {
83✔
1114
                s.lastUpdated = now
83✔
1115
        }
83✔
1116

1117
        return err
84✔
1118
}
1119

1120
func (s *contextImpl) emitShardInfoMetricsLogsLocked() {
84✔
1121
        currentCluster := s.GetClusterMetadata().GetCurrentClusterName()
84✔
1122
        clusterInfo := s.GetClusterMetadata().GetAllClusterInfo()
84✔
1123

84✔
1124
        minTransferLevel := s.shardInfo.ClusterTransferAckLevel[currentCluster]
84✔
1125
        maxTransferLevel := s.shardInfo.ClusterTransferAckLevel[currentCluster]
84✔
1126
        for clusterName, v := range s.shardInfo.ClusterTransferAckLevel {
162✔
1127
                if !clusterInfo[clusterName].Enabled {
78✔
1128
                        continue
×
1129
                }
1130

1131
                if v < minTransferLevel {
84✔
1132
                        minTransferLevel = v
6✔
1133
                }
6✔
1134
                if v > maxTransferLevel {
84✔
1135
                        maxTransferLevel = v
6✔
1136
                }
6✔
1137
        }
1138
        diffTransferLevel := maxTransferLevel - minTransferLevel
84✔
1139

84✔
1140
        minTimerLevel := s.shardInfo.ClusterTimerAckLevel[currentCluster]
84✔
1141
        maxTimerLevel := s.shardInfo.ClusterTimerAckLevel[currentCluster]
84✔
1142
        for clusterName, v := range s.shardInfo.ClusterTimerAckLevel {
156✔
1143
                if !clusterInfo[clusterName].Enabled {
72✔
1144
                        continue
×
1145
                }
1146

1147
                if v.Before(minTimerLevel) {
84✔
1148
                        minTimerLevel = v
12✔
1149
                }
12✔
1150
                if v.After(maxTimerLevel) {
74✔
1151
                        maxTimerLevel = v
2✔
1152
                }
2✔
1153
        }
1154
        diffTimerLevel := maxTimerLevel.Sub(minTimerLevel)
84✔
1155

84✔
1156
        replicationLag := s.transferMaxReadLevel - s.shardInfo.ReplicationAckLevel
84✔
1157
        transferLag := s.transferMaxReadLevel - s.shardInfo.TransferAckLevel
84✔
1158
        timerLag := time.Since(s.shardInfo.TimerAckLevel)
84✔
1159

84✔
1160
        transferFailoverInProgress := len(s.transferFailoverLevels)
84✔
1161
        timerFailoverInProgress := len(s.timerFailoverLevels)
84✔
1162

84✔
1163
        if s.config.EmitShardDiffLog() &&
84✔
1164
                (logWarnTransferLevelDiff < diffTransferLevel ||
84✔
1165
                        logWarnTimerLevelDiff < diffTimerLevel ||
84✔
1166
                        logWarnTransferLevelDiff < transferLag ||
84✔
1167
                        logWarnTimerLevelDiff < timerLag) {
84✔
1168

×
1169
                logger := s.logger.WithTags(
×
1170
                        tag.ShardTime(s.remoteClusterCurrentTime),
×
1171
                        tag.ShardReplicationAck(s.shardInfo.ReplicationAckLevel),
×
1172
                        tag.ShardTimerAcks(s.shardInfo.ClusterTimerAckLevel),
×
1173
                        tag.ShardTransferAcks(s.shardInfo.ClusterTransferAckLevel),
×
1174
                )
×
1175

×
1176
                logger.Warn("Shard ack levels diff exceeds warn threshold.")
×
1177
        }
×
1178

1179
        metricsScope := s.GetMetricsClient().Scope(metrics.ShardInfoScope)
84✔
1180
        metricsScope.RecordTimer(metrics.ShardInfoTransferDiffTimer, time.Duration(diffTransferLevel))
84✔
1181
        metricsScope.RecordTimer(metrics.ShardInfoTimerDiffTimer, diffTimerLevel)
84✔
1182

84✔
1183
        metricsScope.RecordTimer(metrics.ShardInfoReplicationLagTimer, time.Duration(replicationLag))
84✔
1184
        metricsScope.RecordTimer(metrics.ShardInfoTransferLagTimer, time.Duration(transferLag))
84✔
1185
        metricsScope.RecordTimer(metrics.ShardInfoTimerLagTimer, timerLag)
84✔
1186

84✔
1187
        metricsScope.RecordTimer(metrics.ShardInfoTransferFailoverInProgressTimer, time.Duration(transferFailoverInProgress))
84✔
1188
        metricsScope.RecordTimer(metrics.ShardInfoTimerFailoverInProgressTimer, time.Duration(timerFailoverInProgress))
84✔
1189
}
1190

1191
func (s *contextImpl) allocateTaskIDsLocked(
1192
        domainEntry *cache.DomainCacheEntry,
1193
        workflowID string,
1194
        transferTasks []persistence.Task,
1195
        crossClusterTasks []persistence.Task,
1196
        replicationTasks []persistence.Task,
1197
        timerTasks []persistence.Task,
1198
        transferMaxReadLevel *int64,
1199
) error {
5,139✔
1200

5,139✔
1201
        if err := s.allocateTransferIDsLocked(
5,139✔
1202
                transferTasks,
5,139✔
1203
                transferMaxReadLevel,
5,139✔
1204
        ); err != nil {
5,139✔
1205
                return err
×
1206
        }
×
1207
        if err := s.allocateTransferIDsLocked(
5,139✔
1208
                crossClusterTasks,
5,139✔
1209
                transferMaxReadLevel,
5,139✔
1210
        ); err != nil {
5,139✔
1211
                return err
×
1212
        }
×
1213
        if err := s.allocateTimerIDsLocked(
5,139✔
1214
                domainEntry,
5,139✔
1215
                workflowID,
5,139✔
1216
                timerTasks,
5,139✔
1217
        ); err != nil {
5,139✔
1218
                return err
×
1219
        }
×
1220

1221
        // Ensure that task IDs for replication tasks are generated last.
1222
        // This allows optimizing replication by checking whether there are no potential tasks to read.
1223
        return s.allocateTransferIDsLocked(
5,139✔
1224
                replicationTasks,
5,139✔
1225
                transferMaxReadLevel,
5,139✔
1226
        )
5,139✔
1227
}

func (s *contextImpl) allocateTransferIDsLocked(
	tasks []persistence.Task,
	transferMaxReadLevel *int64,
) error {
	now := s.GetTimeSource().Now()

	for _, task := range tasks {
		id, err := s.generateTransferTaskIDLocked()
		if err != nil {
			return err
		}
		s.logger.Debug(fmt.Sprintf("Assigning task ID: %v", id))
		task.SetTaskID(id)
		// only set task visibility timestamp if it's not set
		if task.GetVisibilityTimestamp().IsZero() {
			task.SetVisibilityTimestamp(now)
		}
		*transferMaxReadLevel = id
	}
	return nil
}

// NOTE: allocateTimerIDsLocked should always be called after assigning task IDs for transferTasks
// when task IDs are assigned together, because the Cadence indexer assumes the timer taskID of
// deleteWorkflowExecution is larger than the transfer taskID of closeWorkflowExecution
// for a given workflow.
func (s *contextImpl) allocateTimerIDsLocked(
	domainEntry *cache.DomainCacheEntry,
	workflowID string,
	timerTasks []persistence.Task,
) error {

	// assign IDs for the timer tasks. They need to be assigned under the shard lock.
	currentCluster := s.GetClusterMetadata().GetCurrentClusterName()
	for _, task := range timerTasks {
		ts := task.GetVisibilityTimestamp()
		if task.GetVersion() != common.EmptyVersion {
			// cannot use the version to determine the corresponding cluster for a timer task:
			// during failover the timer task should be created as active,
			// otherwise failover + active processing logic may not pick up the task.
			currentCluster = domainEntry.GetReplicationConfig().ActiveClusterName
		}
		readCursorTS := s.timerMaxReadLevelMap[currentCluster]
		if ts.Before(readCursorTS) {
			// This can happen if the shard moves and the new host has clock skew, or there is a DB write delay.
			// We generate a new timer timestamp using timerMaxReadLevel.
			s.logger.Warn("New timer generated is less than read level",
				tag.WorkflowDomainID(domainEntry.GetInfo().ID),
				tag.WorkflowID(workflowID),
				tag.Timestamp(ts),
				tag.CursorTimestamp(readCursorTS),
				tag.ValueShardAllocateTimerBeforeRead)
			task.SetVisibilityTimestamp(s.timerMaxReadLevelMap[currentCluster].Add(time.Millisecond))
		}

		seqNum, err := s.generateTransferTaskIDLocked()
		if err != nil {
			return err
		}
		task.SetTaskID(seqNum)
		visibilityTs := task.GetVisibilityTimestamp()
		s.logger.Debug(fmt.Sprintf("Assigning new timer (timestamp: %v, seq: %v) ackLevel: %v",
			visibilityTs, task.GetTaskID(), s.shardInfo.TimerAckLevel))
	}
	return nil
}
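
// Illustrative sketch (not part of the original source) of the clamping rule
// applied above: a timer whose visibility timestamp falls behind the cluster's
// timerMaxReadLevel is pushed just past that level so the timer queue never
// has to re-read below its cursor. The timestamps below are placeholders:
//
//	readLevel := s.timerMaxReadLevelMap[currentCluster] // e.g. 10:00:00.500
//	ts := task.GetVisibilityTimestamp()                 // e.g. 10:00:00.200 (behind the cursor)
//	if ts.Before(readLevel) {
//		task.SetVisibilityTimestamp(readLevel.Add(time.Millisecond)) // becomes 10:00:00.501
//	}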

func (s *contextImpl) SetCurrentTime(cluster string, currentTime time.Time) {
	s.Lock()
	defer s.Unlock()
	if cluster != s.GetClusterMetadata().GetCurrentClusterName() {
		prevTime := s.remoteClusterCurrentTime[cluster]
		if prevTime.Before(currentTime) {
			s.remoteClusterCurrentTime[cluster] = currentTime
		}
	} else {
		panic("Cannot set current time for current cluster")
	}
}

func (s *contextImpl) GetCurrentTime(cluster string) time.Time {
	s.RLock()
	defer s.RUnlock()
	if cluster != s.GetClusterMetadata().GetCurrentClusterName() {
		return s.remoteClusterCurrentTime[cluster]
	}
	return s.GetTimeSource().Now()
}

func (s *contextImpl) GetLastUpdatedTime() time.Time {
	s.RLock()
	defer s.RUnlock()
	return s.lastUpdated
}
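
// Illustrative sketch (not part of the original source): the remote-cluster
// clock above only moves forward, and asking for the current cluster's time
// falls through to the local time source. The cluster name is a placeholder:
//
//	s.SetCurrentTime("standby-cluster", t1)
//	s.SetCurrentTime("standby-cluster", t1.Add(-time.Second)) // ignored, older than t1
//	remote := s.GetCurrentTime("standby-cluster")             // returns t1
//	local := s.GetCurrentTime(s.GetClusterMetadata().GetCurrentClusterName()) // time source "now"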

func (s *contextImpl) ReplicateFailoverMarkers(
	ctx context.Context,
	markers []*persistence.FailoverMarkerTask,
) error {
	if err := s.closedError(); err != nil {
		return err
	}

	tasks := make([]persistence.Task, 0, len(markers))
	for _, marker := range markers {
		tasks = append(tasks, marker)
	}

	s.Lock()
	defer s.Unlock()

	transferMaxReadLevel := int64(0)
	if err := s.allocateTransferIDsLocked(
		tasks,
		&transferMaxReadLevel,
	); err != nil {
		return err
	}

	var err error
	if err := s.closedError(); err != nil {
		return err
	}
	err = s.executionManager.CreateFailoverMarkerTasks(
		ctx,
		&persistence.CreateFailoverMarkersRequest{
			RangeID: s.getRangeID(),
			Markers: markers,
		},
	)
	switch err.(type) {
	case nil:
		// Update MaxReadLevel if write to DB succeeds
		s.updateMaxReadLevelLocked(transferMaxReadLevel)
	case *persistence.ShardOwnershipLostError:
		// do not retry on ShardOwnershipLostError
		s.logger.Warn(
			"Closing shard: ReplicateFailoverMarkers failed due to stolen shard.",
			tag.Error(err),
		)
		s.closeShard()
	default:
		s.logger.Error(
			"Failed to insert the failover marker into replication queue.",
			tag.Error(err),
		)
	}
	return err
}

func (s *contextImpl) AddingPendingFailoverMarker(
	marker *types.FailoverMarkerAttributes,
) error {

	domainEntry, err := s.GetDomainCache().GetDomainByID(marker.GetDomainID())
	if err != nil {
		return err
	}
	// if the domain is already active in this cluster, the marker is expired
	isActive, _ := domainEntry.IsActiveIn(s.GetClusterMetadata().GetCurrentClusterName())
	if isActive || domainEntry.GetFailoverVersion() > marker.GetFailoverVersion() {
		s.logger.Info("Skipped out-of-date failover marker", tag.WorkflowDomainName(domainEntry.GetInfo().Name))
		return nil
	}

	s.Lock()
	defer s.Unlock()

	s.shardInfo.PendingFailoverMarkers = append(s.shardInfo.PendingFailoverMarkers, marker)
	return s.forceUpdateShardInfoLocked()
}

func (s *contextImpl) ValidateAndUpdateFailoverMarkers() ([]*types.FailoverMarkerAttributes, error) {

	completedFailoverMarkers := make(map[*types.FailoverMarkerAttributes]struct{})
	s.RLock()
	for _, marker := range s.shardInfo.PendingFailoverMarkers {
		domainEntry, err := s.GetDomainCache().GetDomainByID(marker.GetDomainID())
		if err != nil {
			s.RUnlock()
			return nil, err
		}
		isActive, _ := domainEntry.IsActiveIn(s.GetClusterMetadata().GetCurrentClusterName())
		if isActive || domainEntry.GetFailoverVersion() > marker.GetFailoverVersion() {
			completedFailoverMarkers[marker] = struct{}{}
		}
	}

	if len(completedFailoverMarkers) == 0 {
		s.RUnlock()
		return s.shardInfo.PendingFailoverMarkers, nil
	}
	s.RUnlock()

	// clean up all pending failover tasks
	s.Lock()
	defer s.Unlock()

	for idx, marker := range s.shardInfo.PendingFailoverMarkers {
		if _, ok := completedFailoverMarkers[marker]; ok {
			s.shardInfo.PendingFailoverMarkers[idx] = s.shardInfo.PendingFailoverMarkers[len(s.shardInfo.PendingFailoverMarkers)-1]
			s.shardInfo.PendingFailoverMarkers[len(s.shardInfo.PendingFailoverMarkers)-1] = nil
			s.shardInfo.PendingFailoverMarkers = s.shardInfo.PendingFailoverMarkers[:len(s.shardInfo.PendingFailoverMarkers)-1]
		}
	}
	if err := s.updateShardInfoLocked(); err != nil {
		return nil, err
	}

	return s.shardInfo.PendingFailoverMarkers, nil
}
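
// Illustrative note (not part of the original source): the cleanup loop above
// uses the swap-with-last idiom, which removes an element from a slice in O(1)
// without preserving order. In isolation the idiom looks like:
//
//	last := len(markers) - 1
//	markers[idx] = markers[last] // overwrite the removed element with the last one
//	markers[last] = nil          // drop the trailing reference so it can be GC'd
//	markers = markers[:last]     // shrink the slice by one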

func acquireShard(
	shardItem *historyShardsItem,
	closeCallback func(int, *historyShardsItem),
) (Context, error) {

	var shardInfo *persistence.ShardInfo

	retryPolicy := backoff.NewExponentialRetryPolicy(50 * time.Millisecond)
	retryPolicy.SetMaximumInterval(time.Second)
	retryPolicy.SetExpirationInterval(5 * time.Second)

	retryPredicate := func(err error) bool {
		if persistence.IsTransientError(err) {
			return true
		}
		_, ok := err.(*persistence.ShardAlreadyExistError)
		return ok
	}

	getShard := func() error {
		resp, err := shardItem.GetShardManager().GetShard(context.Background(), &persistence.GetShardRequest{
			ShardID: shardItem.shardID,
		})
		if err == nil {
			shardInfo = resp.ShardInfo
			return nil
		}
		if _, ok := err.(*types.EntityNotExistsError); !ok {
			return err
		}

		// EntityNotExistsError: the shard record does not exist yet, so create it
		shardInfo = &persistence.ShardInfo{
			ShardID:          shardItem.shardID,
			RangeID:          0,
			TransferAckLevel: 0,
		}
		return shardItem.GetShardManager().CreateShard(context.Background(), &persistence.CreateShardRequest{ShardInfo: shardInfo})
	}

	throttleRetry := backoff.NewThrottleRetry(
		backoff.WithRetryPolicy(retryPolicy),
		backoff.WithRetryableError(retryPredicate),
	)
	err := throttleRetry.Do(context.Background(), getShard)
	if err != nil {
		shardItem.logger.Error("Failed to acquire shard.", tag.Error(err))
		return nil, err
	}

	updatedShardInfo := shardInfo.Copy()
	ownershipChanged := shardInfo.Owner != shardItem.GetHostInfo().Identity()
	updatedShardInfo.Owner = shardItem.GetHostInfo().Identity()

	// initialize the cluster current time to be the same as the ack level
	remoteClusterCurrentTime := make(map[string]time.Time)
	timerMaxReadLevelMap := make(map[string]time.Time)
	for clusterName := range shardItem.GetClusterMetadata().GetEnabledClusterInfo() {
		if clusterName != shardItem.GetClusterMetadata().GetCurrentClusterName() {
			if currentTime, ok := shardInfo.ClusterTimerAckLevel[clusterName]; ok {
				remoteClusterCurrentTime[clusterName] = currentTime
				timerMaxReadLevelMap[clusterName] = currentTime
			} else {
				remoteClusterCurrentTime[clusterName] = shardInfo.TimerAckLevel
				timerMaxReadLevelMap[clusterName] = shardInfo.TimerAckLevel
			}
		} else { // active cluster
			timerMaxReadLevelMap[clusterName] = shardInfo.TimerAckLevel
		}

		timerMaxReadLevelMap[clusterName] = timerMaxReadLevelMap[clusterName].Truncate(time.Millisecond)
	}

	if updatedShardInfo.TransferProcessingQueueStates == nil {
		updatedShardInfo.TransferProcessingQueueStates = &types.ProcessingQueueStates{
			StatesByCluster: make(map[string][]*types.ProcessingQueueState),
		}
	}
	if updatedShardInfo.TimerProcessingQueueStates == nil {
		updatedShardInfo.TimerProcessingQueueStates = &types.ProcessingQueueStates{
			StatesByCluster: make(map[string][]*types.ProcessingQueueState),
		}
	}

	executionMgr, err := shardItem.GetExecutionManager(shardItem.shardID)
	if err != nil {
		return nil, err
	}

	context := &contextImpl{
		Resource:                       shardItem.Resource,
		shardItem:                      shardItem,
		shardID:                        shardItem.shardID,
		executionManager:               executionMgr,
		shardInfo:                      updatedShardInfo,
		closeCallback:                  closeCallback,
		config:                         shardItem.config,
		remoteClusterCurrentTime:       remoteClusterCurrentTime,
		timerMaxReadLevelMap:           timerMaxReadLevelMap, // use ack level to init read level
		transferFailoverLevels:         make(map[string]TransferFailoverLevel),
		timerFailoverLevels:            make(map[string]TimerFailoverLevel),
		logger:                         shardItem.logger,
		throttledLogger:                shardItem.throttledLogger,
		previousShardOwnerWasDifferent: ownershipChanged,
	}

	// TODO remove once migrated to global event cache
	context.eventsCache = events.NewCache(
		context.shardID,
		context.Resource.GetHistoryManager(),
		context.config,
		context.logger,
		context.Resource.GetMetricsClient(),
		shardItem.GetDomainCache(),
	)

	context.logger.Debug(fmt.Sprintf("Global event cache mode: %v", context.config.EventsCacheGlobalEnable()))

	err1 := context.renewRangeLocked(true)
	if err1 != nil {
		return nil, err1
	}

	return context, nil
}
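
// Illustrative usage sketch (not part of the original source): a shard
// controller is expected to call acquireShard with the owning
// historyShardsItem and a callback invoked when the shard closes; the callback
// body below is a placeholder:
//
//	shardContext, err := acquireShard(item, func(shardID int, item *historyShardsItem) {
//		// e.g. remove the shard from the controller's shard map
//	})
//	if err != nil {
//		// ownership could not be established (persistence errors, lost range, ...)
//	}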