• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

uber / cadence / 0187fdd2-f4a4-4c9a-97b4-6604937bf7be

09 May 2023 12:23AM UTC coverage: 57.253% (-0.002%) from 57.255%
0187fdd2-f4a4-4c9a-97b4-6604937bf7be

Pull #5252

buildkite

David Porter
Merge branch 'master' into feature/zonal-partitioning
Pull Request #5252: Feature/zonal partitioning

1460 of 1460 new or added lines in 51 files covered. (100.0%)

86909 of 151799 relevant lines covered (57.25%)

2482.17 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

57.0
/common/persistence/persistenceMetricClients.go
1
// Copyright (c) 2017 Uber Technologies, Inc.
2
//
3
// Permission is hereby granted, free of charge, to any person obtaining a copy
4
// of this software and associated documentation files (the "Software"), to deal
5
// in the Software without restriction, including without limitation the rights
6
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
7
// copies of the Software, and to permit persons to whom the Software is
8
// furnished to do so, subject to the following conditions:
9
//
10
// The above copyright notice and this permission notice shall be included in
11
// all copies or substantial portions of the Software.
12
//
13
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
14
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
18
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
19
// THE SOFTWARE.
20

21
package persistence
22

23
import (
24
        "context"
25
        "strconv"
26
        "time"
27

28
        "github.com/uber/cadence/common/dynamicconfig"
29

30
        "github.com/uber/cadence/common/config"
31
        "github.com/uber/cadence/common/log"
32
        "github.com/uber/cadence/common/log/tag"
33
        "github.com/uber/cadence/common/metrics"
34
        "github.com/uber/cadence/common/types"
35
)
36

37
type (
38
        persistenceMetricsClientBase struct {
39
                metricClient                  metrics.Client
40
                logger                        log.Logger
41
                enableLatencyHistogramMetrics bool
42
                sampleLoggingRate             dynamicconfig.IntPropertyFn
43
                enableShardIDMetrics          dynamicconfig.BoolPropertyFn
44
        }
45

46
        shardPersistenceClient struct {
47
                persistenceMetricsClientBase
48
                persistence ShardManager
49
        }
50

51
        workflowExecutionPersistenceClient struct {
52
                persistenceMetricsClientBase
53
                persistence ExecutionManager
54
        }
55

56
        taskPersistenceClient struct {
57
                persistenceMetricsClientBase
58
                persistence TaskManager
59
        }
60

61
        historyPersistenceClient struct {
62
                persistenceMetricsClientBase
63
                persistence HistoryManager
64
        }
65

66
        metadataPersistenceClient struct {
67
                persistenceMetricsClientBase
68
                persistence DomainManager
69
        }
70

71
        visibilityPersistenceClient struct {
72
                persistenceMetricsClientBase
73
                persistence VisibilityManager
74
        }
75

76
        queuePersistenceClient struct {
77
                persistenceMetricsClientBase
78
                persistence QueueManager
79
        }
80

81
        configStorePersistenceClient struct {
82
                persistenceMetricsClientBase
83
                persistence ConfigStoreManager
84
        }
85
)
86

87
var _ ShardManager = (*shardPersistenceClient)(nil)
88
var _ ExecutionManager = (*workflowExecutionPersistenceClient)(nil)
89
var _ TaskManager = (*taskPersistenceClient)(nil)
90
var _ HistoryManager = (*historyPersistenceClient)(nil)
91
var _ DomainManager = (*metadataPersistenceClient)(nil)
92
var _ VisibilityManager = (*visibilityPersistenceClient)(nil)
93
var _ QueueManager = (*queuePersistenceClient)(nil)
94
var _ ConfigStoreManager = (*configStorePersistenceClient)(nil)
95

96
// NewShardPersistenceMetricsClient creates a client to manage shards
97
func NewShardPersistenceMetricsClient(
98
        persistence ShardManager,
99
        metricClient metrics.Client,
100
        logger log.Logger,
101
        cfg *config.Persistence,
102
) ShardManager {
51✔
103
        return &shardPersistenceClient{
51✔
104
                persistence: persistence,
51✔
105
                persistenceMetricsClientBase: persistenceMetricsClientBase{
51✔
106
                        metricClient:                  metricClient,
51✔
107
                        logger:                        logger,
51✔
108
                        enableLatencyHistogramMetrics: cfg.EnablePersistenceLatencyHistogramMetrics,
51✔
109
                },
51✔
110
        }
51✔
111
}
51✔
112

113
// NewWorkflowExecutionPersistenceMetricsClient creates a client to manage executions
114
func NewWorkflowExecutionPersistenceMetricsClient(
115
        persistence ExecutionManager,
116
        metricClient metrics.Client,
117
        logger log.Logger,
118
        cfg *config.Persistence,
119
        sampleLoggingRate dynamicconfig.IntPropertyFn,
120
        enableShardIDMetrics dynamicconfig.BoolPropertyFn,
121
) ExecutionManager {
63✔
122
        return &workflowExecutionPersistenceClient{
63✔
123
                persistence: persistence,
63✔
124
                persistenceMetricsClientBase: persistenceMetricsClientBase{
63✔
125
                        metricClient:                  metricClient,
63✔
126
                        logger:                        logger.WithTags(tag.ShardID(persistence.GetShardID())),
63✔
127
                        enableLatencyHistogramMetrics: cfg.EnablePersistenceLatencyHistogramMetrics,
63✔
128
                        sampleLoggingRate:             sampleLoggingRate,
63✔
129
                        enableShardIDMetrics:          enableShardIDMetrics,
63✔
130
                },
63✔
131
        }
63✔
132
}
63✔
133

134
// NewTaskPersistenceMetricsClient creates a client to manage tasks
135
func NewTaskPersistenceMetricsClient(
136
        persistence TaskManager,
137
        metricClient metrics.Client,
138
        logger log.Logger,
139
        cfg *config.Persistence,
140
) TaskManager {
51✔
141
        return &taskPersistenceClient{
51✔
142
                persistence: persistence,
51✔
143
                persistenceMetricsClientBase: persistenceMetricsClientBase{
51✔
144
                        metricClient:                  metricClient,
51✔
145
                        logger:                        logger,
51✔
146
                        enableLatencyHistogramMetrics: cfg.EnablePersistenceLatencyHistogramMetrics,
51✔
147
                },
51✔
148
        }
51✔
149
}
51✔
150

151
// NewHistoryPersistenceMetricsClient creates a HistoryManager client to manage workflow execution history
152
func NewHistoryPersistenceMetricsClient(
153
        persistence HistoryManager,
154
        metricClient metrics.Client,
155
        logger log.Logger,
156
        cfg *config.Persistence,
157
) HistoryManager {
51✔
158
        return &historyPersistenceClient{
51✔
159
                persistence: persistence,
51✔
160
                persistenceMetricsClientBase: persistenceMetricsClientBase{
51✔
161
                        metricClient:                  metricClient,
51✔
162
                        logger:                        logger,
51✔
163
                        enableLatencyHistogramMetrics: cfg.EnablePersistenceLatencyHistogramMetrics,
51✔
164
                },
51✔
165
        }
51✔
166
}
51✔
167

168
// NewDomainPersistenceMetricsClient creates a DomainManager client to manage metadata
169
func NewDomainPersistenceMetricsClient(
170
        persistence DomainManager,
171
        metricClient metrics.Client,
172
        logger log.Logger,
173
        cfg *config.Persistence,
174
) DomainManager {
57✔
175
        return &metadataPersistenceClient{
57✔
176
                persistence: persistence,
57✔
177
                persistenceMetricsClientBase: persistenceMetricsClientBase{
57✔
178
                        metricClient:                  metricClient,
57✔
179
                        logger:                        logger,
57✔
180
                        enableLatencyHistogramMetrics: cfg.EnablePersistenceLatencyHistogramMetrics,
57✔
181
                },
57✔
182
        }
57✔
183
}
57✔
184

185
// NewVisibilityPersistenceMetricsClient creates a client to manage visibility
186
func NewVisibilityPersistenceMetricsClient(
187
        persistence VisibilityManager,
188
        metricClient metrics.Client,
189
        logger log.Logger,
190
        cfg *config.Persistence,
191
) VisibilityManager {
27✔
192
        return &visibilityPersistenceClient{
27✔
193
                persistence: persistence,
27✔
194
                persistenceMetricsClientBase: persistenceMetricsClientBase{
27✔
195
                        metricClient:                  metricClient,
27✔
196
                        logger:                        logger,
27✔
197
                        enableLatencyHistogramMetrics: cfg.EnablePersistenceLatencyHistogramMetrics,
27✔
198
                },
27✔
199
        }
27✔
200
}
27✔
201

202
// NewQueuePersistenceMetricsClient creates a client to manage queue
203
func NewQueuePersistenceMetricsClient(
204
        persistence QueueManager,
205
        metricClient metrics.Client,
206
        logger log.Logger,
207
        cfg *config.Persistence,
208
) QueueManager {
51✔
209
        return &queuePersistenceClient{
51✔
210
                persistence: persistence,
51✔
211
                persistenceMetricsClientBase: persistenceMetricsClientBase{
51✔
212
                        metricClient:                  metricClient,
51✔
213
                        logger:                        logger,
51✔
214
                        enableLatencyHistogramMetrics: cfg.EnablePersistenceLatencyHistogramMetrics,
51✔
215
                },
51✔
216
        }
51✔
217
}
51✔
218

219
// NewConfigStorePersistenceMetricsClient creates a client to manage config store
220
func NewConfigStorePersistenceMetricsClient(
221
        persistence ConfigStoreManager,
222
        metricClient metrics.Client,
223
        logger log.Logger,
224
        cfg *config.Persistence,
225
) ConfigStoreManager {
43✔
226
        return &configStorePersistenceClient{
43✔
227
                persistence: persistence,
43✔
228
                persistenceMetricsClientBase: persistenceMetricsClientBase{
43✔
229
                        metricClient:                  metricClient,
43✔
230
                        logger:                        logger,
43✔
231
                        enableLatencyHistogramMetrics: cfg.EnablePersistenceLatencyHistogramMetrics,
43✔
232
                },
43✔
233
        }
43✔
234
}
43✔
235

236
func (p *persistenceMetricsClientBase) updateErrorMetricPerDomain(scope int, err error, scopeWithDomainTag metrics.Scope) {
501✔
237
        switch err.(type) {
501✔
238
        case *types.DomainAlreadyExistsError:
×
239
                scopeWithDomainTag.IncCounter(metrics.PersistenceErrDomainAlreadyExistsCounterPerDomain)
×
240
        case *types.BadRequestError:
×
241
                scopeWithDomainTag.IncCounter(metrics.PersistenceErrBadRequestCounterPerDomain)
×
242
        case *WorkflowExecutionAlreadyStartedError:
45✔
243
                scopeWithDomainTag.IncCounter(metrics.PersistenceErrExecutionAlreadyStartedCounterPerDomain)
45✔
244
        case *ConditionFailedError:
×
245
                scopeWithDomainTag.IncCounter(metrics.PersistenceErrConditionFailedCounterPerDomain)
×
246
        case *CurrentWorkflowConditionFailedError:
×
247
                scopeWithDomainTag.IncCounter(metrics.PersistenceErrCurrentWorkflowConditionFailedCounterPerDomain)
×
248
        case *ShardAlreadyExistError:
×
249
                scopeWithDomainTag.IncCounter(metrics.PersistenceErrShardExistsCounterPerDomain)
×
250
        case *ShardOwnershipLostError:
×
251
                scopeWithDomainTag.IncCounter(metrics.PersistenceErrShardOwnershipLostCounterPerDomain)
×
252
        case *types.EntityNotExistsError:
459✔
253
                scopeWithDomainTag.IncCounter(metrics.PersistenceErrEntityNotExistsCounterPerDomain)
459✔
254
        case *TimeoutError:
×
255
                scopeWithDomainTag.IncCounter(metrics.PersistenceErrTimeoutCounterPerDomain)
×
256
                scopeWithDomainTag.IncCounter(metrics.PersistenceFailuresPerDomain)
×
257
        case *types.ServiceBusyError:
×
258
                scopeWithDomainTag.IncCounter(metrics.PersistenceErrBusyCounterPerDomain)
×
259
                scopeWithDomainTag.IncCounter(metrics.PersistenceFailuresPerDomain)
×
260
        case *DBUnavailableError:
×
261
                scopeWithDomainTag.IncCounter(metrics.PersistenceErrDBUnavailableCounterPerDomain)
×
262
                scopeWithDomainTag.IncCounter(metrics.PersistenceFailuresPerDomain)
×
263
                p.logger.Error("DBUnavailable Error:", tag.Error(err), tag.MetricScope(scope))
×
264
        default:
×
265
                p.logger.Error("Operation failed with internal error.", tag.Error(err), tag.MetricScope(scope))
×
266
                scopeWithDomainTag.IncCounter(metrics.PersistenceFailuresPerDomain)
×
267
        }
268
}
269

270
func (p *persistenceMetricsClientBase) updateErrorMetric(scope int, err error, metricsScope metrics.Scope) {
780✔
271
        switch err.(type) {
780✔
272
        case *types.DomainAlreadyExistsError:
5✔
273
                metricsScope.IncCounter(metrics.PersistenceErrDomainAlreadyExistsCounter)
5✔
274
        case *types.BadRequestError:
120✔
275
                metricsScope.IncCounter(metrics.PersistenceErrBadRequestCounter)
120✔
276
        case *WorkflowExecutionAlreadyStartedError:
×
277
                metricsScope.IncCounter(metrics.PersistenceErrExecutionAlreadyStartedCounter)
×
278
        case *ConditionFailedError:
×
279
                metricsScope.IncCounter(metrics.PersistenceErrConditionFailedCounter)
×
280
        case *CurrentWorkflowConditionFailedError:
×
281
                metricsScope.IncCounter(metrics.PersistenceErrCurrentWorkflowConditionFailedCounter)
×
282
        case *ShardAlreadyExistError:
×
283
                metricsScope.IncCounter(metrics.PersistenceErrShardExistsCounter)
×
284
        case *ShardOwnershipLostError:
×
285
                metricsScope.IncCounter(metrics.PersistenceErrShardOwnershipLostCounter)
×
286
        case *types.EntityNotExistsError:
656✔
287
                metricsScope.IncCounter(metrics.PersistenceErrEntityNotExistsCounter)
656✔
288
        case *TimeoutError:
×
289
                metricsScope.IncCounter(metrics.PersistenceErrTimeoutCounter)
×
290
                metricsScope.IncCounter(metrics.PersistenceFailures)
×
291
        case *types.ServiceBusyError:
×
292
                metricsScope.IncCounter(metrics.PersistenceErrBusyCounter)
×
293
                metricsScope.IncCounter(metrics.PersistenceFailures)
×
294
        case *DBUnavailableError:
×
295
                metricsScope.IncCounter(metrics.PersistenceErrDBUnavailableCounter)
×
296
                metricsScope.IncCounter(metrics.PersistenceFailures)
×
297
                p.logger.Error("DBUnavailable Error:", tag.Error(err), tag.MetricScope(scope))
×
298
        default:
×
299
                p.logger.Error("Operation failed with internal error.", tag.Error(err), tag.MetricScope(scope))
×
300
                metricsScope.IncCounter(metrics.PersistenceFailures)
×
301
        }
302
}
303

304
func (p *persistenceMetricsClientBase) callWithDomainAndShardScope(scope int, op func() error, domainTag metrics.Tag, shardIDTag metrics.Tag) error {
6,332✔
305
        domainMetricsScope := p.metricClient.Scope(scope, domainTag)
6,332✔
306
        shardOperationsMetricsScope := p.metricClient.Scope(scope, shardIDTag)
6,332✔
307
        shardOverallMetricsScope := p.metricClient.Scope(metrics.PersistenceShardRequestCountScope, shardIDTag)
6,332✔
308

6,332✔
309
        domainMetricsScope.IncCounter(metrics.PersistenceRequestsPerDomain)
6,332✔
310
        shardOperationsMetricsScope.IncCounter(metrics.PersistenceRequestsPerShard)
6,332✔
311
        shardOverallMetricsScope.IncCounter(metrics.PersistenceRequestsPerShard)
6,332✔
312

6,332✔
313
        before := time.Now()
6,332✔
314
        err := op()
6,332✔
315
        duration := time.Since(before)
6,332✔
316

6,332✔
317
        domainMetricsScope.RecordTimer(metrics.PersistenceLatencyPerDomain, duration)
6,332✔
318
        shardOperationsMetricsScope.RecordTimer(metrics.PersistenceLatencyPerShard, duration)
6,332✔
319
        shardOverallMetricsScope.RecordTimer(metrics.PersistenceLatencyPerShard, duration)
6,332✔
320

6,332✔
321
        if p.enableLatencyHistogramMetrics {
6,332✔
322
                domainMetricsScope.RecordHistogramDuration(metrics.PersistenceLatencyHistogram, duration)
×
323
        }
×
324
        if err != nil {
6,782✔
325
                p.updateErrorMetricPerDomain(scope, err, domainMetricsScope)
450✔
326
        }
450✔
327
        return err
6,332✔
328
}
329

330
func (p *persistenceMetricsClientBase) call(scope int, op func() error, tags ...metrics.Tag) error {
26,449✔
331
        metricsScope := p.metricClient.Scope(scope, tags...)
26,449✔
332
        if len(tags) > 0 {
41,654✔
333
                metricsScope.IncCounter(metrics.PersistenceRequestsPerDomain)
15,205✔
334
        } else {
26,452✔
335
                metricsScope.IncCounter(metrics.PersistenceRequests)
11,247✔
336
        }
11,247✔
337
        before := time.Now()
26,449✔
338
        err := op()
26,449✔
339
        duration := time.Since(before)
26,449✔
340
        if len(tags) > 0 {
41,654✔
341
                metricsScope.RecordTimer(metrics.PersistenceLatencyPerDomain, duration)
15,205✔
342
        } else {
26,452✔
343
                metricsScope.RecordTimer(metrics.PersistenceLatency, duration)
11,247✔
344
        }
11,247✔
345

346
        if p.enableLatencyHistogramMetrics {
26,449✔
347
                metricsScope.RecordHistogramDuration(metrics.PersistenceLatencyHistogram, duration)
×
348
        }
×
349
        if err != nil {
27,280✔
350
                if len(tags) > 0 {
882✔
351
                        p.updateErrorMetricPerDomain(scope, err, metricsScope)
51✔
352
                } else {
831✔
353
                        p.updateErrorMetric(scope, err, metricsScope)
780✔
354
                }
780✔
355
        }
356
        return err
26,449✔
357
}
358

359
func (p *shardPersistenceClient) GetName() string {
×
360
        return p.persistence.GetName()
×
361
}
×
362

363
func (p *shardPersistenceClient) CreateShard(
364
        ctx context.Context,
365
        request *CreateShardRequest,
366
) error {
63✔
367
        op := func() error {
126✔
368
                return p.persistence.CreateShard(ctx, request)
63✔
369
        }
63✔
370
        return p.call(metrics.PersistenceCreateShardScope, op)
63✔
371
}
372

373
func (p *shardPersistenceClient) GetShard(
374
        ctx context.Context,
375
        request *GetShardRequest,
376
) (*GetShardResponse, error) {
51✔
377
        var resp *GetShardResponse
51✔
378
        op := func() error {
102✔
379
                var err error
51✔
380
                resp, err = p.persistence.GetShard(ctx, request)
51✔
381
                return err
51✔
382
        }
51✔
383
        err := p.call(metrics.PersistenceGetShardScope, op)
51✔
384
        if err != nil {
66✔
385
                return nil, err
15✔
386
        }
15✔
387
        return resp, nil
36✔
388
}
389

390
func (p *shardPersistenceClient) UpdateShard(
391
        ctx context.Context,
392
        request *UpdateShardRequest,
393
) error {
107✔
394
        op := func() error {
214✔
395
                return p.persistence.UpdateShard(ctx, request)
107✔
396
        }
107✔
397
        return p.call(metrics.PersistenceUpdateShardScope, op)
107✔
398
}
399

400
func (p *shardPersistenceClient) Close() {
39✔
401
        p.persistence.Close()
39✔
402
}
39✔
403

404
func (p *workflowExecutionPersistenceClient) GetName() string {
×
405
        return p.persistence.GetName()
×
406
}
×
407

408
func (p *workflowExecutionPersistenceClient) GetShardID() int {
11,526✔
409
        return p.persistence.GetShardID()
11,526✔
410
}
11,526✔
411

412
func (p *workflowExecutionPersistenceClient) CreateWorkflowExecution(
413
        ctx context.Context,
414
        request *CreateWorkflowExecutionRequest,
415
) (*CreateWorkflowExecutionResponse, error) {
504✔
416
        var resp *CreateWorkflowExecutionResponse
504✔
417
        op := func() error {
1,008✔
418
                var err error
504✔
419
                resp, err = p.persistence.CreateWorkflowExecution(ctx, request)
504✔
420
                return err
504✔
421
        }
504✔
422
        p.logger.SampleInfo("Persistence CreateWorkflowExecution called", p.sampleLoggingRate(),
504✔
423
                tag.WorkflowDomainName(request.DomainName), tag.WorkflowID(request.NewWorkflowSnapshot.ExecutionInfo.WorkflowID), tag.ShardID(p.GetShardID()))
504✔
424
        var err error
504✔
425
        if p.enableShardIDMetrics() {
1,008✔
426
                err = p.callWithDomainAndShardScope(metrics.PersistenceCreateWorkflowExecutionScope, op, metrics.DomainTag(request.DomainName),
504✔
427
                        metrics.ShardIDTag(strconv.Itoa(p.GetShardID())))
504✔
428
        } else {
504✔
429
                err = p.call(metrics.PersistenceCreateWorkflowExecutionScope, op, metrics.DomainTag(request.DomainName))
×
430
        }
×
431
        if err != nil {
549✔
432
                return nil, err
45✔
433
        }
45✔
434
        return resp, nil
462✔
435
}
436

437
func (p *workflowExecutionPersistenceClient) GetWorkflowExecution(
438
        ctx context.Context,
439
        request *GetWorkflowExecutionRequest,
440
) (*GetWorkflowExecutionResponse, error) {
1,138✔
441
        var resp *GetWorkflowExecutionResponse
1,138✔
442
        op := func() error {
2,276✔
443
                var err error
1,138✔
444
                resp, err = p.persistence.GetWorkflowExecution(ctx, request)
1,138✔
445
                return err
1,138✔
446
        }
1,138✔
447
        var err error
1,138✔
448
        if p.enableShardIDMetrics() {
2,276✔
449
                err = p.callWithDomainAndShardScope(metrics.PersistenceGetWorkflowExecutionScope, op, metrics.DomainTag(request.DomainName),
1,138✔
450
                        metrics.ShardIDTag(strconv.Itoa(p.GetShardID())))
1,138✔
451
        } else {
1,138✔
452
                err = p.call(metrics.PersistenceGetWorkflowExecutionScope, op, metrics.DomainTag(request.DomainName))
×
453
        }
×
454
        if err != nil {
1,537✔
455
                return nil, err
399✔
456
        }
399✔
457
        return resp, nil
742✔
458
}
459

460
func (p *workflowExecutionPersistenceClient) UpdateWorkflowExecution(
461
        ctx context.Context,
462
        request *UpdateWorkflowExecutionRequest,
463
) (*UpdateWorkflowExecutionResponse, error) {
4,414✔
464
        var resp *UpdateWorkflowExecutionResponse
4,414✔
465
        op := func() error {
8,828✔
466
                var err error
4,414✔
467
                resp, err = p.persistence.UpdateWorkflowExecution(ctx, request)
4,414✔
468
                return err
4,414✔
469
        }
4,414✔
470
        p.logger.SampleInfo("Persistence UpdateWorkflowExecution called", p.sampleLoggingRate(),
4,414✔
471
                tag.WorkflowDomainName(request.DomainName), tag.WorkflowID(request.UpdateWorkflowMutation.ExecutionInfo.WorkflowID), tag.ShardID(p.GetShardID()))
4,414✔
472
        var err error
4,414✔
473
        if p.enableShardIDMetrics() {
8,828✔
474
                err = p.callWithDomainAndShardScope(metrics.PersistenceUpdateWorkflowExecutionScope, op, metrics.DomainTag(request.DomainName),
4,414✔
475
                        metrics.ShardIDTag(strconv.Itoa(p.GetShardID())))
4,414✔
476
        } else {
4,414✔
477
                err = p.call(metrics.PersistenceUpdateWorkflowExecutionScope, op, metrics.DomainTag(request.DomainName))
×
478
        }
×
479
        if err != nil {
4,414✔
480
                return nil, err
×
481
        }
×
482
        return resp, nil
4,414✔
483
}
484

485
func (p *workflowExecutionPersistenceClient) ConflictResolveWorkflowExecution(
486
        ctx context.Context,
487
        request *ConflictResolveWorkflowExecutionRequest,
488
) (*ConflictResolveWorkflowExecutionResponse, error) {
3✔
489
        var resp *ConflictResolveWorkflowExecutionResponse
3✔
490
        op := func() error {
6✔
491
                var err error
3✔
492
                resp, err = p.persistence.ConflictResolveWorkflowExecution(ctx, request)
3✔
493
                return err
3✔
494
        }
3✔
495
        p.logger.SampleInfo("Persistence ConflictResolveWorkflowExecution called", p.sampleLoggingRate(),
3✔
496
                tag.WorkflowDomainName(request.DomainName), tag.ShardID(p.GetShardID()))
3✔
497
        var err error
3✔
498
        if p.enableShardIDMetrics() {
6✔
499
                err = p.callWithDomainAndShardScope(metrics.PersistenceConflictResolveWorkflowExecutionScope, op, metrics.DomainTag(request.DomainName),
3✔
500
                        metrics.ShardIDTag(strconv.Itoa(p.GetShardID())))
3✔
501
        } else {
3✔
502
                err = p.call(metrics.PersistenceConflictResolveWorkflowExecutionScope, op, metrics.DomainTag(request.DomainName))
×
503
        }
×
504
        if err != nil {
3✔
505
                return nil, err
×
506
        }
×
507
        return resp, nil
3✔
508
}
509

510
func (p *workflowExecutionPersistenceClient) DeleteWorkflowExecution(
511
        ctx context.Context,
512
        request *DeleteWorkflowExecutionRequest,
513
) error {
54✔
514
        op := func() error {
108✔
515
                return p.persistence.DeleteWorkflowExecution(ctx, request)
54✔
516
        }
54✔
517
        p.logger.SampleInfo("Persistence DeleteWorkflowExecution called", p.sampleLoggingRate(),
54✔
518
                tag.WorkflowDomainName(request.DomainName), tag.WorkflowID(request.WorkflowID), tag.ShardID(p.GetShardID()))
54✔
519
        if p.enableShardIDMetrics() {
108✔
520
                return p.callWithDomainAndShardScope(metrics.PersistenceDeleteWorkflowExecutionScope, op, metrics.DomainTag(request.DomainName),
54✔
521
                        metrics.ShardIDTag(strconv.Itoa(p.GetShardID())))
54✔
522
        }
54✔
523
        return p.call(metrics.PersistenceDeleteWorkflowExecutionScope, op, metrics.DomainTag(request.DomainName))
×
524

525
}
526

527
func (p *workflowExecutionPersistenceClient) DeleteCurrentWorkflowExecution(
528
        ctx context.Context,
529
        request *DeleteCurrentWorkflowExecutionRequest,
530
) error {
54✔
531
        op := func() error {
108✔
532
                return p.persistence.DeleteCurrentWorkflowExecution(ctx, request)
54✔
533
        }
54✔
534
        p.logger.SampleInfo("Persistence DeleteCurrentWorkflowExecution called", p.sampleLoggingRate(),
54✔
535
                tag.WorkflowDomainName(request.DomainName), tag.WorkflowID(request.WorkflowID), tag.ShardID(p.GetShardID()))
54✔
536
        if p.enableShardIDMetrics() {
108✔
537
                return p.callWithDomainAndShardScope(metrics.PersistenceDeleteCurrentWorkflowExecutionScope, op, metrics.DomainTag(request.DomainName),
54✔
538
                        metrics.ShardIDTag(strconv.Itoa(p.GetShardID())))
54✔
539
        }
54✔
540
        return p.call(metrics.PersistenceDeleteCurrentWorkflowExecutionScope, op, metrics.DomainTag(request.DomainName))
×
541
}
542

543
func (p *workflowExecutionPersistenceClient) GetCurrentExecution(
544
        ctx context.Context,
545
        request *GetCurrentExecutionRequest,
546
) (*GetCurrentExecutionResponse, error) {
183✔
547
        var resp *GetCurrentExecutionResponse
183✔
548
        op := func() error {
366✔
549
                var err error
183✔
550
                resp, err = p.persistence.GetCurrentExecution(ctx, request)
183✔
551
                return err
183✔
552
        }
183✔
553
        p.logger.SampleInfo("Persistence GetCurrentExecution called", p.sampleLoggingRate(),
183✔
554
                tag.WorkflowDomainName(request.DomainName), tag.WorkflowID(request.WorkflowID), tag.ShardID(p.GetShardID()))
183✔
555
        var err error
183✔
556
        if p.enableShardIDMetrics() {
366✔
557
                err = p.callWithDomainAndShardScope(metrics.PersistenceGetCurrentExecutionScope, op, metrics.DomainTag(request.DomainName),
183✔
558
                        metrics.ShardIDTag(strconv.Itoa(p.GetShardID())))
183✔
559
        } else {
183✔
560
                err = p.call(metrics.PersistenceGetCurrentExecutionScope, op, metrics.DomainTag(request.DomainName))
×
561
        }
×
562
        if err != nil {
195✔
563
                return nil, err
12✔
564
        }
12✔
565
        return resp, nil
174✔
566
}
567

568
func (p *workflowExecutionPersistenceClient) ListCurrentExecutions(
569
        ctx context.Context,
570
        request *ListCurrentExecutionsRequest,
571
) (*ListCurrentExecutionsResponse, error) {
×
572
        var resp *ListCurrentExecutionsResponse
×
573
        op := func() error {
×
574
                var err error
×
575
                resp, err = p.persistence.ListCurrentExecutions(ctx, request)
×
576
                if err == nil && len(resp.Executions) == 0 {
×
577
                        p.metricClient.IncCounter(metrics.PersistenceListCurrentExecutionsScope, metrics.PersistenceEmptyResponseCounter)
×
578
                }
×
579
                return err
×
580
        }
581
        err := p.call(metrics.PersistenceListCurrentExecutionsScope, op)
×
582
        if err != nil {
×
583
                return nil, err
×
584
        }
×
585
        return resp, nil
×
586
}
587

588
func (p *workflowExecutionPersistenceClient) IsWorkflowExecutionExists(
589
        ctx context.Context,
590
        request *IsWorkflowExecutionExistsRequest,
591
) (*IsWorkflowExecutionExistsResponse, error) {
×
592
        var resp *IsWorkflowExecutionExistsResponse
×
593
        op := func() error {
×
594
                var err error
×
595
                resp, err = p.persistence.IsWorkflowExecutionExists(ctx, request)
×
596
                return err
×
597
        }
×
598
        p.logger.SampleInfo("Persistence IsWorkflowExecutionExists called", p.sampleLoggingRate(),
×
599
                tag.WorkflowDomainName(request.DomainName), tag.WorkflowID(request.WorkflowID), tag.ShardID(p.GetShardID()))
×
600
        var err error
×
601
        if p.enableShardIDMetrics() {
×
602
                err = p.callWithDomainAndShardScope(metrics.PersistenceIsWorkflowExecutionExistsScope, op, metrics.DomainTag(request.DomainName),
×
603
                        metrics.ShardIDTag(strconv.Itoa(p.GetShardID())))
×
604
        } else {
×
605
                err = p.call(metrics.PersistenceIsWorkflowExecutionExistsScope, op, metrics.DomainTag(request.DomainName))
×
606
        }
×
607
        if err != nil {
×
608
                return nil, err
×
609
        }
×
610
        return resp, nil
×
611
}
612

613
func (p *workflowExecutionPersistenceClient) ListConcreteExecutions(
614
        ctx context.Context,
615
        request *ListConcreteExecutionsRequest,
616
) (*ListConcreteExecutionsResponse, error) {
×
617
        var resp *ListConcreteExecutionsResponse
×
618
        op := func() error {
×
619
                var err error
×
620
                resp, err = p.persistence.ListConcreteExecutions(ctx, request)
×
621
                return err
×
622
        }
×
623
        err := p.call(metrics.PersistenceListConcreteExecutionsScope, op)
×
624
        if err != nil {
×
625
                return nil, err
×
626
        }
×
627
        return resp, nil
×
628
}
629

630
func (p *workflowExecutionPersistenceClient) GetTransferTasks(
631
        ctx context.Context,
632
        request *GetTransferTasksRequest,
633
) (*GetTransferTasksResponse, error) {
2,317✔
634
        var resp *GetTransferTasksResponse
2,317✔
635
        op := func() error {
4,634✔
636
                var err error
2,317✔
637
                resp, err = p.persistence.GetTransferTasks(ctx, request)
2,317✔
638
                if err == nil && len(resp.Tasks) == 0 {
2,416✔
639
                        p.metricClient.IncCounter(metrics.PersistenceGetTransferTasksScope, metrics.PersistenceEmptyResponseCounter)
99✔
640
                }
99✔
641
                return err
2,317✔
642
        }
643
        err := p.call(metrics.PersistenceGetTransferTasksScope, op)
2,317✔
644
        if err != nil {
2,317✔
645
                return nil, err
×
646
        }
×
647
        return resp, nil
2,317✔
648
}
649

650
func (p *workflowExecutionPersistenceClient) GetCrossClusterTasks(
651
        ctx context.Context,
652
        request *GetCrossClusterTasksRequest,
653
) (*GetCrossClusterTasksResponse, error) {
163✔
654
        var resp *GetCrossClusterTasksResponse
163✔
655
        op := func() error {
326✔
656
                var err error
163✔
657
                resp, err = p.persistence.GetCrossClusterTasks(ctx, request)
163✔
658
                if err == nil && len(resp.Tasks) == 0 {
326✔
659
                        p.metricClient.IncCounter(metrics.PersistenceGetCrossClusterTasksScope, metrics.PersistenceEmptyResponseCounter)
163✔
660
                }
163✔
661
                return err
163✔
662
        }
663
        err := p.call(metrics.PersistenceGetCrossClusterTasksScope, op)
163✔
664
        if err != nil {
163✔
665
                return nil, err
×
666
        }
×
667
        return resp, nil
163✔
668
}
669

670
func (p *workflowExecutionPersistenceClient) GetReplicationTasks(
671
        ctx context.Context,
672
        request *GetReplicationTasksRequest,
673
) (*GetReplicationTasksResponse, error) {
122✔
674
        var resp *GetReplicationTasksResponse
122✔
675
        op := func() error {
244✔
676
                var err error
122✔
677
                resp, err = p.persistence.GetReplicationTasks(ctx, request)
122✔
678
                if err == nil && len(resp.Tasks) == 0 {
244✔
679
                        p.metricClient.IncCounter(metrics.PersistenceGetReplicationTasksScope, metrics.PersistenceEmptyResponseCounter)
122✔
680
                }
122✔
681
                return err
122✔
682
        }
683
        err := p.call(metrics.PersistenceGetReplicationTasksScope, op)
122✔
684
        if err != nil {
122✔
685
                return nil, err
×
686
        }
×
687
        return resp, nil
122✔
688
}
689

690
func (p *workflowExecutionPersistenceClient) CompleteTransferTask(
691
        ctx context.Context,
692
        request *CompleteTransferTaskRequest,
693
) error {
×
694
        op := func() error {
×
695
                return p.persistence.CompleteTransferTask(ctx, request)
×
696
        }
×
697
        return p.call(metrics.PersistenceCompleteTransferTaskScope, op)
×
698
}
699

700
func (p *workflowExecutionPersistenceClient) RangeCompleteTransferTask(
701
        ctx context.Context,
702
        request *RangeCompleteTransferTaskRequest,
703
) (*RangeCompleteTransferTaskResponse, error) {
112✔
704
        var resp *RangeCompleteTransferTaskResponse
112✔
705
        op := func() error {
224✔
706
                var err error
112✔
707
                resp, err = p.persistence.RangeCompleteTransferTask(ctx, request)
112✔
708
                return err
112✔
709
        }
112✔
710
        err := p.call(metrics.PersistenceRangeCompleteTransferTaskScope, op)
112✔
711
        if err != nil {
112✔
712
                return nil, err
×
713
        }
×
714
        return resp, nil
112✔
715
}
716

717
func (p *workflowExecutionPersistenceClient) CompleteCrossClusterTask(
718
        ctx context.Context,
719
        request *CompleteCrossClusterTaskRequest,
720
) error {
×
721
        op := func() error {
×
722
                return p.persistence.CompleteCrossClusterTask(ctx, request)
×
723
        }
×
724
        return p.call(metrics.PersistenceCompleteCrossClusterTaskScope, op)
×
725
}
726

727
func (p *workflowExecutionPersistenceClient) RangeCompleteCrossClusterTask(
728
        ctx context.Context,
729
        request *RangeCompleteCrossClusterTaskRequest,
730
) (*RangeCompleteCrossClusterTaskResponse, error) {
137✔
731
        var resp *RangeCompleteCrossClusterTaskResponse
137✔
732
        op := func() error {
274✔
733
                var err error
137✔
734
                resp, err = p.persistence.RangeCompleteCrossClusterTask(ctx, request)
137✔
735
                return err
137✔
736
        }
137✔
737
        err := p.call(metrics.PersistenceRangeCompleteCrossClusterTaskScope, op)
137✔
738
        if err != nil {
137✔
739
                return nil, err
×
740
        }
×
741
        return resp, nil
137✔
742
}
743

744
func (p *workflowExecutionPersistenceClient) CompleteReplicationTask(
745
        ctx context.Context,
746
        request *CompleteReplicationTaskRequest,
747
) error {
×
748
        op := func() error {
×
749
                return p.persistence.CompleteReplicationTask(ctx, request)
×
750
        }
×
751
        return p.call(metrics.PersistenceCompleteReplicationTaskScope, op)
×
752
}
753

754
func (p *workflowExecutionPersistenceClient) RangeCompleteReplicationTask(
755
        ctx context.Context,
756
        request *RangeCompleteReplicationTaskRequest,
757
) (*RangeCompleteReplicationTaskResponse, error) {
120✔
758
        var resp *RangeCompleteReplicationTaskResponse
120✔
759
        op := func() error {
240✔
760
                var err error
120✔
761
                resp, err = p.persistence.RangeCompleteReplicationTask(ctx, request)
120✔
762
                return err
120✔
763
        }
120✔
764
        err := p.call(metrics.PersistenceRangeCompleteReplicationTaskScope, op)
120✔
765
        if err != nil {
120✔
766
                return nil, err
×
767
        }
×
768
        return resp, nil
120✔
769
}
770

771
func (p *workflowExecutionPersistenceClient) PutReplicationTaskToDLQ(
772
        ctx context.Context,
773
        request *PutReplicationTaskToDLQRequest,
774
) error {
3✔
775
        op := func() error {
6✔
776
                return p.persistence.PutReplicationTaskToDLQ(ctx, request)
3✔
777
        }
3✔
778
        return p.call(metrics.PersistencePutReplicationTaskToDLQScope, op, metrics.DomainTag(request.DomainName))
3✔
779
}
780

781
func (p *workflowExecutionPersistenceClient) GetReplicationTasksFromDLQ(
782
        ctx context.Context,
783
        request *GetReplicationTasksFromDLQRequest,
784
) (*GetReplicationTasksFromDLQResponse, error) {
3✔
785
        var resp *GetReplicationTasksFromDLQResponse
3✔
786
        op := func() error {
6✔
787
                var err error
3✔
788
                resp, err = p.persistence.GetReplicationTasksFromDLQ(ctx, request)
3✔
789
                if err == nil && len(resp.Tasks) == 0 {
6✔
790
                        p.metricClient.IncCounter(metrics.PersistenceGetReplicationTasksFromDLQScope, metrics.PersistenceEmptyResponseCounter)
3✔
791
                }
3✔
792
                return err
3✔
793
        }
794
        err := p.call(metrics.PersistenceGetReplicationTasksFromDLQScope, op)
3✔
795
        if err != nil {
3✔
796
                return nil, err
×
797
        }
×
798
        return resp, nil
3✔
799
}
800

801
func (p *workflowExecutionPersistenceClient) GetReplicationDLQSize(
802
        ctx context.Context,
803
        request *GetReplicationDLQSizeRequest,
804
) (*GetReplicationDLQSizeResponse, error) {
12✔
805
        var resp *GetReplicationDLQSizeResponse
12✔
806
        op := func() error {
24✔
807
                var err error
12✔
808
                resp, err = p.persistence.GetReplicationDLQSize(ctx, request)
12✔
809
                return err
12✔
810
        }
12✔
811
        err := p.call(metrics.PersistenceGetReplicationDLQSizeScope, op)
12✔
812
        if err != nil {
12✔
813
                return nil, err
×
814
        }
×
815
        return resp, nil
12✔
816
}
817

818
func (p *workflowExecutionPersistenceClient) DeleteReplicationTaskFromDLQ(
819
        ctx context.Context,
820
        request *DeleteReplicationTaskFromDLQRequest,
821
) error {
×
822
        op := func() error {
×
823
                return p.persistence.DeleteReplicationTaskFromDLQ(ctx, request)
×
824
        }
×
825
        return p.call(metrics.PersistenceDeleteReplicationTaskFromDLQScope, op)
×
826
}
827

828
func (p *workflowExecutionPersistenceClient) RangeDeleteReplicationTaskFromDLQ(
829
        ctx context.Context,
830
        request *RangeDeleteReplicationTaskFromDLQRequest,
831
) (*RangeDeleteReplicationTaskFromDLQResponse, error) {
×
832
        var resp *RangeDeleteReplicationTaskFromDLQResponse
×
833
        op := func() error {
×
834
                var err error
×
835
                resp, err = p.persistence.RangeDeleteReplicationTaskFromDLQ(ctx, request)
×
836
                return err
×
837
        }
×
838
        err := p.call(metrics.PersistenceRangeDeleteReplicationTaskFromDLQScope, op)
×
839
        if err != nil {
×
840
                return nil, err
×
841
        }
×
842
        return resp, nil
×
843
}
844

845
func (p *workflowExecutionPersistenceClient) CreateFailoverMarkerTasks(
846
        ctx context.Context,
847
        request *CreateFailoverMarkersRequest,
848
) error {
×
849
        op := func() error {
×
850
                return p.persistence.CreateFailoverMarkerTasks(ctx, request)
×
851
        }
×
852
        return p.call(metrics.PersistenceCreateFailoverMarkerTasksScope, op)
×
853
}
854

855
func (p *workflowExecutionPersistenceClient) GetTimerIndexTasks(
856
        ctx context.Context,
857
        request *GetTimerIndexTasksRequest,
858
) (*GetTimerIndexTasksResponse, error) {
3,249✔
859
        var resp *GetTimerIndexTasksResponse
3,249✔
860
        op := func() error {
6,498✔
861
                var err error
3,249✔
862
                resp, err = p.persistence.GetTimerIndexTasks(ctx, request)
3,249✔
863
                if err == nil && len(resp.Timers) == 0 {
3,349✔
864
                        p.metricClient.IncCounter(metrics.PersistenceGetTimerIndexTasksScope, metrics.PersistenceEmptyResponseCounter)
100✔
865
                }
100✔
866
                return err
3,249✔
867
        }
868
        err := p.call(metrics.PersistenceGetTimerIndexTasksScope, op)
3,249✔
869
        if err != nil {
3,249✔
870
                return nil, err
×
871
        }
×
872
        return resp, nil
3,249✔
873
}
874

875
func (p *workflowExecutionPersistenceClient) CompleteTimerTask(
876
        ctx context.Context,
877
        request *CompleteTimerTaskRequest,
878
) error {
×
879
        op := func() error {
×
880
                return p.persistence.CompleteTimerTask(ctx, request)
×
881
        }
×
882
        return p.call(metrics.PersistenceCompleteTimerTaskScope, op)
×
883
}
884

885
func (p *workflowExecutionPersistenceClient) RangeCompleteTimerTask(
886
        ctx context.Context,
887
        request *RangeCompleteTimerTaskRequest,
888
) (*RangeCompleteTimerTaskResponse, error) {
38✔
889
        var resp *RangeCompleteTimerTaskResponse
38✔
890
        op := func() error {
76✔
891
                var err error
38✔
892
                resp, err = p.persistence.RangeCompleteTimerTask(ctx, request)
38✔
893
                return err
38✔
894
        }
38✔
895
        err := p.call(metrics.PersistenceRangeCompleteTimerTaskScope, op)
38✔
896
        if err != nil {
38✔
897
                return nil, err
×
898
        }
×
899
        return resp, nil
38✔
900
}
901

902
func (p *workflowExecutionPersistenceClient) Close() {
51✔
903
        p.persistence.Close()
51✔
904
}
51✔
905

906
func (p *taskPersistenceClient) GetName() string {
×
907
        return p.persistence.GetName()
×
908
}
×
909

910
func (p *taskPersistenceClient) CreateTasks(
911
        ctx context.Context,
912
        request *CreateTasksRequest,
913
) (*CreateTasksResponse, error) {
934✔
914
        var resp *CreateTasksResponse
934✔
915
        op := func() error {
1,868✔
916
                var err error
934✔
917
                resp, err = p.persistence.CreateTasks(ctx, request)
934✔
918
                return err
934✔
919
        }
934✔
920
        err := p.call(metrics.PersistenceCreateTaskScope, op, metrics.DomainTag(request.DomainName))
934✔
921
        if err != nil {
934✔
922
                return nil, err
×
923
        }
×
924
        return resp, nil
934✔
925
}
926

927
func (p *taskPersistenceClient) GetTasks(
928
        ctx context.Context,
929
        request *GetTasksRequest,
930
) (*GetTasksResponse, error) {
980✔
931
        var resp *GetTasksResponse
980✔
932
        op := func() error {
1,960✔
933
                var err error
980✔
934
                resp, err = p.persistence.GetTasks(ctx, request)
980✔
935
                if err == nil && len(resp.Tasks) == 0 {
1,024✔
936
                        p.metricClient.IncCounter(metrics.PersistenceGetTasksScope, metrics.PersistenceEmptyResponseCounter)
44✔
937
                }
44✔
938
                return err
980✔
939
        }
940
        err := p.call(metrics.PersistenceGetTasksScope, op, metrics.DomainTag(request.DomainName))
980✔
941
        if err != nil {
980✔
942
                return nil, err
×
943
        }
×
944
        return resp, nil
980✔
945
}
946

947
func (p *taskPersistenceClient) CompleteTask(
948
        ctx context.Context,
949
        request *CompleteTaskRequest,
950
) error {
×
951
        op := func() error {
×
952
                return p.persistence.CompleteTask(ctx, request)
×
953
        }
×
954
        return p.call(metrics.PersistenceCompleteTaskScope, op, metrics.DomainTag(request.DomainName))
×
955
}
956

957
func (p *taskPersistenceClient) CompleteTasksLessThan(
958
        ctx context.Context,
959
        request *CompleteTasksLessThanRequest,
960
) (*CompleteTasksLessThanResponse, error) {
539✔
961
        var resp *CompleteTasksLessThanResponse
539✔
962
        op := func() error {
1,078✔
963
                var err error
539✔
964
                resp, err = p.persistence.CompleteTasksLessThan(ctx, request)
539✔
965
                return err
539✔
966
        }
539✔
967
        err := p.call(metrics.PersistenceCompleteTasksLessThanScope, op, metrics.DomainTag(request.DomainName))
539✔
968
        if err != nil {
539✔
969
                return nil, err
×
970
        }
×
971
        return resp, nil
539✔
972
}
973

974
func (p *taskPersistenceClient) GetOrphanTasks(ctx context.Context, request *GetOrphanTasksRequest) (*GetOrphanTasksResponse, error) {
×
975
        var resp *GetOrphanTasksResponse
×
976
        op := func() error {
×
977
                var err error
×
978
                resp, err = p.persistence.GetOrphanTasks(ctx, request)
×
979
                return err
×
980
        }
×
981
        err := p.call(metrics.PersistenceGetOrphanTasksScope, op)
×
982
        if err != nil {
×
983
                return nil, err
×
984
        }
×
985
        return resp, nil
×
986
}
987

988
func (p *taskPersistenceClient) LeaseTaskList(
989
        ctx context.Context,
990
        request *LeaseTaskListRequest,
991
) (*LeaseTaskListResponse, error) {
1,343✔
992
        var resp *LeaseTaskListResponse
1,343✔
993
        op := func() error {
2,686✔
994
                var err error
1,343✔
995
                resp, err = p.persistence.LeaseTaskList(ctx, request)
1,343✔
996
                return err
1,343✔
997
        }
1,343✔
998
        err := p.call(metrics.PersistenceLeaseTaskListScope, op, metrics.DomainTag(request.DomainName))
1,343✔
999
        if err != nil {
1,343✔
1000
                return nil, err
×
1001
        }
×
1002
        return resp, nil
1,343✔
1003
}
1004

1005
func (p *taskPersistenceClient) ListTaskList(
1006
        ctx context.Context,
1007
        request *ListTaskListRequest,
1008
) (*ListTaskListResponse, error) {
×
1009
        var resp *ListTaskListResponse
×
1010
        op := func() error {
×
1011
                var err error
×
1012
                resp, err = p.persistence.ListTaskList(ctx, request)
×
1013
                return err
×
1014
        }
×
1015
        err := p.call(metrics.PersistenceListTaskListScope, op)
×
1016
        if err != nil {
×
1017
                return nil, err
×
1018
        }
×
1019
        return resp, nil
×
1020
}
1021

1022
func (p *taskPersistenceClient) DeleteTaskList(
1023
        ctx context.Context,
1024
        request *DeleteTaskListRequest,
1025
) error {
×
1026
        op := func() error {
×
1027
                return p.persistence.DeleteTaskList(ctx, request)
×
1028
        }
×
1029
        return p.call(metrics.PersistenceDeleteTaskListScope, op, metrics.DomainTag(request.DomainName))
×
1030
}
1031

1032
func (p *taskPersistenceClient) UpdateTaskList(
1033
        ctx context.Context,
1034
        request *UpdateTaskListRequest,
1035
) (*UpdateTaskListResponse, error) {
5,888✔
1036
        var resp *UpdateTaskListResponse
5,888✔
1037
        op := func() error {
11,776✔
1038
                var err error
5,888✔
1039
                resp, err = p.persistence.UpdateTaskList(ctx, request)
5,888✔
1040
                return err
5,888✔
1041
        }
5,888✔
1042
        err := p.call(metrics.PersistenceUpdateTaskListScope, op, metrics.DomainTag(request.DomainName))
5,888✔
1043
        if err != nil {
5,888✔
1044
                return nil, err
×
1045
        }
×
1046
        return resp, nil
5,888✔
1047
}
1048

1049
func (p *taskPersistenceClient) Close() {
39✔
1050
        p.persistence.Close()
39✔
1051
}
39✔
1052

1053
func (p *metadataPersistenceClient) GetName() string {
×
1054
        return p.persistence.GetName()
×
1055
}
×
1056

1057
func (p *metadataPersistenceClient) CreateDomain(
1058
        ctx context.Context,
1059
        request *CreateDomainRequest,
1060
) (*CreateDomainResponse, error) {
63✔
1061
        var resp *CreateDomainResponse
63✔
1062
        op := func() error {
126✔
1063
                var err error
63✔
1064
                resp, err = p.persistence.CreateDomain(ctx, request)
63✔
1065
                return err
63✔
1066
        }
63✔
1067
        err := p.call(metrics.PersistenceCreateDomainScope, op)
63✔
1068
        if err != nil {
68✔
1069
                return nil, err
5✔
1070
        }
5✔
1071
        return resp, nil
59✔
1072
}
1073

1074
func (p *metadataPersistenceClient) GetDomain(
1075
        ctx context.Context,
1076
        request *GetDomainRequest,
1077
) (*GetDomainResponse, error) {
773✔
1078
        var resp *GetDomainResponse
773✔
1079
        op := func() error {
1,546✔
1080
                var err error
773✔
1081
                resp, err = p.persistence.GetDomain(ctx, request)
773✔
1082
                return err
773✔
1083
        }
773✔
1084
        err := p.call(metrics.PersistenceGetDomainScope, op)
773✔
1085
        if err != nil {
1,417✔
1086
                return nil, err
644✔
1087
        }
644✔
1088
        return resp, nil
132✔
1089
}
1090

1091
func (p *metadataPersistenceClient) UpdateDomain(
1092
        ctx context.Context,
1093
        request *UpdateDomainRequest,
1094
) error {
×
1095
        op := func() error {
×
1096
                return p.persistence.UpdateDomain(ctx, request)
×
1097
        }
×
1098
        return p.call(metrics.PersistenceUpdateDomainScope, op)
×
1099
}
1100

1101
func (p *metadataPersistenceClient) DeleteDomain(
1102
        ctx context.Context,
1103
        request *DeleteDomainRequest,
1104
) error {
×
1105
        op := func() error {
×
1106
                return p.persistence.DeleteDomain(ctx, request)
×
1107
        }
×
1108
        return p.call(metrics.PersistenceDeleteDomainScope, op)
×
1109
}
1110

1111
func (p *metadataPersistenceClient) DeleteDomainByName(
1112
        ctx context.Context,
1113
        request *DeleteDomainByNameRequest,
1114
) error {
×
1115
        op := func() error {
×
1116
                return p.persistence.DeleteDomainByName(ctx, request)
×
1117
        }
×
1118
        return p.call(metrics.PersistenceDeleteDomainByNameScope, op)
×
1119
}
1120

1121
func (p *metadataPersistenceClient) ListDomains(
1122
        ctx context.Context,
1123
        request *ListDomainsRequest,
1124
) (*ListDomainsResponse, error) {
1,269✔
1125
        var resp *ListDomainsResponse
1,269✔
1126
        op := func() error {
2,538✔
1127
                var err error
1,269✔
1128
                resp, err = p.persistence.ListDomains(ctx, request)
1,269✔
1129
                if err == nil && len(resp.Domains) == 0 {
1,269✔
1130
                        p.metricClient.IncCounter(metrics.PersistenceListDomainScope, metrics.PersistenceEmptyResponseCounter)
×
1131
                }
×
1132
                return err
1,269✔
1133
        }
1134
        err := p.call(metrics.PersistenceListDomainScope, op)
1,269✔
1135
        if err != nil {
1,269✔
1136
                return nil, err
×
1137
        }
×
1138
        return resp, nil
1,269✔
1139
}
1140

1141
func (p *metadataPersistenceClient) GetMetadata(
1142
        ctx context.Context,
1143
) (*GetMetadataResponse, error) {
1,269✔
1144
        var resp *GetMetadataResponse
1,269✔
1145
        op := func() error {
2,538✔
1146
                var err error
1,269✔
1147
                resp, err = p.persistence.GetMetadata(ctx)
1,269✔
1148
                return err
1,269✔
1149
        }
1,269✔
1150
        err := p.call(metrics.PersistenceGetMetadataScope, op)
1,269✔
1151
        if err != nil {
1,269✔
1152
                return nil, err
×
1153
        }
×
1154
        return resp, nil
1,269✔
1155
}
1156

1157
func (p *metadataPersistenceClient) Close() {
39✔
1158
        p.persistence.Close()
39✔
1159
}
39✔
1160

1161
func (p *visibilityPersistenceClient) GetName() string {
×
1162
        return p.persistence.GetName()
×
1163
}
×
1164

1165
func (p *visibilityPersistenceClient) RecordWorkflowExecutionStarted(
1166
        ctx context.Context,
1167
        request *RecordWorkflowExecutionStartedRequest,
1168
) error {
636✔
1169
        op := func() error {
1,272✔
1170
                return p.persistence.RecordWorkflowExecutionStarted(ctx, request)
636✔
1171
        }
636✔
1172
        return p.call(metrics.PersistenceRecordWorkflowExecutionStartedScope, op)
636✔
1173
}
1174

1175
func (p *visibilityPersistenceClient) RecordWorkflowExecutionClosed(
1176
        ctx context.Context,
1177
        request *RecordWorkflowExecutionClosedRequest,
1178
) error {
500✔
1179
        op := func() error {
1,000✔
1180
                return p.persistence.RecordWorkflowExecutionClosed(ctx, request)
500✔
1181
        }
500✔
1182
        return p.call(metrics.PersistenceRecordWorkflowExecutionClosedScope, op)
500✔
1183
}
1184

1185
func (p *visibilityPersistenceClient) RecordWorkflowExecutionUninitialized(
1186
        ctx context.Context,
1187
        request *RecordWorkflowExecutionUninitializedRequest,
1188
) error {
×
1189
        op := func() error {
×
1190
                return p.persistence.RecordWorkflowExecutionUninitialized(ctx, request)
×
1191
        }
×
1192
        return p.call(metrics.PersistenceRecordWorkflowExecutionUninitializedScope, op)
×
1193
}
1194

1195
func (p *visibilityPersistenceClient) UpsertWorkflowExecution(
1196
        ctx context.Context,
1197
        request *UpsertWorkflowExecutionRequest,
1198
) error {
120✔
1199
        op := func() error {
240✔
1200
                return p.persistence.UpsertWorkflowExecution(ctx, request)
120✔
1201
        }
120✔
1202
        return p.call(metrics.PersistenceUpsertWorkflowExecutionScope, op)
120✔
1203
}
1204

1205
func (p *visibilityPersistenceClient) ListOpenWorkflowExecutions(
1206
        ctx context.Context,
1207
        request *ListWorkflowExecutionsRequest,
1208
) (*ListWorkflowExecutionsResponse, error) {
5✔
1209
        var resp *ListWorkflowExecutionsResponse
5✔
1210
        op := func() error {
10✔
1211
                var err error
5✔
1212
                resp, err = p.persistence.ListOpenWorkflowExecutions(ctx, request)
5✔
1213
                return err
5✔
1214
        }
5✔
1215
        err := p.call(metrics.PersistenceListOpenWorkflowExecutionsScope, op)
5✔
1216
        if err != nil {
5✔
1217
                return nil, err
×
1218
        }
×
1219
        return resp, nil
5✔
1220
}
1221

1222
func (p *visibilityPersistenceClient) ListClosedWorkflowExecutions(
1223
        ctx context.Context,
1224
        request *ListWorkflowExecutionsRequest,
1225
) (*ListWorkflowExecutionsResponse, error) {
9✔
1226
        var resp *ListWorkflowExecutionsResponse
9✔
1227
        op := func() error {
18✔
1228
                var err error
9✔
1229
                resp, err = p.persistence.ListClosedWorkflowExecutions(ctx, request)
9✔
1230
                return err
9✔
1231
        }
9✔
1232
        err := p.call(metrics.PersistenceListClosedWorkflowExecutionsScope, op)
9✔
1233
        if err != nil {
9✔
1234
                return nil, err
×
1235
        }
×
1236
        return resp, nil
9✔
1237
}
1238

1239
func (p *visibilityPersistenceClient) ListOpenWorkflowExecutionsByType(
1240
        ctx context.Context,
1241
        request *ListWorkflowExecutionsByTypeRequest,
1242
) (*ListWorkflowExecutionsResponse, error) {
×
1243
        var resp *ListWorkflowExecutionsResponse
×
1244
        op := func() error {
×
1245
                var err error
×
1246
                resp, err = p.persistence.ListOpenWorkflowExecutionsByType(ctx, request)
×
1247
                return err
×
1248
        }
×
1249
        err := p.call(metrics.PersistenceListOpenWorkflowExecutionsByTypeScope, op)
×
1250
        if err != nil {
×
1251
                return nil, err
×
1252
        }
×
1253
        return resp, nil
×
1254
}
1255

1256
func (p *visibilityPersistenceClient) ListClosedWorkflowExecutionsByType(
1257
        ctx context.Context,
1258
        request *ListWorkflowExecutionsByTypeRequest,
1259
) (*ListWorkflowExecutionsResponse, error) {
×
1260
        var resp *ListWorkflowExecutionsResponse
×
1261
        op := func() error {
×
1262
                var err error
×
1263
                resp, err = p.persistence.ListClosedWorkflowExecutionsByType(ctx, request)
×
1264
                return err
×
1265
        }
×
1266
        err := p.call(metrics.PersistenceListClosedWorkflowExecutionsByTypeScope, op)
×
1267
        if err != nil {
×
1268
                return nil, err
×
1269
        }
×
1270
        return resp, nil
×
1271
}
1272

1273
func (p *visibilityPersistenceClient) ListOpenWorkflowExecutionsByWorkflowID(
1274
        ctx context.Context,
1275
        request *ListWorkflowExecutionsByWorkflowIDRequest,
1276
) (*ListWorkflowExecutionsResponse, error) {
87✔
1277
        var resp *ListWorkflowExecutionsResponse
87✔
1278
        op := func() error {
174✔
1279
                var err error
87✔
1280
                resp, err = p.persistence.ListOpenWorkflowExecutionsByWorkflowID(ctx, request)
87✔
1281
                return err
87✔
1282
        }
87✔
1283
        err := p.call(metrics.PersistenceListOpenWorkflowExecutionsByWorkflowIDScope, op)
87✔
1284
        if err != nil {
87✔
1285
                return nil, err
×
1286
        }
×
1287
        return resp, nil
87✔
1288
}
1289

1290
func (p *visibilityPersistenceClient) ListClosedWorkflowExecutionsByWorkflowID(
1291
        ctx context.Context,
1292
        request *ListWorkflowExecutionsByWorkflowIDRequest,
1293
) (*ListWorkflowExecutionsResponse, error) {
15✔
1294
        var resp *ListWorkflowExecutionsResponse
15✔
1295
        op := func() error {
30✔
1296
                var err error
15✔
1297
                resp, err = p.persistence.ListClosedWorkflowExecutionsByWorkflowID(ctx, request)
15✔
1298
                return err
15✔
1299
        }
15✔
1300
        err := p.call(metrics.PersistenceListClosedWorkflowExecutionsByWorkflowIDScope, op)
15✔
1301
        if err != nil {
15✔
1302
                return nil, err
×
1303
        }
×
1304
        return resp, nil
15✔
1305
}
1306

1307
func (p *visibilityPersistenceClient) ListClosedWorkflowExecutionsByStatus(
1308
        ctx context.Context,
1309
        request *ListClosedWorkflowExecutionsByStatusRequest,
1310
) (*ListWorkflowExecutionsResponse, error) {
×
1311
        var resp *ListWorkflowExecutionsResponse
×
1312
        op := func() error {
×
1313
                var err error
×
1314
                resp, err = p.persistence.ListClosedWorkflowExecutionsByStatus(ctx, request)
×
1315
                return err
×
1316
        }
×
1317
        err := p.call(metrics.PersistenceListClosedWorkflowExecutionsByStatusScope, op)
×
1318
        if err != nil {
×
1319
                return nil, err
×
1320
        }
×
1321
        return resp, nil
×
1322
}
1323

1324
func (p *visibilityPersistenceClient) GetClosedWorkflowExecution(
1325
        ctx context.Context,
1326
        request *GetClosedWorkflowExecutionRequest,
1327
) (*GetClosedWorkflowExecutionResponse, error) {
×
1328
        var resp *GetClosedWorkflowExecutionResponse
×
1329
        op := func() error {
×
1330
                var err error
×
1331
                resp, err = p.persistence.GetClosedWorkflowExecution(ctx, request)
×
1332
                return err
×
1333
        }
×
1334
        err := p.call(metrics.PersistenceGetClosedWorkflowExecutionScope, op)
×
1335
        if err != nil {
×
1336
                return nil, err
×
1337
        }
×
1338
        return resp, nil
×
1339
}
1340

1341
func (p *visibilityPersistenceClient) DeleteWorkflowExecution(
1342
        ctx context.Context,
1343
        request *VisibilityDeleteWorkflowExecutionRequest,
1344
) error {
54✔
1345
        op := func() error {
108✔
1346
                return p.persistence.DeleteWorkflowExecution(ctx, request)
54✔
1347
        }
54✔
1348
        return p.call(metrics.PersistenceVisibilityDeleteWorkflowExecutionScope, op)
54✔
1349
}
1350

1351
func (p *visibilityPersistenceClient) DeleteUninitializedWorkflowExecution(
1352
        ctx context.Context,
1353
        request *VisibilityDeleteWorkflowExecutionRequest,
1354
) error {
×
1355
        op := func() error {
×
1356
                return p.persistence.DeleteUninitializedWorkflowExecution(ctx, request)
×
1357
        }
×
1358
        return p.call(metrics.PersistenceVisibilityDeleteUninitializedWorkflowExecutionScope, op)
×
1359
}
1360

1361
func (p *visibilityPersistenceClient) ListWorkflowExecutions(
1362
        ctx context.Context,
1363
        request *ListWorkflowExecutionsByQueryRequest,
1364
) (*ListWorkflowExecutionsResponse, error) {
×
1365
        var resp *ListWorkflowExecutionsResponse
×
1366
        op := func() error {
×
1367
                var err error
×
1368
                resp, err = p.persistence.ListWorkflowExecutions(ctx, request)
×
1369
                return err
×
1370
        }
×
1371
        err := p.call(metrics.PersistenceListWorkflowExecutionsScope, op)
×
1372
        if err != nil {
×
1373
                return nil, err
×
1374
        }
×
1375
        return resp, nil
×
1376
}
1377

1378
func (p *visibilityPersistenceClient) ScanWorkflowExecutions(
1379
        ctx context.Context,
1380
        request *ListWorkflowExecutionsByQueryRequest,
1381
) (*ListWorkflowExecutionsResponse, error) {
×
1382
        var resp *ListWorkflowExecutionsResponse
×
1383
        op := func() error {
×
1384
                var err error
×
1385
                resp, err = p.persistence.ScanWorkflowExecutions(ctx, request)
×
1386
                return err
×
1387
        }
×
1388
        err := p.call(metrics.PersistenceScanWorkflowExecutionsScope, op)
×
1389
        if err != nil {
×
1390
                return nil, err
×
1391
        }
×
1392
        return resp, nil
×
1393
}
1394

1395
func (p *visibilityPersistenceClient) CountWorkflowExecutions(
1396
        ctx context.Context,
1397
        request *CountWorkflowExecutionsRequest,
1398
) (*CountWorkflowExecutionsResponse, error) {
×
1399
        var resp *CountWorkflowExecutionsResponse
×
1400
        op := func() error {
×
1401
                var err error
×
1402
                resp, err = p.persistence.CountWorkflowExecutions(ctx, request)
×
1403
                return err
×
1404
        }
×
1405
        err := p.call(metrics.PersistenceCountWorkflowExecutionsScope, op)
×
1406
        if err != nil {
×
1407
                return nil, err
×
1408
        }
×
1409
        return resp, nil
×
1410
}
1411

1412
func (p *visibilityPersistenceClient) Close() {
27✔
1413
        p.persistence.Close()
27✔
1414
}
27✔
1415

1416
func (p *historyPersistenceClient) GetName() string {
×
1417
        return p.persistence.GetName()
×
1418
}
×
1419

1420
func (p *historyPersistenceClient) Close() {
39✔
1421
        p.persistence.Close()
39✔
1422
}
39✔
1423

1424
// AppendHistoryNodes add(or override) a node to a history branch
1425
func (p *historyPersistenceClient) AppendHistoryNodes(
1426
        ctx context.Context,
1427
        request *AppendHistoryNodesRequest,
1428
) (*AppendHistoryNodesResponse, error) {
3,682✔
1429
        var resp *AppendHistoryNodesResponse
3,682✔
1430
        op := func() error {
7,364✔
1431
                var err error
3,682✔
1432
                resp, err = p.persistence.AppendHistoryNodes(ctx, request)
3,682✔
1433
                return err
3,682✔
1434
        }
3,682✔
1435
        err := p.call(metrics.PersistenceAppendHistoryNodesScope, op, metrics.DomainTag(request.DomainName))
3,682✔
1436
        if err != nil {
3,682✔
1437
                return nil, err
×
1438
        }
×
1439
        return resp, nil
3,682✔
1440
}
1441

1442
// ReadHistoryBranch returns history node data for a branch
1443
func (p *historyPersistenceClient) ReadHistoryBranch(
1444
        ctx context.Context,
1445
        request *ReadHistoryBranchRequest,
1446
) (*ReadHistoryBranchResponse, error) {
1,621✔
1447
        var resp *ReadHistoryBranchResponse
1,621✔
1448
        op := func() error {
3,242✔
1449
                var err error
1,621✔
1450
                resp, err = p.persistence.ReadHistoryBranch(ctx, request)
1,621✔
1451
                if err == nil && len(resp.HistoryEvents) == 0 {
1,633✔
1452
                        scopeWithDomainTag := p.metricClient.Scope(metrics.PersistenceReadHistoryBranchScope, metrics.DomainTag(request.DomainName))
12✔
1453
                        scopeWithDomainTag.IncCounter(metrics.PersistenceEmptyResponseCounter)
12✔
1454
                }
12✔
1455
                return err
1,621✔
1456
        }
1457
        err := p.call(metrics.PersistenceReadHistoryBranchScope, op, metrics.DomainTag(request.DomainName))
1,621✔
1458
        if err != nil {
1,621✔
1459
                return nil, err
×
1460
        }
×
1461
        return resp, nil
1,621✔
1462
}
1463

1464
// ReadHistoryBranchByBatch returns history node data for a branch ByBatch
1465
func (p *historyPersistenceClient) ReadHistoryBranchByBatch(
1466
        ctx context.Context,
1467
        request *ReadHistoryBranchRequest,
1468
) (*ReadHistoryBranchByBatchResponse, error) {
135✔
1469
        var resp *ReadHistoryBranchByBatchResponse
135✔
1470
        op := func() error {
270✔
1471
                var err error
135✔
1472
                resp, err = p.persistence.ReadHistoryBranchByBatch(ctx, request)
135✔
1473
                return err
135✔
1474
        }
135✔
1475
        err := p.call(metrics.PersistenceReadHistoryBranchScope, op, metrics.DomainTag(request.DomainName))
135✔
1476
        if err != nil {
186✔
1477
                return nil, err
51✔
1478
        }
51✔
1479
        return resp, nil
84✔
1480
}
1481

1482
// ReadRawHistoryBranch returns history node raw data for a branch ByBatch
1483
func (p *historyPersistenceClient) ReadRawHistoryBranch(
1484
        ctx context.Context,
1485
        request *ReadHistoryBranchRequest,
1486
) (*ReadRawHistoryBranchResponse, error) {
3✔
1487
        var resp *ReadRawHistoryBranchResponse
3✔
1488
        op := func() error {
6✔
1489
                var err error
3✔
1490
                resp, err = p.persistence.ReadRawHistoryBranch(ctx, request)
3✔
1491
                return err
3✔
1492
        }
3✔
1493
        err := p.call(metrics.PersistenceReadHistoryBranchScope, op, metrics.DomainTag(request.DomainName))
3✔
1494
        if err != nil {
3✔
1495
                return nil, err
×
1496
        }
×
1497
        return resp, nil
3✔
1498
}
1499

1500
// ForkHistoryBranch forks a new branch from a old branch
1501
func (p *historyPersistenceClient) ForkHistoryBranch(
1502
        ctx context.Context,
1503
        request *ForkHistoryBranchRequest,
1504
) (*ForkHistoryBranchResponse, error) {
18✔
1505
        var resp *ForkHistoryBranchResponse
18✔
1506
        op := func() error {
36✔
1507
                var err error
18✔
1508
                resp, err = p.persistence.ForkHistoryBranch(ctx, request)
18✔
1509
                return err
18✔
1510
        }
18✔
1511
        err := p.call(metrics.PersistenceForkHistoryBranchScope, op, metrics.DomainTag(request.DomainName))
18✔
1512
        if err != nil {
18✔
1513
                return nil, err
×
1514
        }
×
1515
        return resp, nil
18✔
1516
}
1517

1518
// DeleteHistoryBranch removes a branch
1519
func (p *historyPersistenceClient) DeleteHistoryBranch(
1520
        ctx context.Context,
1521
        request *DeleteHistoryBranchRequest,
1522
) error {
54✔
1523
        op := func() error {
108✔
1524
                return p.persistence.DeleteHistoryBranch(ctx, request)
54✔
1525
        }
54✔
1526
        return p.call(metrics.PersistenceDeleteHistoryBranchScope, op, metrics.DomainTag(request.DomainName))
54✔
1527
}
1528

1529
func (p *historyPersistenceClient) GetAllHistoryTreeBranches(
1530
        ctx context.Context,
1531
        request *GetAllHistoryTreeBranchesRequest,
1532
) (*GetAllHistoryTreeBranchesResponse, error) {
×
1533
        var resp *GetAllHistoryTreeBranchesResponse
×
1534
        op := func() error {
×
1535
                var err error
×
1536
                resp, err = p.persistence.GetAllHistoryTreeBranches(ctx, request)
×
1537
                if err == nil && len(resp.Branches) == 0 {
×
1538
                        p.metricClient.IncCounter(metrics.PersistenceGetAllHistoryTreeBranchesScope, metrics.PersistenceEmptyResponseCounter)
×
1539
                }
×
1540
                return err
×
1541
        }
1542
        err := p.call(metrics.PersistenceGetAllHistoryTreeBranchesScope, op)
×
1543
        if err != nil {
×
1544
                return nil, err
×
1545
        }
×
1546
        return resp, nil
×
1547
}
1548

1549
// GetHistoryTree returns all branch information of a tree
1550
func (p *historyPersistenceClient) GetHistoryTree(
1551
        ctx context.Context,
1552
        request *GetHistoryTreeRequest,
1553
) (*GetHistoryTreeResponse, error) {
23✔
1554
        var resp *GetHistoryTreeResponse
23✔
1555
        op := func() error {
46✔
1556
                var err error
23✔
1557
                resp, err = p.persistence.GetHistoryTree(ctx, request)
23✔
1558
                return err
23✔
1559
        }
23✔
1560
        err := p.call(metrics.PersistenceGetHistoryTreeScope, op, metrics.DomainTag(request.DomainName))
23✔
1561
        if err != nil {
23✔
1562
                return nil, err
×
1563
        }
×
1564
        return resp, nil
23✔
1565
}
1566

1567
func (p *queuePersistenceClient) EnqueueMessage(
1568
        ctx context.Context,
1569
        message []byte,
1570
) error {
3✔
1571
        op := func() error {
6✔
1572
                return p.persistence.EnqueueMessage(ctx, message)
3✔
1573
        }
3✔
1574
        return p.call(metrics.PersistenceEnqueueMessageScope, op)
3✔
1575
}
1576

1577
func (p *queuePersistenceClient) ReadMessages(
1578
        ctx context.Context,
1579
        lastMessageID int64,
1580
        maxCount int,
1581
) ([]*QueueMessage, error) {
×
1582
        var resp []*QueueMessage
×
1583
        op := func() error {
×
1584
                var err error
×
1585
                resp, err = p.persistence.ReadMessages(ctx, lastMessageID, maxCount)
×
1586
                if err == nil && len(resp) == 0 {
×
1587
                        p.metricClient.IncCounter(metrics.PersistenceReadQueueMessagesScope, metrics.PersistenceEmptyResponseCounter)
×
1588
                }
×
1589
                return err
×
1590
        }
1591
        err := p.call(metrics.PersistenceReadQueueMessagesScope, op)
×
1592
        if err != nil {
×
1593
                return nil, err
×
1594
        }
×
1595
        return resp, nil
×
1596
}
1597

1598
func (p *queuePersistenceClient) UpdateAckLevel(
1599
        ctx context.Context,
1600
        messageID int64,
1601
        clusterName string,
1602
) error {
×
1603
        op := func() error {
×
1604
                return p.persistence.UpdateAckLevel(ctx, messageID, clusterName)
×
1605
        }
×
1606
        return p.call(metrics.PersistenceUpdateAckLevelScope, op)
×
1607
}
1608

1609
func (p *queuePersistenceClient) GetAckLevels(
1610
        ctx context.Context,
1611
) (map[string]int64, error) {
×
1612
        var resp map[string]int64
×
1613
        op := func() error {
×
1614
                var err error
×
1615
                resp, err = p.persistence.GetAckLevels(ctx)
×
1616
                return err
×
1617
        }
×
1618
        err := p.call(metrics.PersistenceGetAckLevelScope, op)
×
1619
        if err != nil {
×
1620
                return nil, err
×
1621
        }
×
1622
        return resp, nil
×
1623
}
1624

1625
func (p *queuePersistenceClient) DeleteMessagesBefore(
1626
        ctx context.Context,
1627
        messageID int64,
1628
) error {
×
1629
        op := func() error {
×
1630
                return p.persistence.DeleteMessagesBefore(ctx, messageID)
×
1631
        }
×
1632
        return p.call(metrics.PersistenceDeleteQueueMessagesScope, op)
×
1633
}
1634

1635
func (p *queuePersistenceClient) EnqueueMessageToDLQ(
1636
        ctx context.Context,
1637
        message []byte,
1638
) error {
×
1639
        op := func() error {
×
1640
                return p.persistence.EnqueueMessageToDLQ(ctx, message)
×
1641
        }
×
1642
        return p.call(metrics.PersistenceEnqueueMessageToDLQScope, op)
×
1643
}
1644

1645
func (p *queuePersistenceClient) ReadMessagesFromDLQ(
1646
        ctx context.Context,
1647
        firstMessageID int64,
1648
        lastMessageID int64,
1649
        pageSize int,
1650
        pageToken []byte,
1651
) ([]*QueueMessage, []byte, error) {
×
1652
        var result []*QueueMessage
×
1653
        var token []byte
×
1654
        op := func() error {
×
1655
                var err error
×
1656
                result, token, err = p.persistence.ReadMessagesFromDLQ(ctx, firstMessageID, lastMessageID, pageSize, pageToken)
×
1657
                return err
×
1658
        }
×
1659
        err := p.call(metrics.PersistenceReadQueueMessagesFromDLQScope, op)
×
1660
        if err != nil {
×
1661
                return nil, nil, err
×
1662
        }
×
1663
        return result, token, nil
×
1664
}
1665

1666
func (p *queuePersistenceClient) DeleteMessageFromDLQ(
1667
        ctx context.Context,
1668
        messageID int64,
1669
) error {
×
1670
        op := func() error {
×
1671
                return p.persistence.DeleteMessageFromDLQ(ctx, messageID)
×
1672
        }
×
1673
        return p.call(metrics.PersistenceDeleteQueueMessageFromDLQScope, op)
×
1674
}
1675

1676
func (p *queuePersistenceClient) RangeDeleteMessagesFromDLQ(
1677
        ctx context.Context,
1678
        firstMessageID int64,
1679
        lastMessageID int64,
1680
) error {
×
1681
        op := func() error {
×
1682
                return p.persistence.RangeDeleteMessagesFromDLQ(ctx, firstMessageID, lastMessageID)
×
1683
        }
×
1684
        return p.call(metrics.PersistenceRangeDeleteMessagesFromDLQScope, op)
×
1685
}
1686

1687
func (p *queuePersistenceClient) UpdateDLQAckLevel(
1688
        ctx context.Context,
1689
        messageID int64,
1690
        clusterName string,
1691
) error {
×
1692
        op := func() error {
×
1693
                return p.persistence.UpdateDLQAckLevel(ctx, messageID, clusterName)
×
1694
        }
×
1695
        return p.call(metrics.PersistenceUpdateDLQAckLevelScope, op)
×
1696
}
1697

1698
func (p *queuePersistenceClient) GetDLQAckLevels(
1699
        ctx context.Context,
1700
) (map[string]int64, error) {
×
1701
        var resp map[string]int64
×
1702
        op := func() error {
×
1703
                var err error
×
1704
                resp, err = p.persistence.GetDLQAckLevels(ctx)
×
1705
                return err
×
1706
        }
×
1707
        err := p.call(metrics.PersistenceGetDLQAckLevelScope, op)
×
1708
        if err != nil {
×
1709
                return nil, err
×
1710
        }
×
1711
        return resp, nil
×
1712
}
1713

1714
func (p *queuePersistenceClient) GetDLQSize(
1715
        ctx context.Context,
1716
) (int64, error) {
3✔
1717
        var resp int64
3✔
1718
        op := func() error {
6✔
1719
                var err error
3✔
1720
                resp, err = p.persistence.GetDLQSize(ctx)
3✔
1721
                return err
3✔
1722
        }
3✔
1723
        err := p.call(metrics.PersistenceGetDLQSizeScope, op)
3✔
1724
        if err != nil {
3✔
1725
                return 0, err
×
1726
        }
×
1727
        return resp, nil
3✔
1728
}
1729

1730
func (p *queuePersistenceClient) Close() {
39✔
1731
        p.persistence.Close()
39✔
1732
}
39✔
1733

1734
func (p *configStorePersistenceClient) FetchDynamicConfig(ctx context.Context, configType ConfigType) (*FetchDynamicConfigResponse, error) {
×
1735
        var resp *FetchDynamicConfigResponse
×
1736
        op := func() error {
×
1737
                var err error
×
1738
                resp, err = p.persistence.FetchDynamicConfig(ctx, configType)
×
1739
                return err
×
1740
        }
×
1741
        err := p.call(metrics.PersistenceFetchDynamicConfigScope, op)
×
1742
        if err != nil {
×
1743
                return nil, err
×
1744
        }
×
1745
        return resp, nil
×
1746
}
1747

1748
func (p *configStorePersistenceClient) UpdateDynamicConfig(ctx context.Context, request *UpdateDynamicConfigRequest, configType ConfigType) error {
×
1749
        op := func() error {
×
1750
                return p.persistence.UpdateDynamicConfig(ctx, request, configType)
×
1751
        }
×
1752
        return p.call(metrics.PersistenceUpdateDynamicConfigScope, op)
×
1753
}
1754

1755
func (p *configStorePersistenceClient) Close() {
×
1756
        p.persistence.Close()
×
1757
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc