• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

uber / cadence / 0187fdd2-f4a4-4c9a-97b4-6604937bf7be

09 May 2023 12:23AM UTC coverage: 57.253% (-0.002%) from 57.255%
0187fdd2-f4a4-4c9a-97b4-6604937bf7be

Pull #5252

buildkite

David Porter
Merge branch 'master' into feature/zonal-partitioning
Pull Request #5252: Feature/zonal partitioning

1460 of 1460 new or added lines in 51 files covered. (100.0%)

86909 of 151799 relevant lines covered (57.25%)

2482.17 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

44.33
/common/persistence/persistenceRateLimitedClients.go
1
// Copyright (c) 2017 Uber Technologies, Inc.
2
//
3
// Permission is hereby granted, free of charge, to any person obtaining a copy
4
// of this software and associated documentation files (the "Software"), to deal
5
// in the Software without restriction, including without limitation the rights
6
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
7
// copies of the Software, and to permit persons to whom the Software is
8
// furnished to do so, subject to the following conditions:
9
//
10
// The above copyright notice and this permission notice shall be included in
11
// all copies or substantial portions of the Software.
12
//
13
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
14
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
18
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
19
// THE SOFTWARE.
20

21
package persistence
22

23
import (
24
        "context"
25

26
        "github.com/uber/cadence/common/log"
27
        "github.com/uber/cadence/common/quotas"
28
        "github.com/uber/cadence/common/types"
29
)
30

31
var (
32
        // ErrPersistenceLimitExceeded is the error indicating QPS limit reached.
33
        ErrPersistenceLimitExceeded = &types.ServiceBusyError{Message: "Persistence Max QPS Reached."}
34
)
35

36
type (
37
        shardRateLimitedPersistenceClient struct {
38
                rateLimiter quotas.Limiter
39
                persistence ShardManager
40
                logger      log.Logger
41
        }
42

43
        workflowExecutionRateLimitedPersistenceClient struct {
44
                rateLimiter quotas.Limiter
45
                persistence ExecutionManager
46
                logger      log.Logger
47
        }
48

49
        taskRateLimitedPersistenceClient struct {
50
                rateLimiter quotas.Limiter
51
                persistence TaskManager
52
                logger      log.Logger
53
        }
54

55
        historyRateLimitedPersistenceClient struct {
56
                rateLimiter quotas.Limiter
57
                persistence HistoryManager
58
                logger      log.Logger
59
        }
60

61
        metadataRateLimitedPersistenceClient struct {
62
                rateLimiter quotas.Limiter
63
                persistence DomainManager
64
                logger      log.Logger
65
        }
66

67
        visibilityRateLimitedPersistenceClient struct {
68
                rateLimiter quotas.Limiter
69
                persistence VisibilityManager
70
                logger      log.Logger
71
        }
72

73
        queueRateLimitedPersistenceClient struct {
74
                rateLimiter quotas.Limiter
75
                persistence QueueManager
76
                logger      log.Logger
77
        }
78

79
        configStoreRateLimitedPersistenceClient struct {
80
                rateLimiter quotas.Limiter
81
                persistence ConfigStoreManager
82
                logger      log.Logger
83
        }
84
)
85

86
var _ ShardManager = (*shardRateLimitedPersistenceClient)(nil)
87
var _ ExecutionManager = (*workflowExecutionRateLimitedPersistenceClient)(nil)
88
var _ TaskManager = (*taskRateLimitedPersistenceClient)(nil)
89
var _ HistoryManager = (*historyRateLimitedPersistenceClient)(nil)
90
var _ DomainManager = (*metadataRateLimitedPersistenceClient)(nil)
91
var _ VisibilityManager = (*visibilityRateLimitedPersistenceClient)(nil)
92
var _ QueueManager = (*queueRateLimitedPersistenceClient)(nil)
93
var _ ConfigStoreManager = (*configStoreRateLimitedPersistenceClient)(nil)
94

95
// NewShardPersistenceRateLimitedClient creates a client to manage shards
96
func NewShardPersistenceRateLimitedClient(
97
        persistence ShardManager,
98
        rateLimiter quotas.Limiter,
99
        logger log.Logger,
100
) ShardManager {
39✔
101
        return &shardRateLimitedPersistenceClient{
39✔
102
                persistence: persistence,
39✔
103
                rateLimiter: rateLimiter,
39✔
104
                logger:      logger,
39✔
105
        }
39✔
106
}
39✔
107

108
// NewWorkflowExecutionPersistenceRateLimitedClient creates a client to manage executions
109
func NewWorkflowExecutionPersistenceRateLimitedClient(
110
        persistence ExecutionManager,
111
        rateLimiter quotas.Limiter,
112
        logger log.Logger,
113
) ExecutionManager {
51✔
114
        return &workflowExecutionRateLimitedPersistenceClient{
51✔
115
                persistence: persistence,
51✔
116
                rateLimiter: rateLimiter,
51✔
117
                logger:      logger,
51✔
118
        }
51✔
119
}
51✔
120

121
// NewTaskPersistenceRateLimitedClient creates a client to manage tasks
122
func NewTaskPersistenceRateLimitedClient(
123
        persistence TaskManager,
124
        rateLimiter quotas.Limiter,
125
        logger log.Logger,
126
) TaskManager {
39✔
127
        return &taskRateLimitedPersistenceClient{
39✔
128
                persistence: persistence,
39✔
129
                rateLimiter: rateLimiter,
39✔
130
                logger:      logger,
39✔
131
        }
39✔
132
}
39✔
133

134
// NewHistoryPersistenceRateLimitedClient creates a HistoryManager client to manage workflow execution history
135
func NewHistoryPersistenceRateLimitedClient(
136
        persistence HistoryManager,
137
        rateLimiter quotas.Limiter,
138
        logger log.Logger,
139
) HistoryManager {
39✔
140
        return &historyRateLimitedPersistenceClient{
39✔
141
                persistence: persistence,
39✔
142
                rateLimiter: rateLimiter,
39✔
143
                logger:      logger,
39✔
144
        }
39✔
145
}
39✔
146

147
// NewDomainPersistenceRateLimitedClient creates a DomainManager client to manage metadata
148
func NewDomainPersistenceRateLimitedClient(
149
        persistence DomainManager,
150
        rateLimiter quotas.Limiter,
151
        logger log.Logger,
152
) DomainManager {
39✔
153
        return &metadataRateLimitedPersistenceClient{
39✔
154
                persistence: persistence,
39✔
155
                rateLimiter: rateLimiter,
39✔
156
                logger:      logger,
39✔
157
        }
39✔
158
}
39✔
159

160
// NewVisibilityPersistenceRateLimitedClient creates a client to manage visibility
161
func NewVisibilityPersistenceRateLimitedClient(
162
        persistence VisibilityManager,
163
        rateLimiter quotas.Limiter,
164
        logger log.Logger,
165
) VisibilityManager {
33✔
166
        return &visibilityRateLimitedPersistenceClient{
33✔
167
                persistence: persistence,
33✔
168
                rateLimiter: rateLimiter,
33✔
169
                logger:      logger,
33✔
170
        }
33✔
171
}
33✔
172

173
// NewQueuePersistenceRateLimitedClient creates a client to manage queue
174
func NewQueuePersistenceRateLimitedClient(
175
        persistence QueueManager,
176
        rateLimiter quotas.Limiter,
177
        logger log.Logger,
178
) QueueManager {
39✔
179
        return &queueRateLimitedPersistenceClient{
39✔
180
                persistence: persistence,
39✔
181
                rateLimiter: rateLimiter,
39✔
182
                logger:      logger,
39✔
183
        }
39✔
184
}
39✔
185

186
// NewConfigStorePersistenceRateLimitedClient creates a client to manage config store
187
func NewConfigStorePersistenceRateLimitedClient(
188
        persistence ConfigStoreManager,
189
        rateLimiter quotas.Limiter,
190
        logger log.Logger,
191
) ConfigStoreManager {
39✔
192
        return &configStoreRateLimitedPersistenceClient{
39✔
193
                persistence: persistence,
39✔
194
                rateLimiter: rateLimiter,
39✔
195
                logger:      logger,
39✔
196
        }
39✔
197
}
39✔
198

199
func (p *shardRateLimitedPersistenceClient) GetName() string {
×
200
        return p.persistence.GetName()
×
201
}
×
202

203
func (p *shardRateLimitedPersistenceClient) CreateShard(
204
        ctx context.Context,
205
        request *CreateShardRequest,
206
) error {
15✔
207
        if ok := p.rateLimiter.Allow(); !ok {
15✔
208
                return ErrPersistenceLimitExceeded
×
209
        }
×
210

211
        err := p.persistence.CreateShard(ctx, request)
15✔
212
        return err
15✔
213
}
214

215
func (p *shardRateLimitedPersistenceClient) GetShard(
216
        ctx context.Context,
217
        request *GetShardRequest,
218
) (*GetShardResponse, error) {
51✔
219
        if ok := p.rateLimiter.Allow(); !ok {
51✔
220
                return nil, ErrPersistenceLimitExceeded
×
221
        }
×
222

223
        response, err := p.persistence.GetShard(ctx, request)
51✔
224
        return response, err
51✔
225
}
226

227
func (p *shardRateLimitedPersistenceClient) UpdateShard(
228
        ctx context.Context,
229
        request *UpdateShardRequest,
230
) error {
107✔
231
        if ok := p.rateLimiter.Allow(); !ok {
107✔
232
                return ErrPersistenceLimitExceeded
×
233
        }
×
234

235
        err := p.persistence.UpdateShard(ctx, request)
107✔
236
        return err
107✔
237
}
238

239
func (p *shardRateLimitedPersistenceClient) Close() {
39✔
240
        p.persistence.Close()
39✔
241
}
39✔
242

243
func (p *workflowExecutionRateLimitedPersistenceClient) GetName() string {
×
244
        return p.persistence.GetName()
×
245
}
×
246

247
func (p *workflowExecutionRateLimitedPersistenceClient) GetShardID() int {
11,553✔
248
        return p.persistence.GetShardID()
11,553✔
249
}
11,553✔
250

251
func (p *workflowExecutionRateLimitedPersistenceClient) CreateWorkflowExecution(
252
        ctx context.Context,
253
        request *CreateWorkflowExecutionRequest,
254
) (*CreateWorkflowExecutionResponse, error) {
504✔
255
        if ok := p.rateLimiter.Allow(); !ok {
504✔
256
                return nil, ErrPersistenceLimitExceeded
×
257
        }
×
258

259
        response, err := p.persistence.CreateWorkflowExecution(ctx, request)
504✔
260
        return response, err
504✔
261
}
262

263
func (p *workflowExecutionRateLimitedPersistenceClient) GetWorkflowExecution(
264
        ctx context.Context,
265
        request *GetWorkflowExecutionRequest,
266
) (*GetWorkflowExecutionResponse, error) {
1,117✔
267
        if ok := p.rateLimiter.Allow(); !ok {
1,117✔
268
                return nil, ErrPersistenceLimitExceeded
×
269
        }
×
270

271
        response, err := p.persistence.GetWorkflowExecution(ctx, request)
1,117✔
272
        return response, err
1,117✔
273
}
274

275
func (p *workflowExecutionRateLimitedPersistenceClient) UpdateWorkflowExecution(
276
        ctx context.Context,
277
        request *UpdateWorkflowExecutionRequest,
278
) (*UpdateWorkflowExecutionResponse, error) {
4,414✔
279
        if ok := p.rateLimiter.Allow(); !ok {
4,414✔
280
                return nil, ErrPersistenceLimitExceeded
×
281
        }
×
282

283
        resp, err := p.persistence.UpdateWorkflowExecution(ctx, request)
4,414✔
284
        return resp, err
4,414✔
285
}
286

287
func (p *workflowExecutionRateLimitedPersistenceClient) ConflictResolveWorkflowExecution(
288
        ctx context.Context,
289
        request *ConflictResolveWorkflowExecutionRequest,
290
) (*ConflictResolveWorkflowExecutionResponse, error) {
3✔
291
        if ok := p.rateLimiter.Allow(); !ok {
3✔
292
                return nil, ErrPersistenceLimitExceeded
×
293
        }
×
294

295
        resp, err := p.persistence.ConflictResolveWorkflowExecution(ctx, request)
3✔
296
        return resp, err
3✔
297
}
298

299
func (p *workflowExecutionRateLimitedPersistenceClient) DeleteWorkflowExecution(
300
        ctx context.Context,
301
        request *DeleteWorkflowExecutionRequest,
302
) error {
54✔
303
        if ok := p.rateLimiter.Allow(); !ok {
54✔
304
                return ErrPersistenceLimitExceeded
×
305
        }
×
306

307
        err := p.persistence.DeleteWorkflowExecution(ctx, request)
54✔
308
        return err
54✔
309
}
310

311
func (p *workflowExecutionRateLimitedPersistenceClient) DeleteCurrentWorkflowExecution(
312
        ctx context.Context,
313
        request *DeleteCurrentWorkflowExecutionRequest,
314
) error {
54✔
315
        if ok := p.rateLimiter.Allow(); !ok {
54✔
316
                return ErrPersistenceLimitExceeded
×
317
        }
×
318

319
        err := p.persistence.DeleteCurrentWorkflowExecution(ctx, request)
54✔
320
        return err
54✔
321
}
322

323
func (p *workflowExecutionRateLimitedPersistenceClient) GetCurrentExecution(
324
        ctx context.Context,
325
        request *GetCurrentExecutionRequest,
326
) (*GetCurrentExecutionResponse, error) {
183✔
327
        if ok := p.rateLimiter.Allow(); !ok {
183✔
328
                return nil, ErrPersistenceLimitExceeded
×
329
        }
×
330

331
        response, err := p.persistence.GetCurrentExecution(ctx, request)
183✔
332
        return response, err
183✔
333
}
334

335
func (p *workflowExecutionRateLimitedPersistenceClient) ListCurrentExecutions(
336
        ctx context.Context,
337
        request *ListCurrentExecutionsRequest,
338
) (*ListCurrentExecutionsResponse, error) {
×
339
        if ok := p.rateLimiter.Allow(); !ok {
×
340
                return nil, ErrPersistenceLimitExceeded
×
341
        }
×
342

343
        response, err := p.persistence.ListCurrentExecutions(ctx, request)
×
344
        return response, err
×
345
}
346

347
func (p *workflowExecutionRateLimitedPersistenceClient) IsWorkflowExecutionExists(
348
        ctx context.Context,
349
        request *IsWorkflowExecutionExistsRequest,
350
) (*IsWorkflowExecutionExistsResponse, error) {
×
351
        if ok := p.rateLimiter.Allow(); !ok {
×
352
                return nil, ErrPersistenceLimitExceeded
×
353
        }
×
354

355
        response, err := p.persistence.IsWorkflowExecutionExists(ctx, request)
×
356
        return response, err
×
357
}
358

359
func (p *workflowExecutionRateLimitedPersistenceClient) ListConcreteExecutions(
360
        ctx context.Context,
361
        request *ListConcreteExecutionsRequest,
362
) (*ListConcreteExecutionsResponse, error) {
×
363
        if ok := p.rateLimiter.Allow(); !ok {
×
364
                return nil, ErrPersistenceLimitExceeded
×
365
        }
×
366

367
        response, err := p.persistence.ListConcreteExecutions(ctx, request)
×
368
        return response, err
×
369
}
370

371
func (p *workflowExecutionRateLimitedPersistenceClient) GetTransferTasks(
372
        ctx context.Context,
373
        request *GetTransferTasksRequest,
374
) (*GetTransferTasksResponse, error) {
2,317✔
375
        if ok := p.rateLimiter.Allow(); !ok {
2,317✔
376
                return nil, ErrPersistenceLimitExceeded
×
377
        }
×
378

379
        response, err := p.persistence.GetTransferTasks(ctx, request)
2,317✔
380
        return response, err
2,317✔
381
}
382

383
func (p *workflowExecutionRateLimitedPersistenceClient) GetCrossClusterTasks(
384
        ctx context.Context,
385
        request *GetCrossClusterTasksRequest,
386
) (*GetCrossClusterTasksResponse, error) {
163✔
387
        if ok := p.rateLimiter.Allow(); !ok {
163✔
388
                return nil, ErrPersistenceLimitExceeded
×
389
        }
×
390

391
        response, err := p.persistence.GetCrossClusterTasks(ctx, request)
163✔
392
        return response, err
163✔
393
}
394

395
func (p *workflowExecutionRateLimitedPersistenceClient) GetReplicationTasks(
396
        ctx context.Context,
397
        request *GetReplicationTasksRequest,
398
) (*GetReplicationTasksResponse, error) {
122✔
399
        if ok := p.rateLimiter.Allow(); !ok {
122✔
400
                return nil, ErrPersistenceLimitExceeded
×
401
        }
×
402

403
        response, err := p.persistence.GetReplicationTasks(ctx, request)
122✔
404
        return response, err
122✔
405
}
406

407
func (p *workflowExecutionRateLimitedPersistenceClient) CompleteTransferTask(
408
        ctx context.Context,
409
        request *CompleteTransferTaskRequest,
410
) error {
×
411
        if ok := p.rateLimiter.Allow(); !ok {
×
412
                return ErrPersistenceLimitExceeded
×
413
        }
×
414

415
        err := p.persistence.CompleteTransferTask(ctx, request)
×
416
        return err
×
417
}
418

419
func (p *workflowExecutionRateLimitedPersistenceClient) RangeCompleteTransferTask(
420
        ctx context.Context,
421
        request *RangeCompleteTransferTaskRequest,
422
) (*RangeCompleteTransferTaskResponse, error) {
112✔
423
        if ok := p.rateLimiter.Allow(); !ok {
112✔
424
                return nil, ErrPersistenceLimitExceeded
×
425
        }
×
426

427
        return p.persistence.RangeCompleteTransferTask(ctx, request)
112✔
428
}
429

430
func (p *workflowExecutionRateLimitedPersistenceClient) CompleteCrossClusterTask(
431
        ctx context.Context,
432
        request *CompleteCrossClusterTaskRequest,
433
) error {
×
434
        if ok := p.rateLimiter.Allow(); !ok {
×
435
                return ErrPersistenceLimitExceeded
×
436
        }
×
437

438
        err := p.persistence.CompleteCrossClusterTask(ctx, request)
×
439
        return err
×
440
}
441

442
func (p *workflowExecutionRateLimitedPersistenceClient) RangeCompleteCrossClusterTask(
443
        ctx context.Context,
444
        request *RangeCompleteCrossClusterTaskRequest,
445
) (*RangeCompleteCrossClusterTaskResponse, error) {
137✔
446
        if ok := p.rateLimiter.Allow(); !ok {
137✔
447
                return nil, ErrPersistenceLimitExceeded
×
448
        }
×
449

450
        return p.persistence.RangeCompleteCrossClusterTask(ctx, request)
137✔
451
}
452

453
func (p *workflowExecutionRateLimitedPersistenceClient) CompleteReplicationTask(
454
        ctx context.Context,
455
        request *CompleteReplicationTaskRequest,
456
) error {
×
457
        if ok := p.rateLimiter.Allow(); !ok {
×
458
                return ErrPersistenceLimitExceeded
×
459
        }
×
460

461
        err := p.persistence.CompleteReplicationTask(ctx, request)
×
462
        return err
×
463
}
464

465
func (p *workflowExecutionRateLimitedPersistenceClient) RangeCompleteReplicationTask(
466
        ctx context.Context,
467
        request *RangeCompleteReplicationTaskRequest,
468
) (*RangeCompleteReplicationTaskResponse, error) {
120✔
469
        if ok := p.rateLimiter.Allow(); !ok {
120✔
470
                return nil, ErrPersistenceLimitExceeded
×
471
        }
×
472

473
        return p.persistence.RangeCompleteReplicationTask(ctx, request)
120✔
474
}
475

476
func (p *workflowExecutionRateLimitedPersistenceClient) PutReplicationTaskToDLQ(
477
        ctx context.Context,
478
        request *PutReplicationTaskToDLQRequest,
479
) error {
3✔
480
        if ok := p.rateLimiter.Allow(); !ok {
3✔
481
                return ErrPersistenceLimitExceeded
×
482
        }
×
483

484
        return p.persistence.PutReplicationTaskToDLQ(ctx, request)
3✔
485
}
486

487
func (p *workflowExecutionRateLimitedPersistenceClient) GetReplicationTasksFromDLQ(
488
        ctx context.Context,
489
        request *GetReplicationTasksFromDLQRequest,
490
) (*GetReplicationTasksFromDLQResponse, error) {
×
491
        if ok := p.rateLimiter.Allow(); !ok {
×
492
                return nil, ErrPersistenceLimitExceeded
×
493
        }
×
494

495
        return p.persistence.GetReplicationTasksFromDLQ(ctx, request)
×
496
}
497

498
func (p *workflowExecutionRateLimitedPersistenceClient) GetReplicationDLQSize(
499
        ctx context.Context,
500
        request *GetReplicationDLQSizeRequest,
501
) (*GetReplicationDLQSizeResponse, error) {
12✔
502
        if ok := p.rateLimiter.Allow(); !ok {
12✔
503
                return nil, ErrPersistenceLimitExceeded
×
504
        }
×
505

506
        return p.persistence.GetReplicationDLQSize(ctx, request)
12✔
507
}
508

509
func (p *workflowExecutionRateLimitedPersistenceClient) DeleteReplicationTaskFromDLQ(
510
        ctx context.Context,
511
        request *DeleteReplicationTaskFromDLQRequest,
512
) error {
×
513
        if ok := p.rateLimiter.Allow(); !ok {
×
514
                return ErrPersistenceLimitExceeded
×
515
        }
×
516

517
        return p.persistence.DeleteReplicationTaskFromDLQ(ctx, request)
×
518
}
519

520
func (p *workflowExecutionRateLimitedPersistenceClient) RangeDeleteReplicationTaskFromDLQ(
521
        ctx context.Context,
522
        request *RangeDeleteReplicationTaskFromDLQRequest,
523
) (*RangeDeleteReplicationTaskFromDLQResponse, error) {
×
524
        if ok := p.rateLimiter.Allow(); !ok {
×
525
                return nil, ErrPersistenceLimitExceeded
×
526
        }
×
527

528
        return p.persistence.RangeDeleteReplicationTaskFromDLQ(ctx, request)
×
529
}
530

531
func (p *workflowExecutionRateLimitedPersistenceClient) CreateFailoverMarkerTasks(
532
        ctx context.Context,
533
        request *CreateFailoverMarkersRequest,
534
) error {
×
535
        if ok := p.rateLimiter.Allow(); !ok {
×
536
                return ErrPersistenceLimitExceeded
×
537
        }
×
538

539
        err := p.persistence.CreateFailoverMarkerTasks(ctx, request)
×
540
        return err
×
541
}
542

543
func (p *workflowExecutionRateLimitedPersistenceClient) GetTimerIndexTasks(
544
        ctx context.Context,
545
        request *GetTimerIndexTasksRequest,
546
) (*GetTimerIndexTasksResponse, error) {
3,249✔
547
        if ok := p.rateLimiter.Allow(); !ok {
3,249✔
548
                return nil, ErrPersistenceLimitExceeded
×
549
        }
×
550

551
        response, err := p.persistence.GetTimerIndexTasks(ctx, request)
3,249✔
552
        return response, err
3,249✔
553
}
554

555
func (p *workflowExecutionRateLimitedPersistenceClient) CompleteTimerTask(
556
        ctx context.Context,
557
        request *CompleteTimerTaskRequest,
558
) error {
×
559
        if ok := p.rateLimiter.Allow(); !ok {
×
560
                return ErrPersistenceLimitExceeded
×
561
        }
×
562

563
        err := p.persistence.CompleteTimerTask(ctx, request)
×
564
        return err
×
565
}
566

567
func (p *workflowExecutionRateLimitedPersistenceClient) RangeCompleteTimerTask(
568
        ctx context.Context,
569
        request *RangeCompleteTimerTaskRequest,
570
) (*RangeCompleteTimerTaskResponse, error) {
38✔
571
        if ok := p.rateLimiter.Allow(); !ok {
38✔
572
                return nil, ErrPersistenceLimitExceeded
×
573
        }
×
574

575
        return p.persistence.RangeCompleteTimerTask(ctx, request)
38✔
576
}
577

578
func (p *workflowExecutionRateLimitedPersistenceClient) Close() {
51✔
579
        p.persistence.Close()
51✔
580
}
51✔
581

582
func (p *taskRateLimitedPersistenceClient) GetName() string {
×
583
        return p.persistence.GetName()
×
584
}
×
585

586
func (p *taskRateLimitedPersistenceClient) CreateTasks(
587
        ctx context.Context,
588
        request *CreateTasksRequest,
589
) (*CreateTasksResponse, error) {
934✔
590
        if ok := p.rateLimiter.Allow(); !ok {
934✔
591
                return nil, ErrPersistenceLimitExceeded
×
592
        }
×
593

594
        response, err := p.persistence.CreateTasks(ctx, request)
934✔
595
        return response, err
934✔
596
}
597

598
func (p *taskRateLimitedPersistenceClient) GetTasks(
599
        ctx context.Context,
600
        request *GetTasksRequest,
601
) (*GetTasksResponse, error) {
980✔
602
        if ok := p.rateLimiter.Allow(); !ok {
980✔
603
                return nil, ErrPersistenceLimitExceeded
×
604
        }
×
605

606
        response, err := p.persistence.GetTasks(ctx, request)
980✔
607
        return response, err
980✔
608
}
609

610
func (p *taskRateLimitedPersistenceClient) CompleteTask(
611
        ctx context.Context,
612
        request *CompleteTaskRequest,
613
) error {
×
614
        if ok := p.rateLimiter.Allow(); !ok {
×
615
                return ErrPersistenceLimitExceeded
×
616
        }
×
617

618
        err := p.persistence.CompleteTask(ctx, request)
×
619
        return err
×
620
}
621

622
func (p *taskRateLimitedPersistenceClient) CompleteTasksLessThan(
623
        ctx context.Context,
624
        request *CompleteTasksLessThanRequest,
625
) (*CompleteTasksLessThanResponse, error) {
539✔
626
        if ok := p.rateLimiter.Allow(); !ok {
539✔
627
                return nil, ErrPersistenceLimitExceeded
×
628
        }
×
629
        return p.persistence.CompleteTasksLessThan(ctx, request)
539✔
630
}
631

632
func (p *taskRateLimitedPersistenceClient) GetOrphanTasks(ctx context.Context, request *GetOrphanTasksRequest) (*GetOrphanTasksResponse, error) {
×
633
        if ok := p.rateLimiter.Allow(); !ok {
×
634
                return nil, ErrPersistenceLimitExceeded
×
635
        }
×
636
        return p.persistence.GetOrphanTasks(ctx, request)
×
637
}
638

639
func (p *taskRateLimitedPersistenceClient) LeaseTaskList(
640
        ctx context.Context,
641
        request *LeaseTaskListRequest,
642
) (*LeaseTaskListResponse, error) {
1,343✔
643
        if ok := p.rateLimiter.Allow(); !ok {
1,343✔
644
                return nil, ErrPersistenceLimitExceeded
×
645
        }
×
646

647
        response, err := p.persistence.LeaseTaskList(ctx, request)
1,343✔
648
        return response, err
1,343✔
649
}
650

651
func (p *taskRateLimitedPersistenceClient) UpdateTaskList(
652
        ctx context.Context,
653
        request *UpdateTaskListRequest,
654
) (*UpdateTaskListResponse, error) {
5,888✔
655
        if ok := p.rateLimiter.Allow(); !ok {
5,888✔
656
                return nil, ErrPersistenceLimitExceeded
×
657
        }
×
658

659
        response, err := p.persistence.UpdateTaskList(ctx, request)
5,888✔
660
        return response, err
5,888✔
661
}
662

663
func (p *taskRateLimitedPersistenceClient) ListTaskList(
664
        ctx context.Context,
665
        request *ListTaskListRequest,
666
) (*ListTaskListResponse, error) {
×
667
        if ok := p.rateLimiter.Allow(); !ok {
×
668
                return nil, ErrPersistenceLimitExceeded
×
669
        }
×
670
        return p.persistence.ListTaskList(ctx, request)
×
671
}
672

673
func (p *taskRateLimitedPersistenceClient) DeleteTaskList(
674
        ctx context.Context,
675
        request *DeleteTaskListRequest,
676
) error {
×
677
        if ok := p.rateLimiter.Allow(); !ok {
×
678
                return ErrPersistenceLimitExceeded
×
679
        }
×
680
        return p.persistence.DeleteTaskList(ctx, request)
×
681
}
682

683
func (p *taskRateLimitedPersistenceClient) Close() {
39✔
684
        p.persistence.Close()
39✔
685
}
39✔
686

687
func (p *metadataRateLimitedPersistenceClient) GetName() string {
×
688
        return p.persistence.GetName()
×
689
}
×
690

691
func (p *metadataRateLimitedPersistenceClient) CreateDomain(
692
        ctx context.Context,
693
        request *CreateDomainRequest,
694
) (*CreateDomainResponse, error) {
39✔
695
        if ok := p.rateLimiter.Allow(); !ok {
39✔
696
                return nil, ErrPersistenceLimitExceeded
×
697
        }
×
698

699
        response, err := p.persistence.CreateDomain(ctx, request)
39✔
700
        return response, err
39✔
701
}
702

703
func (p *metadataRateLimitedPersistenceClient) GetDomain(
704
        ctx context.Context,
705
        request *GetDomainRequest,
706
) (*GetDomainResponse, error) {
773✔
707
        if ok := p.rateLimiter.Allow(); !ok {
773✔
708
                return nil, ErrPersistenceLimitExceeded
×
709
        }
×
710

711
        response, err := p.persistence.GetDomain(ctx, request)
773✔
712
        return response, err
773✔
713
}
714

715
func (p *metadataRateLimitedPersistenceClient) UpdateDomain(
716
        ctx context.Context,
717
        request *UpdateDomainRequest,
718
) error {
×
719
        if ok := p.rateLimiter.Allow(); !ok {
×
720
                return ErrPersistenceLimitExceeded
×
721
        }
×
722

723
        err := p.persistence.UpdateDomain(ctx, request)
×
724
        return err
×
725
}
726

727
func (p *metadataRateLimitedPersistenceClient) DeleteDomain(
728
        ctx context.Context,
729
        request *DeleteDomainRequest,
730
) error {
×
731
        if ok := p.rateLimiter.Allow(); !ok {
×
732
                return ErrPersistenceLimitExceeded
×
733
        }
×
734

735
        err := p.persistence.DeleteDomain(ctx, request)
×
736
        return err
×
737
}
738

739
func (p *metadataRateLimitedPersistenceClient) DeleteDomainByName(
740
        ctx context.Context,
741
        request *DeleteDomainByNameRequest,
742
) error {
×
743
        if ok := p.rateLimiter.Allow(); !ok {
×
744
                return ErrPersistenceLimitExceeded
×
745
        }
×
746

747
        err := p.persistence.DeleteDomainByName(ctx, request)
×
748
        return err
×
749
}
750

751
func (p *metadataRateLimitedPersistenceClient) ListDomains(
752
        ctx context.Context,
753
        request *ListDomainsRequest,
754
) (*ListDomainsResponse, error) {
617✔
755
        if ok := p.rateLimiter.Allow(); !ok {
617✔
756
                return nil, ErrPersistenceLimitExceeded
×
757
        }
×
758

759
        response, err := p.persistence.ListDomains(ctx, request)
617✔
760
        return response, err
617✔
761
}
762

763
func (p *metadataRateLimitedPersistenceClient) GetMetadata(
764
        ctx context.Context,
765
) (*GetMetadataResponse, error) {
617✔
766
        if ok := p.rateLimiter.Allow(); !ok {
617✔
767
                return nil, ErrPersistenceLimitExceeded
×
768
        }
×
769

770
        response, err := p.persistence.GetMetadata(ctx)
617✔
771
        return response, err
617✔
772
}
773

774
func (p *metadataRateLimitedPersistenceClient) Close() {
39✔
775
        p.persistence.Close()
39✔
776
}
39✔
777

778
func (p *visibilityRateLimitedPersistenceClient) GetName() string {
×
779
        return p.persistence.GetName()
×
780
}
×
781

782
func (p *visibilityRateLimitedPersistenceClient) RecordWorkflowExecutionStarted(
783
        ctx context.Context,
784
        request *RecordWorkflowExecutionStartedRequest,
785
) error {
759✔
786
        if ok := p.rateLimiter.Allow(); !ok {
759✔
787
                return ErrPersistenceLimitExceeded
×
788
        }
×
789

790
        err := p.persistence.RecordWorkflowExecutionStarted(ctx, request)
759✔
791
        return err
759✔
792
}
793

794
func (p *visibilityRateLimitedPersistenceClient) RecordWorkflowExecutionClosed(
795
        ctx context.Context,
796
        request *RecordWorkflowExecutionClosedRequest,
797
) error {
500✔
798
        if ok := p.rateLimiter.Allow(); !ok {
500✔
799
                return ErrPersistenceLimitExceeded
×
800
        }
×
801

802
        err := p.persistence.RecordWorkflowExecutionClosed(ctx, request)
500✔
803
        return err
500✔
804
}
805

806
func (p *visibilityRateLimitedPersistenceClient) RecordWorkflowExecutionUninitialized(
807
        ctx context.Context,
808
        request *RecordWorkflowExecutionUninitializedRequest,
809
) error {
×
810
        err := p.persistence.RecordWorkflowExecutionUninitialized(ctx, request)
×
811
        return err
×
812
}
×
813

814
func (p *visibilityRateLimitedPersistenceClient) UpsertWorkflowExecution(
815
        ctx context.Context,
816
        request *UpsertWorkflowExecutionRequest,
817
) error {
240✔
818
        if ok := p.rateLimiter.Allow(); !ok {
240✔
819
                return ErrPersistenceLimitExceeded
×
820
        }
×
821

822
        err := p.persistence.UpsertWorkflowExecution(ctx, request)
240✔
823
        return err
240✔
824
}
825

826
func (p *visibilityRateLimitedPersistenceClient) ListOpenWorkflowExecutions(
827
        ctx context.Context,
828
        request *ListWorkflowExecutionsRequest,
829
) (*ListWorkflowExecutionsResponse, error) {
5✔
830
        if ok := p.rateLimiter.Allow(); !ok {
5✔
831
                return nil, ErrPersistenceLimitExceeded
×
832
        }
×
833

834
        response, err := p.persistence.ListOpenWorkflowExecutions(ctx, request)
5✔
835
        return response, err
5✔
836
}
837

838
func (p *visibilityRateLimitedPersistenceClient) ListClosedWorkflowExecutions(
839
        ctx context.Context,
840
        request *ListWorkflowExecutionsRequest,
841
) (*ListWorkflowExecutionsResponse, error) {
9✔
842
        if ok := p.rateLimiter.Allow(); !ok {
9✔
843
                return nil, ErrPersistenceLimitExceeded
×
844
        }
×
845

846
        response, err := p.persistence.ListClosedWorkflowExecutions(ctx, request)
9✔
847
        return response, err
9✔
848
}
849

850
func (p *visibilityRateLimitedPersistenceClient) ListOpenWorkflowExecutionsByType(
851
        ctx context.Context,
852
        request *ListWorkflowExecutionsByTypeRequest,
853
) (*ListWorkflowExecutionsResponse, error) {
×
854
        if ok := p.rateLimiter.Allow(); !ok {
×
855
                return nil, ErrPersistenceLimitExceeded
×
856
        }
×
857

858
        response, err := p.persistence.ListOpenWorkflowExecutionsByType(ctx, request)
×
859
        return response, err
×
860
}
861

862
func (p *visibilityRateLimitedPersistenceClient) ListClosedWorkflowExecutionsByType(
863
        ctx context.Context,
864
        request *ListWorkflowExecutionsByTypeRequest,
865
) (*ListWorkflowExecutionsResponse, error) {
×
866
        if ok := p.rateLimiter.Allow(); !ok {
×
867
                return nil, ErrPersistenceLimitExceeded
×
868
        }
×
869

870
        response, err := p.persistence.ListClosedWorkflowExecutionsByType(ctx, request)
×
871
        return response, err
×
872
}
873

874
func (p *visibilityRateLimitedPersistenceClient) ListOpenWorkflowExecutionsByWorkflowID(
875
        ctx context.Context,
876
        request *ListWorkflowExecutionsByWorkflowIDRequest,
877
) (*ListWorkflowExecutionsResponse, error) {
96✔
878
        if ok := p.rateLimiter.Allow(); !ok {
96✔
879
                return nil, ErrPersistenceLimitExceeded
×
880
        }
×
881

882
        response, err := p.persistence.ListOpenWorkflowExecutionsByWorkflowID(ctx, request)
96✔
883
        return response, err
96✔
884
}
885

886
func (p *visibilityRateLimitedPersistenceClient) ListClosedWorkflowExecutionsByWorkflowID(
887
        ctx context.Context,
888
        request *ListWorkflowExecutionsByWorkflowIDRequest,
889
) (*ListWorkflowExecutionsResponse, error) {
15✔
890
        if ok := p.rateLimiter.Allow(); !ok {
15✔
891
                return nil, ErrPersistenceLimitExceeded
×
892
        }
×
893

894
        response, err := p.persistence.ListClosedWorkflowExecutionsByWorkflowID(ctx, request)
15✔
895
        return response, err
15✔
896
}
897

898
func (p *visibilityRateLimitedPersistenceClient) ListClosedWorkflowExecutionsByStatus(
899
        ctx context.Context,
900
        request *ListClosedWorkflowExecutionsByStatusRequest,
901
) (*ListWorkflowExecutionsResponse, error) {
×
902
        if ok := p.rateLimiter.Allow(); !ok {
×
903
                return nil, ErrPersistenceLimitExceeded
×
904
        }
×
905

906
        response, err := p.persistence.ListClosedWorkflowExecutionsByStatus(ctx, request)
×
907
        return response, err
×
908
}
909

910
func (p *visibilityRateLimitedPersistenceClient) GetClosedWorkflowExecution(
911
        ctx context.Context,
912
        request *GetClosedWorkflowExecutionRequest,
913
) (*GetClosedWorkflowExecutionResponse, error) {
×
914
        if ok := p.rateLimiter.Allow(); !ok {
×
915
                return nil, ErrPersistenceLimitExceeded
×
916
        }
×
917

918
        response, err := p.persistence.GetClosedWorkflowExecution(ctx, request)
×
919
        return response, err
×
920
}
921

922
func (p *visibilityRateLimitedPersistenceClient) DeleteWorkflowExecution(
923
        ctx context.Context,
924
        request *VisibilityDeleteWorkflowExecutionRequest,
925
) error {
54✔
926
        if ok := p.rateLimiter.Allow(); !ok {
54✔
927
                return ErrPersistenceLimitExceeded
×
928
        }
×
929
        return p.persistence.DeleteWorkflowExecution(ctx, request)
54✔
930
}
931

932
func (p *visibilityRateLimitedPersistenceClient) DeleteUninitializedWorkflowExecution(
933
        ctx context.Context,
934
        request *VisibilityDeleteWorkflowExecutionRequest,
935
) error {
×
936
        if ok := p.rateLimiter.Allow(); !ok {
×
937
                return ErrPersistenceLimitExceeded
×
938
        }
×
939
        return p.persistence.DeleteUninitializedWorkflowExecution(ctx, request)
×
940
}
941

942
func (p *visibilityRateLimitedPersistenceClient) ListWorkflowExecutions(
943
        ctx context.Context,
944
        request *ListWorkflowExecutionsByQueryRequest,
945
) (*ListWorkflowExecutionsResponse, error) {
139✔
946
        if ok := p.rateLimiter.Allow(); !ok {
139✔
947
                return nil, ErrPersistenceLimitExceeded
×
948
        }
×
949
        return p.persistence.ListWorkflowExecutions(ctx, request)
139✔
950
}
951

952
func (p *visibilityRateLimitedPersistenceClient) ScanWorkflowExecutions(
953
        ctx context.Context,
954
        request *ListWorkflowExecutionsByQueryRequest,
955
) (*ListWorkflowExecutionsResponse, error) {
30✔
956
        if ok := p.rateLimiter.Allow(); !ok {
30✔
957
                return nil, ErrPersistenceLimitExceeded
×
958
        }
×
959
        return p.persistence.ScanWorkflowExecutions(ctx, request)
30✔
960
}
961

962
func (p *visibilityRateLimitedPersistenceClient) CountWorkflowExecutions(
963
        ctx context.Context,
964
        request *CountWorkflowExecutionsRequest,
965
) (*CountWorkflowExecutionsResponse, error) {
12✔
966
        if ok := p.rateLimiter.Allow(); !ok {
12✔
967
                return nil, ErrPersistenceLimitExceeded
×
968
        }
×
969
        return p.persistence.CountWorkflowExecutions(ctx, request)
12✔
970
}
971

972
func (p *visibilityRateLimitedPersistenceClient) Close() {
33✔
973
        p.persistence.Close()
33✔
974
}
33✔
975

976
func (p *historyRateLimitedPersistenceClient) GetName() string {
×
977
        return p.persistence.GetName()
×
978
}
×
979

980
func (p *historyRateLimitedPersistenceClient) Close() {
39✔
981
        p.persistence.Close()
39✔
982
}
39✔
983

984
// AppendHistoryNodes add(or override) a node to a history branch
985
func (p *historyRateLimitedPersistenceClient) AppendHistoryNodes(
986
        ctx context.Context,
987
        request *AppendHistoryNodesRequest,
988
) (*AppendHistoryNodesResponse, error) {
3,682✔
989
        if ok := p.rateLimiter.Allow(); !ok {
3,682✔
990
                return nil, ErrPersistenceLimitExceeded
×
991
        }
×
992
        return p.persistence.AppendHistoryNodes(ctx, request)
3,682✔
993
}
994

995
// ReadHistoryBranch returns history node data for a branch
996
func (p *historyRateLimitedPersistenceClient) ReadHistoryBranch(
997
        ctx context.Context,
998
        request *ReadHistoryBranchRequest,
999
) (*ReadHistoryBranchResponse, error) {
1,621✔
1000
        if ok := p.rateLimiter.Allow(); !ok {
1,621✔
1001
                return nil, ErrPersistenceLimitExceeded
×
1002
        }
×
1003
        response, err := p.persistence.ReadHistoryBranch(ctx, request)
1,621✔
1004
        return response, err
1,621✔
1005
}
1006

1007
// ReadHistoryBranchByBatch returns history node data for a branch
1008
func (p *historyRateLimitedPersistenceClient) ReadHistoryBranchByBatch(
1009
        ctx context.Context,
1010
        request *ReadHistoryBranchRequest,
1011
) (*ReadHistoryBranchByBatchResponse, error) {
129✔
1012
        if ok := p.rateLimiter.Allow(); !ok {
129✔
1013
                return nil, ErrPersistenceLimitExceeded
×
1014
        }
×
1015
        response, err := p.persistence.ReadHistoryBranchByBatch(ctx, request)
129✔
1016
        return response, err
129✔
1017
}
1018

1019
// ReadHistoryBranchByBatch returns history node data for a branch
1020
func (p *historyRateLimitedPersistenceClient) ReadRawHistoryBranch(
1021
        ctx context.Context,
1022
        request *ReadHistoryBranchRequest,
1023
) (*ReadRawHistoryBranchResponse, error) {
3✔
1024
        if ok := p.rateLimiter.Allow(); !ok {
3✔
1025
                return nil, ErrPersistenceLimitExceeded
×
1026
        }
×
1027
        response, err := p.persistence.ReadRawHistoryBranch(ctx, request)
3✔
1028
        return response, err
3✔
1029
}
1030

1031
// ForkHistoryBranch forks a new branch from a old branch
1032
func (p *historyRateLimitedPersistenceClient) ForkHistoryBranch(
1033
        ctx context.Context,
1034
        request *ForkHistoryBranchRequest,
1035
) (*ForkHistoryBranchResponse, error) {
18✔
1036
        if ok := p.rateLimiter.Allow(); !ok {
18✔
1037
                return nil, ErrPersistenceLimitExceeded
×
1038
        }
×
1039
        response, err := p.persistence.ForkHistoryBranch(ctx, request)
18✔
1040
        return response, err
18✔
1041
}
1042

1043
// DeleteHistoryBranch removes a branch
1044
func (p *historyRateLimitedPersistenceClient) DeleteHistoryBranch(
1045
        ctx context.Context,
1046
        request *DeleteHistoryBranchRequest,
1047
) error {
51✔
1048
        if ok := p.rateLimiter.Allow(); !ok {
51✔
1049
                return ErrPersistenceLimitExceeded
×
1050
        }
×
1051
        err := p.persistence.DeleteHistoryBranch(ctx, request)
51✔
1052
        return err
51✔
1053
}
1054

1055
// GetHistoryTree returns all branch information of a tree
1056
func (p *historyRateLimitedPersistenceClient) GetHistoryTree(
1057
        ctx context.Context,
1058
        request *GetHistoryTreeRequest,
1059
) (*GetHistoryTreeResponse, error) {
×
1060
        if ok := p.rateLimiter.Allow(); !ok {
×
1061
                return nil, ErrPersistenceLimitExceeded
×
1062
        }
×
1063
        response, err := p.persistence.GetHistoryTree(ctx, request)
×
1064
        return response, err
×
1065
}
1066

1067
func (p *historyRateLimitedPersistenceClient) GetAllHistoryTreeBranches(
1068
        ctx context.Context,
1069
        request *GetAllHistoryTreeBranchesRequest,
1070
) (*GetAllHistoryTreeBranchesResponse, error) {
×
1071
        if ok := p.rateLimiter.Allow(); !ok {
×
1072
                return nil, ErrPersistenceLimitExceeded
×
1073
        }
×
1074
        response, err := p.persistence.GetAllHistoryTreeBranches(ctx, request)
×
1075
        return response, err
×
1076
}
1077

1078
func (p *queueRateLimitedPersistenceClient) EnqueueMessage(
1079
        ctx context.Context,
1080
        message []byte,
1081
) error {
3✔
1082
        if ok := p.rateLimiter.Allow(); !ok {
3✔
1083
                return ErrPersistenceLimitExceeded
×
1084
        }
×
1085

1086
        return p.persistence.EnqueueMessage(ctx, message)
3✔
1087
}
1088

1089
func (p *queueRateLimitedPersistenceClient) ReadMessages(
1090
        ctx context.Context,
1091
        lastMessageID int64,
1092
        maxCount int,
1093
) ([]*QueueMessage, error) {
×
1094
        if ok := p.rateLimiter.Allow(); !ok {
×
1095
                return nil, ErrPersistenceLimitExceeded
×
1096
        }
×
1097

1098
        return p.persistence.ReadMessages(ctx, lastMessageID, maxCount)
×
1099
}
1100

1101
func (p *queueRateLimitedPersistenceClient) UpdateAckLevel(
1102
        ctx context.Context,
1103
        messageID int64,
1104
        clusterName string,
1105
) error {
×
1106
        if ok := p.rateLimiter.Allow(); !ok {
×
1107
                return ErrPersistenceLimitExceeded
×
1108
        }
×
1109

1110
        return p.persistence.UpdateAckLevel(ctx, messageID, clusterName)
×
1111
}
1112

1113
func (p *queueRateLimitedPersistenceClient) GetAckLevels(
1114
        ctx context.Context,
1115
) (map[string]int64, error) {
×
1116
        if ok := p.rateLimiter.Allow(); !ok {
×
1117
                return nil, ErrPersistenceLimitExceeded
×
1118
        }
×
1119

1120
        return p.persistence.GetAckLevels(ctx)
×
1121
}
1122

1123
func (p *queueRateLimitedPersistenceClient) DeleteMessagesBefore(
1124
        ctx context.Context,
1125
        messageID int64,
1126
) error {
×
1127
        if ok := p.rateLimiter.Allow(); !ok {
×
1128
                return ErrPersistenceLimitExceeded
×
1129
        }
×
1130

1131
        return p.persistence.DeleteMessagesBefore(ctx, messageID)
×
1132
}
1133

1134
func (p *queueRateLimitedPersistenceClient) EnqueueMessageToDLQ(
1135
        ctx context.Context,
1136
        message []byte,
1137
) error {
×
1138
        if ok := p.rateLimiter.Allow(); !ok {
×
1139
                return ErrPersistenceLimitExceeded
×
1140
        }
×
1141

1142
        return p.persistence.EnqueueMessageToDLQ(ctx, message)
×
1143
}
1144

1145
func (p *queueRateLimitedPersistenceClient) ReadMessagesFromDLQ(
1146
        ctx context.Context,
1147
        firstMessageID int64,
1148
        lastMessageID int64,
1149
        pageSize int,
1150
        pageToken []byte,
1151
) ([]*QueueMessage, []byte, error) {
×
1152
        if ok := p.rateLimiter.Allow(); !ok {
×
1153
                return nil, nil, ErrPersistenceLimitExceeded
×
1154
        }
×
1155

1156
        return p.persistence.ReadMessagesFromDLQ(ctx, firstMessageID, lastMessageID, pageSize, pageToken)
×
1157
}
1158

1159
func (p *queueRateLimitedPersistenceClient) RangeDeleteMessagesFromDLQ(
1160
        ctx context.Context,
1161
        firstMessageID int64,
1162
        lastMessageID int64,
1163
) error {
×
1164
        if ok := p.rateLimiter.Allow(); !ok {
×
1165
                return ErrPersistenceLimitExceeded
×
1166
        }
×
1167

1168
        return p.persistence.RangeDeleteMessagesFromDLQ(ctx, firstMessageID, lastMessageID)
×
1169
}
1170

1171
func (p *queueRateLimitedPersistenceClient) UpdateDLQAckLevel(
1172
        ctx context.Context,
1173
        messageID int64,
1174
        clusterName string,
1175
) error {
×
1176
        if ok := p.rateLimiter.Allow(); !ok {
×
1177
                return ErrPersistenceLimitExceeded
×
1178
        }
×
1179

1180
        return p.persistence.UpdateDLQAckLevel(ctx, messageID, clusterName)
×
1181
}
1182

1183
func (p *queueRateLimitedPersistenceClient) GetDLQAckLevels(
1184
        ctx context.Context,
1185
) (map[string]int64, error) {
×
1186
        if ok := p.rateLimiter.Allow(); !ok {
×
1187
                return nil, ErrPersistenceLimitExceeded
×
1188
        }
×
1189

1190
        return p.persistence.GetDLQAckLevels(ctx)
×
1191
}
1192

1193
func (p *queueRateLimitedPersistenceClient) GetDLQSize(
1194
        ctx context.Context,
1195
) (int64, error) {
3✔
1196
        if ok := p.rateLimiter.Allow(); !ok {
3✔
1197
                return 0, ErrPersistenceLimitExceeded
×
1198
        }
×
1199

1200
        return p.persistence.GetDLQSize(ctx)
3✔
1201
}
1202

1203
func (p *queueRateLimitedPersistenceClient) DeleteMessageFromDLQ(
1204
        ctx context.Context,
1205
        messageID int64,
1206
) error {
×
1207
        if ok := p.rateLimiter.Allow(); !ok {
×
1208
                return ErrPersistenceLimitExceeded
×
1209
        }
×
1210

1211
        return p.persistence.DeleteMessageFromDLQ(ctx, messageID)
×
1212
}
1213

1214
func (p *queueRateLimitedPersistenceClient) Close() {
39✔
1215
        p.persistence.Close()
39✔
1216
}
39✔
1217

1218
func (p *configStoreRateLimitedPersistenceClient) FetchDynamicConfig(ctx context.Context, configType ConfigType) (*FetchDynamicConfigResponse, error) {
×
1219
        if ok := p.rateLimiter.Allow(); !ok {
×
1220
                return nil, ErrPersistenceLimitExceeded
×
1221
        }
×
1222

1223
        return p.persistence.FetchDynamicConfig(ctx, configType)
×
1224
}
1225

1226
func (p *configStoreRateLimitedPersistenceClient) UpdateDynamicConfig(ctx context.Context, request *UpdateDynamicConfigRequest, configType ConfigType) error {
×
1227
        if ok := p.rateLimiter.Allow(); !ok {
×
1228
                return ErrPersistenceLimitExceeded
×
1229
        }
×
1230
        return p.persistence.UpdateDynamicConfig(ctx, request, configType)
×
1231
}
1232

1233
func (p *configStoreRateLimitedPersistenceClient) Close() {
×
1234
        p.persistence.Close()
×
1235
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc