• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

uber / cadence / 0187fdd2-f4a4-4c9a-97b4-6604937bf7be

09 May 2023 12:23AM UTC coverage: 57.253% (-0.002%) from 57.255%
0187fdd2-f4a4-4c9a-97b4-6604937bf7be

Pull #5252

buildkite

David Porter
Merge branch 'master' into feature/zonal-partitioning
Pull Request #5252: Feature/zonal partitioning

1460 of 1460 new or added lines in 51 files covered. (100.0%)

86909 of 151799 relevant lines covered (57.25%)

2482.17 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

49.2
/service/frontend/adminHandler.go
1
// Copyright (c) 2017 Uber Technologies, Inc.
2
//
3
// Permission is hereby granted, free of charge, to any person obtaining a copy
4
// of this software and associated documentation files (the "Software"), to deal
5
// in the Software without restriction, including without limitation the rights
6
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
7
// copies of the Software, and to permit persons to whom the Software is
8
// furnished to do so, subject to the following conditions:
9
//
10
// The above copyright notice and this permission notice shall be included in
11
// all copies or substantial portions of the Software.
12
//
13
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
14
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
18
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
19
// THE SOFTWARE.
20

21
//go:generate mockgen -package $GOPACKAGE -source $GOFILE -destination adminHandler_mock.go -package frontend github.com/uber/cadence/service/frontend AdminHandler
22

23
package frontend
24

25
import (
26
        "context"
27
        "encoding/json"
28
        "errors"
29
        "fmt"
30
        "math"
31
        "strconv"
32
        "time"
33

34
        "github.com/uber/cadence/common/isolationgroup/isolationgroupapi"
35

36
        "github.com/pborman/uuid"
37

38
        "github.com/uber/cadence/.gen/go/shared"
39
        "github.com/uber/cadence/common"
40
        "github.com/uber/cadence/common/backoff"
41
        "github.com/uber/cadence/common/client"
42
        "github.com/uber/cadence/common/codec"
43
        "github.com/uber/cadence/common/definition"
44
        "github.com/uber/cadence/common/domain"
45
        dc "github.com/uber/cadence/common/dynamicconfig"
46
        "github.com/uber/cadence/common/elasticsearch"
47
        "github.com/uber/cadence/common/log"
48
        "github.com/uber/cadence/common/log/tag"
49
        "github.com/uber/cadence/common/metrics"
50
        "github.com/uber/cadence/common/ndc"
51
        "github.com/uber/cadence/common/persistence"
52
        "github.com/uber/cadence/common/resource"
53
        "github.com/uber/cadence/common/service"
54
        "github.com/uber/cadence/common/types"
55
        "github.com/uber/cadence/service/history/execution"
56
)
57

58
// Compile-time assertion that *adminHandlerImpl satisfies the AdminHandler interface.
var _ AdminHandler = (*adminHandlerImpl)(nil)
59

60
// endMessageID is the largest representable int64 value.
// NOTE(review): presumably used as an "up to the end" upper bound for message
// IDs — confirm at the use sites, which are outside this part of the file.
const endMessageID int64 = math.MaxInt64
63

64
var (
65
        errInvalidFilters = &types.BadRequestError{Message: "Request Filters are invalid, unable to parse."}
66
)
67

68
type (
69
        // AdminHandler interface for admin service
70
        AdminHandler interface {
71
                common.Daemon
72

73
                AddSearchAttribute(context.Context, *types.AddSearchAttributeRequest) error
74
                CloseShard(context.Context, *types.CloseShardRequest) error
75
                DescribeCluster(context.Context) (*types.DescribeClusterResponse, error)
76
                DescribeShardDistribution(context.Context, *types.DescribeShardDistributionRequest) (*types.DescribeShardDistributionResponse, error)
77
                DescribeHistoryHost(context.Context, *types.DescribeHistoryHostRequest) (*types.DescribeHistoryHostResponse, error)
78
                DescribeQueue(context.Context, *types.DescribeQueueRequest) (*types.DescribeQueueResponse, error)
79
                DescribeWorkflowExecution(context.Context, *types.AdminDescribeWorkflowExecutionRequest) (*types.AdminDescribeWorkflowExecutionResponse, error)
80
                GetDLQReplicationMessages(context.Context, *types.GetDLQReplicationMessagesRequest) (*types.GetDLQReplicationMessagesResponse, error)
81
                GetDomainReplicationMessages(context.Context, *types.GetDomainReplicationMessagesRequest) (*types.GetDomainReplicationMessagesResponse, error)
82
                GetReplicationMessages(context.Context, *types.GetReplicationMessagesRequest) (*types.GetReplicationMessagesResponse, error)
83
                GetWorkflowExecutionRawHistoryV2(context.Context, *types.GetWorkflowExecutionRawHistoryV2Request) (*types.GetWorkflowExecutionRawHistoryV2Response, error)
84
                CountDLQMessages(context.Context, *types.CountDLQMessagesRequest) (*types.CountDLQMessagesResponse, error)
85
                MergeDLQMessages(context.Context, *types.MergeDLQMessagesRequest) (*types.MergeDLQMessagesResponse, error)
86
                PurgeDLQMessages(context.Context, *types.PurgeDLQMessagesRequest) error
87
                ReadDLQMessages(context.Context, *types.ReadDLQMessagesRequest) (*types.ReadDLQMessagesResponse, error)
88
                ReapplyEvents(context.Context, *types.ReapplyEventsRequest) error
89
                RefreshWorkflowTasks(context.Context, *types.RefreshWorkflowTasksRequest) error
90
                RemoveTask(context.Context, *types.RemoveTaskRequest) error
91
                ResendReplicationTasks(context.Context, *types.ResendReplicationTasksRequest) error
92
                ResetQueue(context.Context, *types.ResetQueueRequest) error
93
                GetCrossClusterTasks(context.Context, *types.GetCrossClusterTasksRequest) (*types.GetCrossClusterTasksResponse, error)
94
                RespondCrossClusterTasksCompleted(context.Context, *types.RespondCrossClusterTasksCompletedRequest) (*types.RespondCrossClusterTasksCompletedResponse, error)
95
                GetDynamicConfig(context.Context, *types.GetDynamicConfigRequest) (*types.GetDynamicConfigResponse, error)
96
                UpdateDynamicConfig(context.Context, *types.UpdateDynamicConfigRequest) error
97
                RestoreDynamicConfig(context.Context, *types.RestoreDynamicConfigRequest) error
98
                ListDynamicConfig(context.Context, *types.ListDynamicConfigRequest) (*types.ListDynamicConfigResponse, error)
99
                DeleteWorkflow(context.Context, *types.AdminDeleteWorkflowRequest) (*types.AdminDeleteWorkflowResponse, error)
100
                MaintainCorruptWorkflow(context.Context, *types.AdminMaintainWorkflowRequest) (*types.AdminMaintainWorkflowResponse, error)
101
                GetGlobalIsolationGroups(ctx context.Context, request *types.GetGlobalIsolationGroupsRequest) (*types.GetGlobalIsolationGroupsResponse, error)
102
                UpdateGlobalIsolationGroups(ctx context.Context, request *types.UpdateGlobalIsolationGroupsRequest) (*types.UpdateGlobalIsolationGroupsResponse, error)
103
                GetDomainIsolationGroups(ctx context.Context, request *types.GetDomainIsolationGroupsRequest) (*types.GetDomainIsolationGroupsResponse, error)
104
                UpdateDomainIsolationGroups(ctx context.Context, request *types.UpdateDomainIsolationGroupsRequest) (*types.UpdateDomainIsolationGroupsResponse, error)
105
        }
106

107
        // adminHandlerImpl is an implementation for admin service independent of wire protocol
108
        adminHandlerImpl struct {
109
                resource.Resource
110

111
                numberOfHistoryShards int
112
                params                *resource.Params
113
                config                *Config
114
                domainDLQHandler      domain.DLQMessageHandler
115
                domainFailoverWatcher domain.FailoverWatcher
116
                eventSerializer       persistence.PayloadSerializer
117
                esClient              elasticsearch.GenericClient
118
                throttleRetry         *backoff.ThrottleRetry
119
                isolationGroups       isolationgroupapi.Handler
120
        }
121

122
        workflowQueryTemplate struct {
123
                name     string
124
                function func(request *types.AdminMaintainWorkflowRequest) error
125
        }
126

127
        getWorkflowRawHistoryV2Token struct {
128
                DomainName        string
129
                WorkflowID        string
130
                RunID             string
131
                StartEventID      int64
132
                StartEventVersion int64
133
                EndEventID        int64
134
                EndEventVersion   int64
135
                PersistenceToken  []byte
136
                VersionHistories  *types.VersionHistories
137
        }
138
)
139

140
var (
141
        adminServiceRetryPolicy = common.CreateAdminServiceRetryPolicy()
142

143
        corruptWorkflowErrorList = [3]string{
144
                execution.ErrMissingWorkflowStartEvent.Error(),
145
                execution.ErrMissingActivityScheduledEvent.Error(),
146
                persistence.ErrCorruptedHistory.Error(),
147
        }
148
)
149

150
// NewAdminHandler creates a thrift service for the cadence admin service
151
func NewAdminHandler(
152
        resource resource.Resource,
153
        params *resource.Params,
154
        config *Config,
155
        domainHandler domain.Handler,
156
) AdminHandler {
39✔
157

39✔
158
        domainReplicationTaskExecutor := domain.NewReplicationTaskExecutor(
39✔
159
                resource.GetDomainManager(),
39✔
160
                resource.GetTimeSource(),
39✔
161
                resource.GetLogger(),
39✔
162
        )
39✔
163

39✔
164
        return &adminHandlerImpl{
39✔
165
                Resource:              resource,
39✔
166
                numberOfHistoryShards: params.PersistenceConfig.NumHistoryShards,
39✔
167
                params:                params,
39✔
168
                config:                config,
39✔
169
                domainDLQHandler: domain.NewDLQMessageHandler(
39✔
170
                        domainReplicationTaskExecutor,
39✔
171
                        resource.GetDomainReplicationQueue(),
39✔
172
                        resource.GetLogger(),
39✔
173
                        resource.GetMetricsClient(),
39✔
174
                ),
39✔
175
                domainFailoverWatcher: domain.NewFailoverWatcher(
39✔
176
                        resource.GetDomainCache(),
39✔
177
                        resource.GetDomainManager(),
39✔
178
                        resource.GetTimeSource(),
39✔
179
                        config.DomainFailoverRefreshInterval,
39✔
180
                        config.DomainFailoverRefreshTimerJitterCoefficient,
39✔
181
                        resource.GetMetricsClient(),
39✔
182
                        resource.GetLogger(),
39✔
183
                ),
39✔
184
                eventSerializer: persistence.NewPayloadSerializer(),
39✔
185
                esClient:        params.ESClient,
39✔
186
                throttleRetry: backoff.NewThrottleRetry(
39✔
187
                        backoff.WithRetryPolicy(adminServiceRetryPolicy),
39✔
188
                        backoff.WithRetryableError(common.IsServiceTransientError),
39✔
189
                ),
39✔
190
                isolationGroups: isolationgroupapi.New(resource.GetLogger(), resource.GetIsolationGroupStore(), domainHandler),
39✔
191
        }
39✔
192
}
39✔
193

194
// Start starts the handler
195
func (adh *adminHandlerImpl) Start() {
39✔
196
        adh.domainDLQHandler.Start()
39✔
197

39✔
198
        if adh.config.EnableGracefulFailover() {
54✔
199
                adh.domainFailoverWatcher.Start()
15✔
200
        }
15✔
201
}
202

203
// Stop stops the handler
204
func (adh *adminHandlerImpl) Stop() {
39✔
205
        adh.domainDLQHandler.Stop()
39✔
206
        adh.domainFailoverWatcher.Stop()
39✔
207
}
39✔
208

209
// AddSearchAttribute add search attribute to whitelist
210
func (adh *adminHandlerImpl) AddSearchAttribute(
211
        ctx context.Context,
212
        request *types.AddSearchAttributeRequest,
213
) (retError error) {
9✔
214

9✔
215
        defer func() { log.CapturePanic(recover(), adh.GetLogger(), &retError) }()
18✔
216
        scope, sw := adh.startRequestProfile(ctx, metrics.AdminAddSearchAttributeScope)
9✔
217
        defer sw.Stop()
9✔
218

9✔
219
        // validate request
9✔
220
        if request == nil {
10✔
221
                return adh.error(errRequestNotSet, scope)
1✔
222
        }
1✔
223
        if err := checkPermission(adh.config, request.SecurityToken); err != nil {
9✔
224
                return adh.error(errNoPermission, scope)
1✔
225
        }
1✔
226
        if len(request.GetSearchAttribute()) == 0 {
9✔
227
                return adh.error(&types.BadRequestError{Message: "SearchAttributes are not provided"}, scope)
2✔
228
        }
2✔
229

230
        searchAttr := request.GetSearchAttribute()
5✔
231
        currentValidAttr, err := adh.params.DynamicConfig.GetMapValue(dc.ValidSearchAttributes, nil)
5✔
232
        if err != nil {
5✔
233
                return adh.error(&types.InternalServiceError{Message: fmt.Sprintf("Failed to get dynamic config, err: %v", err)}, scope)
×
234
        }
×
235

236
        for keyName, valueType := range searchAttr {
10✔
237
                if definition.IsSystemIndexedKey(keyName) {
6✔
238
                        return adh.error(&types.BadRequestError{Message: fmt.Sprintf("Key [%s] is reserved by system", keyName)}, scope)
1✔
239
                }
1✔
240
                if currValType, exist := currentValidAttr[keyName]; exist {
5✔
241
                        if currValType != int(valueType) {
2✔
242
                                return adh.error(&types.BadRequestError{Message: fmt.Sprintf("Key [%s] is already whitelisted as a different type", keyName)}, scope)
1✔
243
                        }
1✔
244
                        adh.GetLogger().Warn("Adding a search attribute that is already existing in dynamicconfig, it's probably a noop if ElasticSearch is already added. Here will re-do it on ElasticSearch.")
×
245
                }
246
                currentValidAttr[keyName] = int(valueType)
3✔
247
        }
248

249
        // update dynamic config. Until the DB based dynamic config is implemented, we shouldn't fail the updating.
250
        err = adh.params.DynamicConfig.UpdateValue(dc.ValidSearchAttributes, currentValidAttr)
3✔
251
        if err != nil {
4✔
252
                adh.GetLogger().Warn("Failed to update dynamicconfig. This is only useful in local dev environment for filebased config. Please ignore this warn if this is in a real Cluster, because your filebased dynamicconfig MUST be updated separately. Configstore dynamic config will also require separate updating via the CLI.")
1✔
253
        }
1✔
254

255
        // when have valid advance visibility config, update elasticsearch mapping, new added field will not be able to remove or update
256
        if err := adh.validateConfigForAdvanceVisibility(); err != nil {
3✔
257
                adh.GetLogger().Warn("Skip updating OpenSearch/ElasticSearch mapping since Advance Visibility hasn't been enabled.")
×
258
        } else {
3✔
259
                index := adh.params.ESConfig.GetVisibilityIndex()
3✔
260
                for k, v := range searchAttr {
6✔
261
                        valueType := convertIndexedValueTypeToESDataType(v)
3✔
262
                        if len(valueType) == 0 {
5✔
263
                                return adh.error(&types.BadRequestError{Message: fmt.Sprintf("Unknown value type, %v", v)}, scope)
2✔
264
                        }
2✔
265
                        err := adh.params.ESClient.PutMapping(ctx, index, definition.Attr, k, valueType)
1✔
266
                        if adh.esClient.IsNotFoundError(err) {
1✔
267
                                err = adh.params.ESClient.CreateIndex(ctx, index)
×
268
                                if err != nil {
×
269
                                        return adh.error(&types.InternalServiceError{Message: fmt.Sprintf("Failed to create ES index, err: %v", err)}, scope)
×
270
                                }
×
271
                                err = adh.params.ESClient.PutMapping(ctx, index, definition.Attr, k, valueType)
×
272
                        }
273
                        if err != nil {
2✔
274
                                return adh.error(&types.InternalServiceError{Message: fmt.Sprintf("Failed to update ES mapping, err: %v", err)}, scope)
1✔
275
                        }
1✔
276
                }
277
        }
278

279
        return nil
×
280
}
281

282
// DescribeWorkflowExecution returns information about the specified workflow execution.
283
func (adh *adminHandlerImpl) DescribeWorkflowExecution(
284
        ctx context.Context,
285
        request *types.AdminDescribeWorkflowExecutionRequest,
286
) (resp *types.AdminDescribeWorkflowExecutionResponse, retError error) {
5✔
287

5✔
288
        defer func() { log.CapturePanic(recover(), adh.GetLogger(), &retError) }()
10✔
289
        scope, sw := adh.startRequestProfile(ctx, metrics.AdminDescribeWorkflowExecutionScope)
5✔
290
        defer sw.Stop()
5✔
291

5✔
292
        if request == nil {
5✔
293
                return nil, adh.error(errRequestNotSet, scope)
×
294
        }
×
295

296
        if err := validateExecution(request.Execution); err != nil {
5✔
297
                return nil, adh.error(err, scope)
×
298
        }
×
299

300
        shardID := common.WorkflowIDToHistoryShard(request.Execution.WorkflowID, adh.numberOfHistoryShards)
5✔
301
        shardIDstr := string(rune(shardID)) // originally `string(int_shard_id)`, but changing it will change the ring hashing
5✔
302
        shardIDForOutput := strconv.Itoa(shardID)
5✔
303

5✔
304
        historyHost, err := adh.GetMembershipResolver().Lookup(service.History, shardIDstr)
5✔
305
        if err != nil {
5✔
306
                return nil, adh.error(err, scope)
×
307
        }
×
308

309
        domainID, err := adh.GetDomainCache().GetDomainID(request.GetDomain())
5✔
310
        if err != nil {
5✔
311
                return nil, adh.error(err, scope)
×
312
        }
×
313

314
        historyAddr := historyHost.GetAddress()
5✔
315
        resp2, err := adh.GetHistoryClient().DescribeMutableState(ctx, &types.DescribeMutableStateRequest{
5✔
316
                DomainUUID: domainID,
5✔
317
                Execution:  request.Execution,
5✔
318
        })
5✔
319
        if err != nil {
5✔
320
                return &types.AdminDescribeWorkflowExecutionResponse{}, err
×
321
        }
×
322
        return &types.AdminDescribeWorkflowExecutionResponse{
5✔
323
                ShardID:                shardIDForOutput,
5✔
324
                HistoryAddr:            historyAddr,
5✔
325
                MutableStateInDatabase: resp2.MutableStateInDatabase,
5✔
326
                MutableStateInCache:    resp2.MutableStateInCache,
5✔
327
        }, err
5✔
328
}
329

330
// RemoveTask returns information about the internal states of a history host
331
func (adh *adminHandlerImpl) RemoveTask(
332
        ctx context.Context,
333
        request *types.RemoveTaskRequest,
334
) (retError error) {
×
335

×
336
        defer func() { log.CapturePanic(recover(), adh.GetLogger(), &retError) }()
×
337
        scope, sw := adh.startRequestProfile(ctx, metrics.AdminRemoveTaskScope)
×
338
        defer sw.Stop()
×
339

×
340
        if request == nil || request.Type == nil {
×
341
                return adh.error(errRequestNotSet, scope)
×
342
        }
×
343
        if err := adh.GetHistoryClient().RemoveTask(ctx, request); err != nil {
×
344
                return adh.error(err, scope)
×
345
        }
×
346
        return nil
×
347
}
348

349
func (adh *adminHandlerImpl) getCorruptWorkflowQueryTemplates(
350
        ctx context.Context, request *types.AdminMaintainWorkflowRequest,
351
) []workflowQueryTemplate {
7✔
352
        client := adh.GetFrontendClient()
7✔
353
        return []workflowQueryTemplate{
7✔
354
                {
7✔
355
                        name: "DescribeWorkflowExecution",
7✔
356
                        function: func(request *types.AdminMaintainWorkflowRequest) error {
14✔
357
                                _, err := client.DescribeWorkflowExecution(ctx, &types.DescribeWorkflowExecutionRequest{
7✔
358
                                        Domain:    request.Domain,
7✔
359
                                        Execution: request.Execution,
7✔
360
                                })
7✔
361
                                return err
7✔
362
                        },
7✔
363
                },
364
                {
365
                        name: "GetWorkflowExecutionHistory",
366
                        function: func(request *types.AdminMaintainWorkflowRequest) error {
4✔
367
                                _, err := client.GetWorkflowExecutionHistory(ctx, &types.GetWorkflowExecutionHistoryRequest{
4✔
368
                                        Domain:    request.Domain,
4✔
369
                                        Execution: request.Execution,
4✔
370
                                })
4✔
371
                                return err
4✔
372
                        },
4✔
373
                },
374
        }
375
}
376

377
func (adh *adminHandlerImpl) MaintainCorruptWorkflow(
378
        ctx context.Context,
379
        request *types.AdminMaintainWorkflowRequest,
380
) (*types.AdminMaintainWorkflowResponse, error) {
7✔
381
        if request.GetExecution() == nil {
7✔
382
                return nil, types.BadRequestError{Message: "Execution is missing"}
×
383
        }
×
384

385
        logger := adh.GetLogger().WithTags(
7✔
386
                tag.WorkflowDomainName(request.Domain),
7✔
387
                tag.WorkflowID(request.GetExecution().GetWorkflowID()),
7✔
388
                tag.WorkflowRunID(request.GetExecution().GetRunID()),
7✔
389
        )
7✔
390

7✔
391
        scope := adh.GetMetricsClient().Scope(metrics.WatchDogScope)
7✔
392
        tagged := scope.Tagged(metrics.DomainTag(request.Domain))
7✔
393
        resp := &types.AdminMaintainWorkflowResponse{
7✔
394
                HistoryDeleted:    false,
7✔
395
                ExecutionsDeleted: false,
7✔
396
                VisibilityDeleted: false,
7✔
397
        }
7✔
398

7✔
399
        queryTemplates := adh.getCorruptWorkflowQueryTemplates(ctx, request)
7✔
400
        for _, template := range queryTemplates {
18✔
401
                functionName := template.name
11✔
402
                queryFunc := template.function
11✔
403
                err := queryFunc(request)
11✔
404
                if err == nil {
16✔
405
                        logger.Info(fmt.Sprintf("Query succeeded for function: %s", functionName))
5✔
406
                        continue
5✔
407
                }
408
                if err != nil {
12✔
409
                        logger.Info(fmt.Sprintf("%s returned error %#v", functionName, err))
6✔
410
                }
6✔
411

412
                // check if the error message indicates corrupt workflow
413
                errorMessage := err.Error()
6✔
414
                for _, corruptMessage := range corruptWorkflowErrorList {
18✔
415
                        if errorMessage == corruptMessage {
17✔
416
                                logger.Info(fmt.Sprintf("Will delete workflow because (%v) returned corrupted error (%#v)",
5✔
417
                                        functionName, err))
5✔
418
                                resp, err = adh.DeleteWorkflow(ctx, request)
5✔
419
                                if err == nil {
10✔
420
                                        tagged.AddCounter(metrics.WatchDogNumDeletedCorruptWorkflows, 1)
5✔
421
                                } else {
5✔
422
                                        tagged.AddCounter(metrics.WatchDogNumFailedToDeleteCorruptWorkflows, 1)
×
423
                                }
×
424
                                return resp, nil
5✔
425
                        }
426
                }
427
        }
428

429
        return resp, nil
2✔
430
}
431

432
func (adh *adminHandlerImpl) deleteWorkflowFromHistory(
433
        ctx context.Context,
434
        logger log.Logger,
435
        shardIDInt int,
436
        mutableState persistence.WorkflowMutableState,
437
) bool {
5✔
438
        historyManager := adh.GetHistoryManager()
5✔
439

5✔
440
        branchInfo := shared.HistoryBranch{}
5✔
441
        thriftrwEncoder := codec.NewThriftRWEncoder()
5✔
442
        branchTokens := [][]byte{mutableState.ExecutionInfo.BranchToken}
5✔
443
        if mutableState.VersionHistories != nil {
5✔
444
                // if VersionHistories is set, then all branch infos are stored in VersionHistories
×
445
                branchTokens = [][]byte{}
×
446
                for _, versionHistory := range mutableState.VersionHistories.ToInternalType().Histories {
×
447
                        branchTokens = append(branchTokens, versionHistory.BranchToken)
×
448
                }
×
449
        }
450

451
        deletedFromHistory := len(branchTokens) == 0
5✔
452
        failedToDeleteFromHistory := false
5✔
453
        for _, branchToken := range branchTokens {
10✔
454
                err := thriftrwEncoder.Decode(branchToken, &branchInfo)
5✔
455
                if err != nil {
5✔
456
                        logger.Error("Cannot decode thrift object", tag.Error(err))
×
457
                        continue
×
458
                }
459
                domainName, err := adh.GetDomainCache().GetDomainName(mutableState.ExecutionInfo.DomainID)
5✔
460
                if err != nil {
5✔
461
                        logger.Error("Unexpected: Cannot fetch domain name", tag.Error(err))
×
462
                        continue
×
463
                }
464
                logger.Info(fmt.Sprintf("Deleting history events for %#v", branchInfo))
5✔
465
                err = historyManager.DeleteHistoryBranch(ctx, &persistence.DeleteHistoryBranchRequest{
5✔
466
                        BranchToken: branchToken,
5✔
467
                        ShardID:     &shardIDInt,
5✔
468
                        DomainName:  domainName,
5✔
469
                })
5✔
470
                if err != nil {
5✔
471
                        logger.Error("Failed to delete history", tag.Error(err))
×
472
                        failedToDeleteFromHistory = true
×
473
                } else {
5✔
474
                        deletedFromHistory = true
5✔
475
                }
5✔
476
        }
477
        return deletedFromHistory && !failedToDeleteFromHistory
5✔
478
}
479

480
func (adh *adminHandlerImpl) deleteWorkflowFromExecutions(
481
        ctx context.Context,
482
        logger log.Logger,
483
        shardIDInt int,
484
        domainID string,
485
        workflowID string,
486
        runID string,
487
        scope metrics.Scope,
488
) bool {
5✔
489
        exeStore, err := adh.GetExecutionManager(shardIDInt)
5✔
490
        if err != nil {
5✔
491
                logger.Error(fmt.Sprintf("Cannot get execution manager for shardID(%v): %#v", shardIDInt, err))
×
492
                return false
×
493
        }
×
494
        domainName, err := adh.GetDomainCache().GetDomainName(domainID)
5✔
495
        if err != nil {
5✔
496
                logger.Error("Unexpected: Cannot fetch domain name", tag.Error(err))
×
497
                return false
×
498
        }
×
499
        req := &persistence.DeleteWorkflowExecutionRequest{
5✔
500
                DomainID:   domainID,
5✔
501
                WorkflowID: workflowID,
5✔
502
                RunID:      runID,
5✔
503
                DomainName: domainName,
5✔
504
        }
5✔
505

5✔
506
        deletedFromExecutions := false
5✔
507
        err = exeStore.DeleteWorkflowExecution(ctx, req)
5✔
508
        if err != nil {
5✔
509
                logger.Error("Delete mutableState row failed", tag.Error(err))
×
510
        } else {
5✔
511
                deletedFromExecutions = true
5✔
512
        }
5✔
513

514
        deleteCurrentReq := &persistence.DeleteCurrentWorkflowExecutionRequest{
5✔
515
                DomainID:   domainID,
5✔
516
                WorkflowID: workflowID,
5✔
517
                RunID:      runID,
5✔
518
                DomainName: domainName,
5✔
519
        }
5✔
520

5✔
521
        err = exeStore.DeleteCurrentWorkflowExecution(ctx, deleteCurrentReq)
5✔
522
        if err != nil {
5✔
523
                logger.Error(fmt.Sprintf("Delete current row failed: %#v", err))
×
524
                deletedFromExecutions = false
×
525
        }
×
526

527
        if deletedFromExecutions {
10✔
528
                logger.Info(fmt.Sprintf("Deleted executions row successfully %#v", deleteCurrentReq))
5✔
529
        }
5✔
530
        return deletedFromExecutions
5✔
531
}
532

533
func (adh *adminHandlerImpl) deleteWorkflowFromVisibility(
534
        ctx context.Context,
535
        logger log.Logger,
536
        domainID string,
537
        domain string,
538
        workflowID string,
539
        runID string,
540
) bool {
5✔
541
        visibilityManager := adh.Resource.GetVisibilityManager()
5✔
542
        if visibilityManager == nil {
5✔
543
                logger.Info("No visibility manager found")
×
544
                return false
×
545
        }
×
546

547
        logger.Info("Deleting workflow from visibility store")
5✔
548
        key := persistence.VisibilityAdminDeletionKey("visibilityAdminDelete")
5✔
549
        visCtx := context.WithValue(ctx, key, true)
5✔
550
        err := visibilityManager.DeleteWorkflowExecution(
5✔
551
                visCtx,
5✔
552
                &persistence.VisibilityDeleteWorkflowExecutionRequest{
5✔
553
                        DomainID:   domainID,
5✔
554
                        Domain:     domain,
5✔
555
                        RunID:      runID,
5✔
556
                        WorkflowID: workflowID,
5✔
557
                        TaskID:     math.MaxInt64,
5✔
558
                },
5✔
559
        )
5✔
560
        if err != nil {
5✔
561
                logger.Error("Cannot delete visibility record", tag.Error(err))
×
562
        } else {
5✔
563
                logger.Info("Deleted visibility record successfully")
5✔
564
        }
5✔
565
        return err == nil
5✔
566
}
567

568
// DeleteWorkflow deletes a workflow execution on behalf of an admin
569
func (adh *adminHandlerImpl) DeleteWorkflow(
570
        ctx context.Context,
571
        request *types.AdminDeleteWorkflowRequest,
572
) (*types.AdminDeleteWorkflowResponse, error) {
5✔
573
        logger := adh.GetLogger()
5✔
574
        scope := adh.GetMetricsClient().Scope(metrics.AdminDeleteWorkflowScope).Tagged(metrics.GetContextTags(ctx)...)
5✔
575
        if request.GetExecution() == nil {
5✔
576
                logger.Info(fmt.Sprintf("Bad request: %#v", request))
×
577
                return nil, adh.error(errRequestNotSet, scope)
×
578
        }
×
579
        domainName := request.GetDomain()
5✔
580
        workflowID := request.GetExecution().GetWorkflowID()
5✔
581
        runID := request.GetExecution().GetRunID()
5✔
582
        skipErrors := request.GetSkipErrors()
5✔
583

5✔
584
        resp, err := adh.DescribeWorkflowExecution(
5✔
585
                ctx,
5✔
586
                &types.AdminDescribeWorkflowExecutionRequest{
5✔
587
                        Domain: domainName,
5✔
588
                        Execution: &types.WorkflowExecution{
5✔
589
                                WorkflowID: workflowID,
5✔
590
                                RunID:      runID,
5✔
591
                        },
5✔
592
                })
5✔
593

5✔
594
        if err != nil {
5✔
595
                logger.Error("Describe workflow failed", tag.Error(err))
×
596
                if !skipErrors {
×
597
                        return nil, adh.error(err, scope)
×
598
                }
×
599
        }
600

601
        msStr := resp.GetMutableStateInDatabase()
5✔
602
        ms := persistence.WorkflowMutableState{}
5✔
603
        err = json.Unmarshal([]byte(msStr), &ms)
5✔
604
        if err != nil {
5✔
605
                logger.Error(fmt.Sprintf("DeleteWorkflow failed: Cannot unmarshal mutableState: %#v", err))
×
606
                return nil, adh.error(err, scope)
×
607
        }
×
608
        domainID := ms.ExecutionInfo.DomainID
5✔
609
        logger = logger.WithTags(
5✔
610
                tag.WorkflowDomainID(domainID),
5✔
611
                tag.WorkflowDomainName(domainName),
5✔
612
                tag.WorkflowID(workflowID),
5✔
613
                tag.WorkflowRunID(runID),
5✔
614
        )
5✔
615

5✔
616
        shardID := resp.GetShardID()
5✔
617
        shardIDInt, err := strconv.Atoi(shardID)
5✔
618
        if err != nil {
5✔
619
                logger.Error(fmt.Sprintf("Cannot convert shardID(%v) to int: %#v", shardID, err))
×
620
                return nil, adh.error(err, scope)
×
621
        }
×
622
        ctx, cancel := context.WithTimeout(ctx, 60*time.Second)
5✔
623
        defer cancel()
5✔
624

5✔
625
        deletedFromHistory := adh.deleteWorkflowFromHistory(ctx, logger, shardIDInt, ms)
5✔
626
        deletedFromExecutions := adh.deleteWorkflowFromExecutions(ctx, logger, shardIDInt, domainID, workflowID, runID, scope)
5✔
627
        deletedFromVisibility := false
5✔
628
        if deletedFromExecutions {
10✔
629
                // Without deleting the executions record, let's not delete the visibility record.
5✔
630
                // If we do that, workflow won't be visible but it will exist in the DB
5✔
631
                deletedFromVisibility = adh.deleteWorkflowFromVisibility(ctx, logger, domainID, domainName, workflowID, runID)
5✔
632
        }
5✔
633

634
        return &types.AdminDeleteWorkflowResponse{
5✔
635
                HistoryDeleted:    deletedFromHistory,
5✔
636
                ExecutionsDeleted: deletedFromExecutions,
5✔
637
                VisibilityDeleted: deletedFromVisibility,
5✔
638
        }, nil
5✔
639
}
640

641
// CloseShard returns information about the internal states of a history host
642
func (adh *adminHandlerImpl) CloseShard(
643
        ctx context.Context,
644
        request *types.CloseShardRequest,
645
) (retError error) {
×
646

×
647
        defer func() { log.CapturePanic(recover(), adh.GetLogger(), &retError) }()
×
648
        scope, sw := adh.startRequestProfile(ctx, metrics.AdminCloseShardScope)
×
649
        defer sw.Stop()
×
650

×
651
        if request == nil {
×
652
                return adh.error(errRequestNotSet, scope)
×
653
        }
×
654
        if err := adh.GetHistoryClient().CloseShard(ctx, request); err != nil {
×
655
                return adh.error(err, scope)
×
656
        }
×
657
        return nil
×
658
}
659

660
// ResetQueue resets processing queue states
661
func (adh *adminHandlerImpl) ResetQueue(
662
        ctx context.Context,
663
        request *types.ResetQueueRequest,
664
) (retError error) {
×
665

×
666
        defer func() { log.CapturePanic(recover(), adh.GetLogger(), &retError) }()
×
667
        scope, sw := adh.startRequestProfile(ctx, metrics.AdminResetQueueScope)
×
668
        defer sw.Stop()
×
669

×
670
        if request == nil || request.Type == nil {
×
671
                return adh.error(errRequestNotSet, scope)
×
672
        }
×
673
        if request.GetClusterName() == "" {
×
674
                return adh.error(errClusterNameNotSet, scope)
×
675
        }
×
676

677
        if err := adh.GetHistoryClient().ResetQueue(ctx, request); err != nil {
×
678
                return adh.error(err, scope)
×
679
        }
×
680
        return nil
×
681
}
682

683
// DescribeQueue describes processing queue states
684
func (adh *adminHandlerImpl) DescribeQueue(
685
        ctx context.Context,
686
        request *types.DescribeQueueRequest,
687
) (resp *types.DescribeQueueResponse, retError error) {
×
688

×
689
        defer func() { log.CapturePanic(recover(), adh.GetLogger(), &retError) }()
×
690
        scope, sw := adh.startRequestProfile(ctx, metrics.AdminDescribeQueueScope)
×
691
        defer sw.Stop()
×
692

×
693
        if request == nil || request.Type == nil {
×
694
                return nil, adh.error(errRequestNotSet, scope)
×
695
        }
×
696
        if request.GetClusterName() == "" {
×
697
                return nil, adh.error(errClusterNameNotSet, scope)
×
698
        }
×
699

700
        return adh.GetHistoryClient().DescribeQueue(ctx, request)
×
701
}
702

703
// DescribeShardDistribution returns information about history shard distribution
704
func (adh *adminHandlerImpl) DescribeShardDistribution(
705
        ctx context.Context,
706
        request *types.DescribeShardDistributionRequest,
707
) (resp *types.DescribeShardDistributionResponse, retError error) {
×
708

×
709
        defer func() { log.CapturePanic(recover(), adh.GetLogger(), &retError) }()
×
710
        _, sw := adh.startRequestProfile(ctx, metrics.AdminDescribeShardDistributionScope)
×
711
        defer sw.Stop()
×
712

×
713
        numShards := adh.config.NumHistoryShards
×
714
        resp = &types.DescribeShardDistributionResponse{
×
715
                NumberOfShards: int32(numShards),
×
716
                Shards:         make(map[int32]string),
×
717
        }
×
718

×
719
        offset := int(request.PageID * request.PageSize)
×
720
        nextPageStart := offset + int(request.PageSize)
×
721
        for shardID := offset; shardID < numShards && shardID < nextPageStart; shardID++ {
×
722
                info, err := adh.GetMembershipResolver().Lookup(service.History, string(rune(shardID)))
×
723
                if err != nil {
×
724
                        resp.Shards[int32(shardID)] = "unknown"
×
725
                } else {
×
726
                        resp.Shards[int32(shardID)] = info.Identity()
×
727
                }
×
728
        }
729
        return resp, nil
×
730
}
731

732
// DescribeHistoryHost returns information about the internal states of a history host
733
func (adh *adminHandlerImpl) DescribeHistoryHost(
734
        ctx context.Context,
735
        request *types.DescribeHistoryHostRequest,
736
) (resp *types.DescribeHistoryHostResponse, retError error) {
×
737

×
738
        defer func() { log.CapturePanic(recover(), adh.GetLogger(), &retError) }()
×
739
        scope, sw := adh.startRequestProfile(ctx, metrics.AdminDescribeHistoryHostScope)
×
740
        defer sw.Stop()
×
741

×
742
        if request == nil || (request.ShardIDForHost == nil && request.ExecutionForHost == nil && request.HostAddress == nil) {
×
743
                return nil, adh.error(errRequestNotSet, scope)
×
744
        }
×
745

746
        if request.ExecutionForHost != nil {
×
747
                if err := validateExecution(request.ExecutionForHost); err != nil {
×
748
                        return nil, adh.error(err, scope)
×
749
                }
×
750
        }
751

752
        return adh.GetHistoryClient().DescribeHistoryHost(ctx, request)
×
753
}
754

755
// GetWorkflowExecutionRawHistoryV2 - retrieves the history of workflow execution
756
func (adh *adminHandlerImpl) GetWorkflowExecutionRawHistoryV2(
757
        ctx context.Context,
758
        request *types.GetWorkflowExecutionRawHistoryV2Request,
759
) (resp *types.GetWorkflowExecutionRawHistoryV2Response, retError error) {
9✔
760

9✔
761
        defer func() { log.CapturePanic(recover(), adh.GetLogger(), &retError) }()
18✔
762
        scope, sw := adh.startRequestProfile(ctx, metrics.AdminGetWorkflowExecutionRawHistoryV2Scope)
9✔
763
        defer sw.Stop()
9✔
764

9✔
765
        if err := adh.validateGetWorkflowExecutionRawHistoryV2Request(
9✔
766
                request,
9✔
767
        ); err != nil {
12✔
768
                return nil, adh.error(err, scope)
3✔
769
        }
3✔
770
        domainID, err := adh.GetDomainCache().GetDomainID(request.GetDomain())
6✔
771
        if err != nil {
7✔
772
                return nil, adh.error(err, scope)
1✔
773
        }
1✔
774
        scope = scope.Tagged(metrics.DomainTag(request.GetDomain()))
5✔
775

5✔
776
        execution := request.Execution
5✔
777
        var pageToken *getWorkflowRawHistoryV2Token
5✔
778
        var targetVersionHistory *persistence.VersionHistory
5✔
779
        if request.NextPageToken == nil {
10✔
780
                response, err := adh.GetHistoryClient().GetMutableState(ctx, &types.GetMutableStateRequest{
5✔
781
                        DomainUUID: domainID,
5✔
782
                        Execution:  execution,
5✔
783
                })
5✔
784
                if err != nil {
5✔
785
                        return nil, adh.error(err, scope)
×
786
                }
×
787

788
                versionHistories := persistence.NewVersionHistoriesFromInternalType(
5✔
789
                        response.GetVersionHistories(),
5✔
790
                )
5✔
791
                targetVersionHistory, err = adh.setRequestDefaultValueAndGetTargetVersionHistory(
5✔
792
                        request,
5✔
793
                        versionHistories,
5✔
794
                )
5✔
795
                if err != nil {
5✔
796
                        return nil, adh.error(err, scope)
×
797
                }
×
798

799
                pageToken = adh.generatePaginationToken(request, versionHistories)
5✔
800
        } else {
3✔
801
                pageToken, err = deserializeRawHistoryToken(request.NextPageToken)
3✔
802
                if err != nil {
3✔
803
                        return nil, adh.error(err, scope)
×
804
                }
×
805
                versionHistories := pageToken.VersionHistories
3✔
806
                if versionHistories == nil {
3✔
807
                        return nil, adh.error(&types.BadRequestError{Message: "Invalid version histories."}, scope)
×
808
                }
×
809
                targetVersionHistory, err = adh.setRequestDefaultValueAndGetTargetVersionHistory(
3✔
810
                        request,
3✔
811
                        persistence.NewVersionHistoriesFromInternalType(versionHistories),
3✔
812
                )
3✔
813
                if err != nil {
3✔
814
                        return nil, adh.error(err, scope)
×
815
                }
×
816
        }
817

818
        if err := adh.validatePaginationToken(
5✔
819
                request,
5✔
820
                pageToken,
5✔
821
        ); err != nil {
5✔
822
                return nil, adh.error(err, scope)
×
823
        }
×
824

825
        if pageToken.StartEventID+1 == pageToken.EndEventID {
6✔
826
                // API is exclusive-exclusive. Return empty response here.
1✔
827
                return &types.GetWorkflowExecutionRawHistoryV2Response{
1✔
828
                        HistoryBatches: []*types.DataBlob{},
1✔
829
                        NextPageToken:  nil, // no further pagination
1✔
830
                        VersionHistory: targetVersionHistory.ToInternalType(),
1✔
831
                }, nil
1✔
832
        }
1✔
833
        pageSize := int(request.GetMaximumPageSize())
4✔
834
        shardID := common.WorkflowIDToHistoryShard(
4✔
835
                execution.GetWorkflowID(),
4✔
836
                adh.numberOfHistoryShards,
4✔
837
        )
4✔
838

4✔
839
        rawHistoryResponse, err := adh.GetHistoryManager().ReadRawHistoryBranch(ctx, &persistence.ReadHistoryBranchRequest{
4✔
840
                BranchToken: targetVersionHistory.GetBranchToken(),
4✔
841
                // GetWorkflowExecutionRawHistoryV2 is exclusive exclusive.
4✔
842
                // ReadRawHistoryBranch is inclusive exclusive.
4✔
843
                MinEventID:    pageToken.StartEventID + 1,
4✔
844
                MaxEventID:    pageToken.EndEventID,
4✔
845
                PageSize:      pageSize,
4✔
846
                NextPageToken: pageToken.PersistenceToken,
4✔
847
                ShardID:       common.IntPtr(shardID),
4✔
848
                DomainName:    request.GetDomain(),
4✔
849
        })
4✔
850
        if err != nil {
4✔
851
                if _, ok := err.(*types.EntityNotExistsError); ok {
×
852
                        // when no events can be returned from DB, DB layer will return
×
853
                        // EntityNotExistsError, this API shall return empty response
×
854
                        return &types.GetWorkflowExecutionRawHistoryV2Response{
×
855
                                HistoryBatches: []*types.DataBlob{},
×
856
                                NextPageToken:  nil, // no further pagination
×
857
                                VersionHistory: targetVersionHistory.ToInternalType(),
×
858
                        }, nil
×
859
                }
×
860
                return nil, err
×
861
        }
862

863
        pageToken.PersistenceToken = rawHistoryResponse.NextPageToken
4✔
864
        size := rawHistoryResponse.Size
4✔
865
        scope.RecordTimer(metrics.HistorySize, time.Duration(size))
4✔
866

4✔
867
        rawBlobs := rawHistoryResponse.HistoryEventBlobs
4✔
868
        blobs := []*types.DataBlob{}
4✔
869
        for _, blob := range rawBlobs {
7✔
870
                blobs = append(blobs, blob.ToInternal())
3✔
871
        }
3✔
872

873
        result := &types.GetWorkflowExecutionRawHistoryV2Response{
4✔
874
                HistoryBatches: blobs,
4✔
875
                VersionHistory: targetVersionHistory.ToInternalType(),
4✔
876
        }
4✔
877
        if len(pageToken.PersistenceToken) == 0 {
8✔
878
                result.NextPageToken = nil
4✔
879
        } else {
7✔
880
                result.NextPageToken, err = serializeRawHistoryToken(pageToken)
3✔
881
                if err != nil {
3✔
882
                        return nil, err
×
883
                }
×
884
        }
885

886
        return result, nil
4✔
887
}
888

889
// DescribeCluster return information about cadence deployment
890
func (adh *adminHandlerImpl) DescribeCluster(
891
        ctx context.Context,
892
) (resp *types.DescribeClusterResponse, retError error) {
×
893

×
894
        defer func() { log.CapturePanic(recover(), adh.GetLogger(), &retError) }()
×
895
        scope, sw := adh.startRequestProfile(ctx, metrics.AdminDescribeClusterScope)
×
896
        defer sw.Stop()
×
897

×
898
        // expose visibility store backend and if advanced options are available
×
899
        ave := types.PersistenceFeature{
×
900
                Key:     "advancedVisibilityEnabled",
×
901
                Enabled: adh.params.ESConfig != nil,
×
902
        }
×
903
        visibilityStoreInfo := types.PersistenceInfo{
×
904
                Backend:  adh.Resource.GetVisibilityManager().GetName(),
×
905
                Features: []*types.PersistenceFeature{&ave},
×
906
        }
×
907

×
908
        // expose history store backend
×
909
        historyStoreInfo := types.PersistenceInfo{
×
910
                Backend: adh.GetHistoryManager().GetName(),
×
911
        }
×
912

×
913
        membershipInfo := types.MembershipInfo{}
×
914
        if monitor := adh.GetMembershipResolver(); monitor != nil {
×
915
                currentHost, err := monitor.WhoAmI()
×
916
                if err != nil {
×
917
                        return nil, adh.error(err, scope)
×
918
                }
×
919

920
                membershipInfo.CurrentHost = &types.HostInfo{
×
921
                        Identity: currentHost.Identity(),
×
922
                }
×
923

×
924
                var rings []*types.RingInfo
×
925
                for _, role := range service.List {
×
926
                        var servers []*types.HostInfo
×
927
                        members, err := monitor.Members(role)
×
928
                        if err != nil {
×
929
                                return nil, adh.error(err, scope)
×
930
                        }
×
931

932
                        for _, server := range members {
×
933
                                servers = append(servers, &types.HostInfo{
×
934
                                        Identity: server.Identity(),
×
935
                                })
×
936
                                membershipInfo.ReachableMembers = append(membershipInfo.ReachableMembers, server.Identity())
×
937
                        }
×
938

939
                        rings = append(rings, &types.RingInfo{
×
940
                                Role:        role,
×
941
                                MemberCount: int32(len(servers)),
×
942
                                Members:     servers,
×
943
                        })
×
944
                }
945
                membershipInfo.Rings = rings
×
946
        }
947

948
        return &types.DescribeClusterResponse{
×
949
                SupportedClientVersions: &types.SupportedClientVersions{
×
950
                        GoSdk:   client.SupportedGoSDKVersion,
×
951
                        JavaSdk: client.SupportedJavaSDKVersion,
×
952
                },
×
953
                MembershipInfo: &membershipInfo,
×
954
                PersistenceInfo: map[string]*types.PersistenceInfo{
×
955
                        "visibilityStore": &visibilityStoreInfo,
×
956
                        "historyStore":    &historyStoreInfo,
×
957
                },
×
958
        }, nil
×
959
}
960

961
// GetReplicationMessages returns new replication tasks since the read level provided in the token.
962
func (adh *adminHandlerImpl) GetReplicationMessages(
963
        ctx context.Context,
964
        request *types.GetReplicationMessagesRequest,
965
) (resp *types.GetReplicationMessagesResponse, err error) {
×
966

×
967
        defer func() { log.CapturePanic(recover(), adh.GetLogger(), &err) }()
×
968
        scope, sw := adh.startRequestProfile(ctx, metrics.AdminGetReplicationMessagesScope)
×
969
        defer sw.Stop()
×
970

×
971
        if request == nil {
×
972
                return nil, adh.error(errRequestNotSet, scope)
×
973
        }
×
974
        if request.ClusterName == "" {
×
975
                return nil, adh.error(errClusterNameNotSet, scope)
×
976
        }
×
977

978
        resp, err = adh.GetHistoryRawClient().GetReplicationMessages(ctx, request)
×
979
        if err != nil {
×
980
                return nil, adh.error(err, scope)
×
981
        }
×
982
        return resp, nil
×
983
}
984

985
// GetDomainReplicationMessages returns new domain replication tasks since last retrieved task ID.
986
func (adh *adminHandlerImpl) GetDomainReplicationMessages(
987
        ctx context.Context,
988
        request *types.GetDomainReplicationMessagesRequest,
989
) (resp *types.GetDomainReplicationMessagesResponse, err error) {
×
990

×
991
        defer func() { log.CapturePanic(recover(), adh.GetLogger(), &err) }()
×
992
        scope, sw := adh.startRequestProfile(ctx, metrics.AdminGetDomainReplicationMessagesScope)
×
993
        defer sw.Stop()
×
994

×
995
        if request == nil {
×
996
                return nil, adh.error(errRequestNotSet, scope)
×
997
        }
×
998

999
        if adh.GetDomainReplicationQueue() == nil {
×
1000
                return nil, adh.error(errors.New("domain replication queue not enabled for cluster"), scope)
×
1001
        }
×
1002

1003
        lastMessageID := defaultLastMessageID
×
1004
        if request.LastRetrievedMessageID != nil {
×
1005
                lastMessageID = request.GetLastRetrievedMessageID()
×
1006
        }
×
1007

1008
        if lastMessageID == defaultLastMessageID {
×
1009
                clusterAckLevels, err := adh.GetDomainReplicationQueue().GetAckLevels(ctx)
×
1010
                if err == nil {
×
1011
                        if ackLevel, ok := clusterAckLevels[request.GetClusterName()]; ok {
×
1012
                                lastMessageID = ackLevel
×
1013
                        }
×
1014
                }
1015
        }
1016

1017
        replicationTasks, lastMessageID, err := adh.GetDomainReplicationQueue().GetReplicationMessages(
×
1018
                ctx,
×
1019
                lastMessageID,
×
1020
                getDomainReplicationMessageBatchSize,
×
1021
        )
×
1022
        if err != nil {
×
1023
                return nil, adh.error(err, scope)
×
1024
        }
×
1025

1026
        lastProcessedMessageID := defaultLastMessageID
×
1027
        if request.LastProcessedMessageID != nil {
×
1028
                lastProcessedMessageID = request.GetLastProcessedMessageID()
×
1029
        }
×
1030
        if err := adh.GetDomainReplicationQueue().UpdateAckLevel(ctx, lastProcessedMessageID, request.GetClusterName()); err != nil {
×
1031
                adh.GetLogger().Warn("Failed to update domain replication queue ack level.",
×
1032
                        tag.TaskID(int64(lastProcessedMessageID)),
×
1033
                        tag.ClusterName(request.GetClusterName()))
×
1034
        }
×
1035

1036
        return &types.GetDomainReplicationMessagesResponse{
×
1037
                Messages: &types.ReplicationMessages{
×
1038
                        ReplicationTasks:       replicationTasks,
×
1039
                        LastRetrievedMessageID: lastMessageID,
×
1040
                },
×
1041
        }, nil
×
1042
}
1043

1044
// GetDLQReplicationMessages returns new replication tasks based on the dlq info.
1045
func (adh *adminHandlerImpl) GetDLQReplicationMessages(
1046
        ctx context.Context,
1047
        request *types.GetDLQReplicationMessagesRequest,
1048
) (resp *types.GetDLQReplicationMessagesResponse, err error) {
×
1049

×
1050
        defer func() { log.CapturePanic(recover(), adh.GetLogger(), &err) }()
×
1051
        scope, sw := adh.startRequestProfile(ctx, metrics.AdminGetDLQReplicationMessagesScope)
×
1052
        defer sw.Stop()
×
1053

×
1054
        if request == nil {
×
1055
                return nil, adh.error(errRequestNotSet, scope)
×
1056
        }
×
1057
        if len(request.GetTaskInfos()) == 0 {
×
1058
                return nil, adh.error(errEmptyReplicationInfo, scope)
×
1059
        }
×
1060

1061
        resp, err = adh.GetHistoryClient().GetDLQReplicationMessages(ctx, request)
×
1062
        if err != nil {
×
1063
                return nil, adh.error(err, scope)
×
1064
        }
×
1065
        return resp, nil
×
1066
}
1067

1068
// ReapplyEvents applies stale events to the current workflow and the current run
1069
func (adh *adminHandlerImpl) ReapplyEvents(
1070
        ctx context.Context,
1071
        request *types.ReapplyEventsRequest,
1072
) (err error) {
×
1073

×
1074
        defer func() { log.CapturePanic(recover(), adh.GetLogger(), &err) }()
×
1075
        scope, sw := adh.startRequestProfile(ctx, metrics.AdminReapplyEventsScope)
×
1076
        defer sw.Stop()
×
1077

×
1078
        if request == nil {
×
1079
                return adh.error(errRequestNotSet, scope)
×
1080
        }
×
1081
        if request.GetDomainName() == "" {
×
1082
                return adh.error(errDomainNotSet, scope)
×
1083
        }
×
1084
        if request.WorkflowExecution == nil {
×
1085
                return adh.error(errExecutionNotSet, scope)
×
1086
        }
×
1087
        if request.GetWorkflowExecution().GetWorkflowID() == "" {
×
1088
                return adh.error(errWorkflowIDNotSet, scope)
×
1089
        }
×
1090
        if request.GetEvents() == nil {
×
1091
                return adh.error(errWorkflowIDNotSet, scope)
×
1092
        }
×
1093
        domainEntry, err := adh.GetDomainCache().GetDomain(request.GetDomainName())
×
1094
        if err != nil {
×
1095
                return adh.error(err, scope)
×
1096
        }
×
1097

1098
        err = adh.GetHistoryClient().ReapplyEvents(ctx, &types.HistoryReapplyEventsRequest{
×
1099
                DomainUUID: domainEntry.GetInfo().ID,
×
1100
                Request:    request,
×
1101
        })
×
1102
        if err != nil {
×
1103
                return adh.error(err, scope)
×
1104
        }
×
1105
        return nil
×
1106
}
1107

1108
// ReadDLQMessages reads messages from DLQ
1109
func (adh *adminHandlerImpl) ReadDLQMessages(
1110
        ctx context.Context,
1111
        request *types.ReadDLQMessagesRequest,
1112
) (resp *types.ReadDLQMessagesResponse, err error) {
×
1113

×
1114
        defer func() { log.CapturePanic(recover(), adh.GetLogger(), &err) }()
×
1115
        scope, sw := adh.startRequestProfile(ctx, metrics.AdminReadDLQMessagesScope)
×
1116
        defer sw.Stop()
×
1117

×
1118
        if request == nil {
×
1119
                return nil, adh.error(errRequestNotSet, scope)
×
1120
        }
×
1121

1122
        if request.Type == nil {
×
1123
                return nil, adh.error(errEmptyQueueType, scope)
×
1124
        }
×
1125

1126
        if request.GetMaximumPageSize() <= 0 {
×
1127
                request.MaximumPageSize = common.ReadDLQMessagesPageSize
×
1128
        }
×
1129

1130
        if request.InclusiveEndMessageID == nil {
×
1131
                request.InclusiveEndMessageID = common.Int64Ptr(common.EndMessageID)
×
1132
        }
×
1133

1134
        var tasks []*types.ReplicationTask
×
1135
        var token []byte
×
1136
        var op func() error
×
1137
        switch request.GetType() {
×
1138
        case types.DLQTypeReplication:
×
1139
                return adh.GetHistoryClient().ReadDLQMessages(ctx, request)
×
1140
        case types.DLQTypeDomain:
×
1141
                op = func() error {
×
1142
                        select {
×
1143
                        case <-ctx.Done():
×
1144
                                return ctx.Err()
×
1145
                        default:
×
1146
                                var err error
×
1147
                                tasks, token, err = adh.domainDLQHandler.Read(
×
1148
                                        ctx,
×
1149
                                        request.GetInclusiveEndMessageID(),
×
1150
                                        int(request.GetMaximumPageSize()),
×
1151
                                        request.GetNextPageToken())
×
1152
                                return err
×
1153
                        }
1154
                }
1155
        default:
×
1156
                return nil, &types.BadRequestError{Message: "The DLQ type is not supported."}
×
1157
        }
1158
        err = adh.throttleRetry.Do(ctx, op)
×
1159
        if err != nil {
×
1160
                return nil, adh.error(err, scope)
×
1161
        }
×
1162

1163
        return &types.ReadDLQMessagesResponse{
×
1164
                ReplicationTasks: tasks,
×
1165
                NextPageToken:    token,
×
1166
        }, nil
×
1167
}
1168

1169
// PurgeDLQMessages purge messages from DLQ
1170
func (adh *adminHandlerImpl) PurgeDLQMessages(
1171
        ctx context.Context,
1172
        request *types.PurgeDLQMessagesRequest,
1173
) (err error) {
×
1174

×
1175
        defer func() { log.CapturePanic(recover(), adh.GetLogger(), &err) }()
×
1176
        scope, sw := adh.startRequestProfile(ctx, metrics.AdminPurgeDLQMessagesScope)
×
1177
        defer sw.Stop()
×
1178

×
1179
        if request == nil {
×
1180
                return adh.error(errRequestNotSet, scope)
×
1181
        }
×
1182

1183
        if request.Type == nil {
×
1184
                return adh.error(errEmptyQueueType, scope)
×
1185
        }
×
1186

1187
        if request.InclusiveEndMessageID == nil {
×
1188
                request.InclusiveEndMessageID = common.Int64Ptr(endMessageID)
×
1189
        }
×
1190

1191
        var op func() error
×
1192
        switch request.GetType() {
×
1193
        case types.DLQTypeReplication:
×
1194
                return adh.GetHistoryClient().PurgeDLQMessages(ctx, request)
×
1195
        case types.DLQTypeDomain:
×
1196
                op = func() error {
×
1197
                        select {
×
1198
                        case <-ctx.Done():
×
1199
                                return ctx.Err()
×
1200
                        default:
×
1201
                                return adh.domainDLQHandler.Purge(
×
1202
                                        ctx,
×
1203
                                        request.GetInclusiveEndMessageID(),
×
1204
                                )
×
1205
                        }
1206
                }
1207
        default:
×
1208
                return &types.BadRequestError{Message: "The DLQ type is not supported."}
×
1209
        }
1210
        err = adh.throttleRetry.Do(ctx, op)
×
1211
        if err != nil {
×
1212
                return adh.error(err, scope)
×
1213
        }
×
1214

1215
        return nil
×
1216
}
1217

1218
func (adh *adminHandlerImpl) CountDLQMessages(
1219
        ctx context.Context,
1220
        request *types.CountDLQMessagesRequest,
1221
) (resp *types.CountDLQMessagesResponse, err error) {
×
1222
        defer func() { log.CapturePanic(recover(), adh.GetLogger(), &err) }()
×
1223

1224
        scope, sw := adh.startRequestProfile(ctx, metrics.AdminCountDLQMessagesScope)
×
1225
        defer sw.Stop()
×
1226

×
1227
        domain, err := adh.domainDLQHandler.Count(ctx, request.ForceFetch)
×
1228
        if err != nil {
×
1229
                return nil, adh.error(err, scope)
×
1230
        }
×
1231

1232
        history, err := adh.GetHistoryClient().CountDLQMessages(ctx, request)
×
1233
        if err != nil {
×
1234
                err = adh.error(err, scope)
×
1235
        }
×
1236

1237
        return &types.CountDLQMessagesResponse{
×
1238
                History: history.Entries,
×
1239
                Domain:  domain,
×
1240
        }, err
×
1241
}
1242

1243
// MergeDLQMessages merges DLQ messages
1244
func (adh *adminHandlerImpl) MergeDLQMessages(
1245
        ctx context.Context,
1246
        request *types.MergeDLQMessagesRequest,
1247
) (resp *types.MergeDLQMessagesResponse, err error) {
×
1248

×
1249
        defer func() { log.CapturePanic(recover(), adh.GetLogger(), &err) }()
×
1250
        scope, sw := adh.startRequestProfile(ctx, metrics.AdminMergeDLQMessagesScope)
×
1251
        defer sw.Stop()
×
1252

×
1253
        if request == nil {
×
1254
                return nil, adh.error(errRequestNotSet, scope)
×
1255
        }
×
1256

1257
        if request.Type == nil {
×
1258
                return nil, adh.error(errEmptyQueueType, scope)
×
1259
        }
×
1260

1261
        if request.InclusiveEndMessageID == nil {
×
1262
                request.InclusiveEndMessageID = common.Int64Ptr(endMessageID)
×
1263
        }
×
1264

1265
        var token []byte
×
1266
        var op func() error
×
1267
        switch request.GetType() {
×
1268
        case types.DLQTypeReplication:
×
1269
                return adh.GetHistoryClient().MergeDLQMessages(ctx, request)
×
1270
        case types.DLQTypeDomain:
×
1271

×
1272
                op = func() error {
×
1273
                        select {
×
1274
                        case <-ctx.Done():
×
1275
                                return ctx.Err()
×
1276
                        default:
×
1277
                                var err error
×
1278
                                token, err = adh.domainDLQHandler.Merge(
×
1279
                                        ctx,
×
1280
                                        request.GetInclusiveEndMessageID(),
×
1281
                                        int(request.GetMaximumPageSize()),
×
1282
                                        request.GetNextPageToken(),
×
1283
                                )
×
1284
                                return err
×
1285
                        }
1286
                }
1287
        default:
×
1288
                return nil, &types.BadRequestError{Message: "The DLQ type is not supported."}
×
1289
        }
1290
        err = adh.throttleRetry.Do(ctx, op)
×
1291
        if err != nil {
×
1292
                return nil, adh.error(err, scope)
×
1293
        }
×
1294

1295
        return &types.MergeDLQMessagesResponse{
×
1296
                NextPageToken: token,
×
1297
        }, nil
×
1298
}
1299

1300
// RefreshWorkflowTasks re-generates the workflow tasks
1301
func (adh *adminHandlerImpl) RefreshWorkflowTasks(
1302
        ctx context.Context,
1303
        request *types.RefreshWorkflowTasksRequest,
1304
) (err error) {
×
1305
        defer func() { log.CapturePanic(recover(), adh.GetLogger(), &err) }()
×
1306
        scope, sw := adh.startRequestProfile(ctx, metrics.AdminRefreshWorkflowTasksScope)
×
1307
        defer sw.Stop()
×
1308

×
1309
        if request == nil {
×
1310
                return adh.error(errRequestNotSet, scope)
×
1311
        }
×
1312
        if err := validateExecution(request.Execution); err != nil {
×
1313
                return adh.error(err, scope)
×
1314
        }
×
1315
        domainEntry, err := adh.GetDomainCache().GetDomain(request.GetDomain())
×
1316
        if err != nil {
×
1317
                return adh.error(err, scope)
×
1318
        }
×
1319

1320
        err = adh.GetHistoryClient().RefreshWorkflowTasks(ctx, &types.HistoryRefreshWorkflowTasksRequest{
×
1321
                DomainUIID: domainEntry.GetInfo().ID,
×
1322
                Request:    request,
×
1323
        })
×
1324
        if err != nil {
×
1325
                return adh.error(err, scope)
×
1326
        }
×
1327
        return nil
×
1328
}
1329

1330
// ResendReplicationTasks requests replication task from remote cluster
1331
func (adh *adminHandlerImpl) ResendReplicationTasks(
1332
        ctx context.Context,
1333
        request *types.ResendReplicationTasksRequest,
1334
) (err error) {
×
1335
        defer func() { log.CapturePanic(recover(), adh.GetLogger(), &err) }()
×
1336
        scope, sw := adh.startRequestProfile(ctx, metrics.AdminResendReplicationTasksScope)
×
1337
        defer sw.Stop()
×
1338

×
1339
        if request == nil {
×
1340
                return adh.error(errRequestNotSet, scope)
×
1341
        }
×
1342
        resender := ndc.NewHistoryResender(
×
1343
                adh.GetDomainCache(),
×
1344
                adh.GetRemoteAdminClient(request.GetRemoteCluster()),
×
1345
                func(ctx context.Context, request *types.ReplicateEventsV2Request) error {
×
1346
                        return adh.GetHistoryClient().ReplicateEventsV2(ctx, request)
×
1347
                },
×
1348
                nil,
1349
                nil,
1350
                adh.GetLogger(),
1351
        )
1352
        return resender.SendSingleWorkflowHistory(
×
1353
                request.DomainID,
×
1354
                request.GetWorkflowID(),
×
1355
                request.GetRunID(),
×
1356
                request.StartEventID,
×
1357
                request.StartVersion,
×
1358
                request.EndEventID,
×
1359
                request.EndVersion,
×
1360
        )
×
1361
}
1362

1363
func (adh *adminHandlerImpl) GetCrossClusterTasks(
1364
        ctx context.Context,
1365
        request *types.GetCrossClusterTasksRequest,
1366
) (resp *types.GetCrossClusterTasksResponse, err error) {
×
1367

×
1368
        defer func() { log.CapturePanic(recover(), adh.GetLogger(), &err) }()
×
1369
        scope, sw := adh.startRequestProfile(ctx, metrics.AdminGetCrossClusterTasksScope)
×
1370
        defer sw.Stop()
×
1371

×
1372
        if request == nil {
×
1373
                return nil, adh.error(errRequestNotSet, scope)
×
1374
        }
×
1375
        if request.TargetCluster == "" {
×
1376
                return nil, adh.error(errClusterNameNotSet, scope)
×
1377
        }
×
1378

1379
        resp, err = adh.GetHistoryRawClient().GetCrossClusterTasks(ctx, request)
×
1380
        if err != nil {
×
1381
                return nil, adh.error(err, scope)
×
1382
        }
×
1383
        return resp, nil
×
1384
}
1385

1386
func (adh *adminHandlerImpl) RespondCrossClusterTasksCompleted(
1387
        ctx context.Context,
1388
        request *types.RespondCrossClusterTasksCompletedRequest,
1389
) (resp *types.RespondCrossClusterTasksCompletedResponse, err error) {
×
1390

×
1391
        defer func() { log.CapturePanic(recover(), adh.GetLogger(), &err) }()
×
1392
        scope, sw := adh.startRequestProfile(ctx, metrics.AdminRespondCrossClusterTasksCompletedScope)
×
1393
        defer sw.Stop()
×
1394

×
1395
        if request == nil {
×
1396
                return nil, adh.error(errRequestNotSet, scope)
×
1397
        }
×
1398
        if request.TargetCluster == "" {
×
1399
                return nil, adh.error(errClusterNameNotSet, scope)
×
1400
        }
×
1401

1402
        resp, err = adh.GetHistoryClient().RespondCrossClusterTasksCompleted(ctx, request)
×
1403
        if err != nil {
×
1404
                return nil, adh.error(err, scope)
×
1405
        }
×
1406
        return resp, nil
×
1407
}
1408

1409
func (adh *adminHandlerImpl) validateGetWorkflowExecutionRawHistoryV2Request(
1410
        request *types.GetWorkflowExecutionRawHistoryV2Request,
1411
) error {
9✔
1412

9✔
1413
        execution := request.Execution
9✔
1414
        if len(execution.GetWorkflowID()) == 0 {
10✔
1415
                return &types.BadRequestError{Message: "Invalid WorkflowID."}
1✔
1416
        }
1✔
1417
        // TODO currently, this API is only going to be used by re-send history events
1418
        // to remote cluster if kafka is lossy again, in the future, this API can be used
1419
        // by CLI and client, then empty runID (meaning the current workflow) should be allowed
1420
        if len(execution.GetRunID()) == 0 || uuid.Parse(execution.GetRunID()) == nil {
9✔
1421
                return &types.BadRequestError{Message: "Invalid RunID."}
1✔
1422
        }
1✔
1423

1424
        pageSize := int(request.GetMaximumPageSize())
7✔
1425
        if pageSize <= 0 {
8✔
1426
                return &types.BadRequestError{Message: "Invalid PageSize."}
1✔
1427
        }
1✔
1428

1429
        if (request.StartEventID != nil && request.StartEventVersion == nil) ||
6✔
1430
                (request.StartEventID == nil && request.StartEventVersion != nil) {
6✔
1431
                return &types.BadRequestError{Message: "Invalid start event id and start event version combination."}
×
1432
        }
×
1433

1434
        if (request.EndEventID != nil && request.EndEventVersion == nil) ||
6✔
1435
                (request.EndEventID == nil && request.EndEventVersion != nil) {
6✔
1436
                return &types.BadRequestError{Message: "Invalid end event id and end event version combination."}
×
1437
        }
×
1438
        return nil
6✔
1439
}
1440

1441
func (adh *adminHandlerImpl) validateConfigForAdvanceVisibility() error {
3✔
1442
        if adh.params.ESConfig == nil || adh.params.ESClient == nil {
3✔
1443
                return errors.New("ES related config not found")
×
1444
        }
×
1445
        return nil
3✔
1446
}
1447

1448
func (adh *adminHandlerImpl) setRequestDefaultValueAndGetTargetVersionHistory(
1449
        request *types.GetWorkflowExecutionRawHistoryV2Request,
1450
        versionHistories *persistence.VersionHistories,
1451
) (*persistence.VersionHistory, error) {
9✔
1452

9✔
1453
        targetBranch, err := versionHistories.GetCurrentVersionHistory()
9✔
1454
        if err != nil {
9✔
1455
                return nil, err
×
1456
        }
×
1457
        firstItem, err := targetBranch.GetFirstItem()
9✔
1458
        if err != nil {
9✔
1459
                return nil, err
×
1460
        }
×
1461
        lastItem, err := targetBranch.GetLastItem()
9✔
1462
        if err != nil {
9✔
1463
                return nil, err
×
1464
        }
×
1465

1466
        if request.StartEventID == nil || request.StartEventVersion == nil {
13✔
1467
                // If start event is not set, get the events from the first event
4✔
1468
                // As the API is exclusive-exclusive, use first event id - 1 here
4✔
1469
                request.StartEventID = common.Int64Ptr(common.FirstEventID - 1)
4✔
1470
                request.StartEventVersion = common.Int64Ptr(firstItem.Version)
4✔
1471
        }
4✔
1472
        if request.EndEventID == nil || request.EndEventVersion == nil {
14✔
1473
                // If end event is not set, get the events until the end event
5✔
1474
                // As the API is exclusive-exclusive, use end event id + 1 here
5✔
1475
                request.EndEventID = common.Int64Ptr(lastItem.EventID + 1)
5✔
1476
                request.EndEventVersion = common.Int64Ptr(lastItem.Version)
5✔
1477
        }
5✔
1478

1479
        if request.GetStartEventID() < 0 {
9✔
1480
                return nil, &types.BadRequestError{Message: "Invalid FirstEventID && NextEventID combination."}
×
1481
        }
×
1482

1483
        // get branch based on the end event if end event is defined in the request
1484
        if request.GetEndEventID() == lastItem.EventID+1 &&
9✔
1485
                request.GetEndEventVersion() == lastItem.Version {
14✔
1486
                // this is a special case, target branch remains the same
5✔
1487
        } else {
12✔
1488
                endItem := persistence.NewVersionHistoryItem(request.GetEndEventID(), request.GetEndEventVersion())
7✔
1489
                _, targetBranch, err = versionHistories.FindFirstVersionHistoryByItem(endItem)
7✔
1490
                if err != nil {
7✔
1491
                        return nil, err
×
1492
                }
×
1493
        }
1494

1495
        startItem := persistence.NewVersionHistoryItem(request.GetStartEventID(), request.GetStartEventVersion())
9✔
1496
        // If the request start event is defined. The start event may be on a different branch as current branch.
9✔
1497
        // We need to find the LCA of the start event and the current branch.
9✔
1498
        if request.GetStartEventID() == common.FirstEventID-1 &&
9✔
1499
                request.GetStartEventVersion() == firstItem.Version {
13✔
1500
                // this is a special case, start event is on the same branch as target branch
4✔
1501
        } else {
12✔
1502
                if !targetBranch.ContainsItem(startItem) {
12✔
1503
                        _, startBranch, err := versionHistories.FindFirstVersionHistoryByItem(startItem)
4✔
1504
                        if err != nil {
4✔
1505
                                return nil, err
×
1506
                        }
×
1507
                        startItem, err = targetBranch.FindLCAItem(startBranch)
4✔
1508
                        if err != nil {
4✔
1509
                                return nil, err
×
1510
                        }
×
1511
                        request.StartEventID = common.Int64Ptr(startItem.EventID)
4✔
1512
                        request.StartEventVersion = common.Int64Ptr(startItem.Version)
4✔
1513
                }
1514
        }
1515

1516
        return targetBranch, nil
9✔
1517
}
1518

1519
func (adh *adminHandlerImpl) generatePaginationToken(
1520
        request *types.GetWorkflowExecutionRawHistoryV2Request,
1521
        versionHistories *persistence.VersionHistories,
1522
) *getWorkflowRawHistoryV2Token {
5✔
1523

5✔
1524
        execution := request.Execution
5✔
1525
        return &getWorkflowRawHistoryV2Token{
5✔
1526
                DomainName:        request.GetDomain(),
5✔
1527
                WorkflowID:        execution.GetWorkflowID(),
5✔
1528
                RunID:             execution.GetRunID(),
5✔
1529
                StartEventID:      request.GetStartEventID(),
5✔
1530
                StartEventVersion: request.GetStartEventVersion(),
5✔
1531
                EndEventID:        request.GetEndEventID(),
5✔
1532
                EndEventVersion:   request.GetEndEventVersion(),
5✔
1533
                VersionHistories:  versionHistories.ToInternalType(),
5✔
1534
                PersistenceToken:  nil, // this is the initialized value
5✔
1535
        }
5✔
1536
}
5✔
1537

1538
func (adh *adminHandlerImpl) validatePaginationToken(
1539
        request *types.GetWorkflowExecutionRawHistoryV2Request,
1540
        token *getWorkflowRawHistoryV2Token,
1541
) error {
5✔
1542

5✔
1543
        execution := request.Execution
5✔
1544
        if request.GetDomain() != token.DomainName ||
5✔
1545
                execution.GetWorkflowID() != token.WorkflowID ||
5✔
1546
                execution.GetRunID() != token.RunID ||
5✔
1547
                request.GetStartEventID() != token.StartEventID ||
5✔
1548
                request.GetStartEventVersion() != token.StartEventVersion ||
5✔
1549
                request.GetEndEventID() != token.EndEventID ||
5✔
1550
                request.GetEndEventVersion() != token.EndEventVersion {
5✔
1551
                return &types.BadRequestError{Message: "Invalid pagination token."}
×
1552
        }
×
1553
        return nil
5✔
1554
}
1555

1556
// startRequestProfile initiates recording of request metrics
1557
func (adh *adminHandlerImpl) startRequestProfile(ctx context.Context, scope int) (metrics.Scope, metrics.Stopwatch) {
41✔
1558
        metricsScope := adh.GetMetricsClient().Scope(scope).Tagged(metrics.DomainUnknownTag()).Tagged(metrics.GetContextTags(ctx)...)
41✔
1559
        sw := metricsScope.StartTimer(metrics.CadenceLatency)
41✔
1560
        metricsScope.IncCounter(metrics.CadenceRequests)
41✔
1561
        return metricsScope, sw
41✔
1562
}
41✔
1563

1564
func (adh *adminHandlerImpl) error(err error, scope metrics.Scope) error {
21✔
1565
        switch err.(type) {
21✔
1566
        case *types.InternalServiceError:
1✔
1567
                adh.GetLogger().Error("Internal service error", tag.Error(err))
1✔
1568
                scope.IncCounter(metrics.CadenceFailures)
1✔
1569
                return err
1✔
1570
        case *types.BadRequestError:
14✔
1571
                scope.IncCounter(metrics.CadenceErrBadRequestCounter)
14✔
1572
                return err
14✔
1573
        case *types.ServiceBusyError:
×
1574
                scope.IncCounter(metrics.CadenceErrServiceBusyCounter)
×
1575
                return err
×
1576
        case *types.EntityNotExistsError:
×
1577
                return err
×
1578
        default:
6✔
1579
                adh.GetLogger().Error("Uncategorized error", tag.Error(err))
6✔
1580
                scope.IncCounter(metrics.CadenceFailures)
6✔
1581
                return &types.InternalServiceError{Message: err.Error()}
6✔
1582
        }
1583
}
1584

1585
func convertIndexedValueTypeToESDataType(valueType types.IndexedValueType) string {
10✔
1586
        switch valueType {
10✔
1587
        case types.IndexedValueTypeString:
1✔
1588
                return "text"
1✔
1589
        case types.IndexedValueTypeKeyword:
2✔
1590
                return "keyword"
2✔
1591
        case types.IndexedValueTypeInt:
1✔
1592
                return "long"
1✔
1593
        case types.IndexedValueTypeDouble:
1✔
1594
                return "double"
1✔
1595
        case types.IndexedValueTypeBool:
1✔
1596
                return "boolean"
1✔
1597
        case types.IndexedValueTypeDatetime:
1✔
1598
                return "date"
1✔
1599
        default:
3✔
1600
                return ""
3✔
1601
        }
1602
}
1603

1604
func serializeRawHistoryToken(token *getWorkflowRawHistoryV2Token) ([]byte, error) {
3✔
1605
        if token == nil {
3✔
1606
                return nil, nil
×
1607
        }
×
1608

1609
        bytes, err := json.Marshal(token)
3✔
1610
        return bytes, err
3✔
1611
}
1612

1613
func deserializeRawHistoryToken(bytes []byte) (*getWorkflowRawHistoryV2Token, error) {
3✔
1614
        token := &getWorkflowRawHistoryV2Token{}
3✔
1615
        err := json.Unmarshal(bytes, token)
3✔
1616
        return token, err
3✔
1617
}
3✔
1618

1619
func (adh *adminHandlerImpl) GetDynamicConfig(ctx context.Context, request *types.GetDynamicConfigRequest) (_ *types.GetDynamicConfigResponse, retError error) {
4✔
1620
        defer func() { log.CapturePanic(recover(), adh.GetLogger(), &retError) }()
8✔
1621
        scope, sw := adh.startRequestProfile(ctx, metrics.AdminGetDynamicConfigScope)
4✔
1622
        defer sw.Stop()
4✔
1623

4✔
1624
        if request == nil || request.ConfigName == "" {
5✔
1625
                return nil, adh.error(errRequestNotSet, scope)
1✔
1626
        }
1✔
1627

1628
        keyVal, err := dc.GetKeyFromKeyName(request.ConfigName)
3✔
1629
        if err != nil {
4✔
1630
                return nil, adh.error(err, scope)
1✔
1631
        }
1✔
1632

1633
        var value interface{}
2✔
1634
        if request.Filters == nil {
3✔
1635
                value, err = adh.params.DynamicConfig.GetValue(keyVal)
1✔
1636
                if err != nil {
1✔
1637
                        return nil, adh.error(err, scope)
×
1638
                }
×
1639
        } else {
1✔
1640
                convFilters, err := convertFilterListToMap(request.Filters)
1✔
1641
                if err != nil {
1✔
1642
                        return nil, adh.error(err, scope)
×
1643
                }
×
1644
                value, err = adh.params.DynamicConfig.GetValueWithFilters(keyVal, convFilters)
1✔
1645
                if err != nil {
1✔
1646
                        return nil, adh.error(err, scope)
×
1647
                }
×
1648
        }
1649

1650
        data, err := json.Marshal(value)
2✔
1651
        if err != nil {
2✔
1652
                return nil, adh.error(err, scope)
×
1653
        }
×
1654

1655
        return &types.GetDynamicConfigResponse{
2✔
1656
                Value: &types.DataBlob{
2✔
1657
                        EncodingType: types.EncodingTypeJSON.Ptr(),
2✔
1658
                        Data:         data,
2✔
1659
                },
2✔
1660
        }, nil
2✔
1661
}
1662

1663
func (adh *adminHandlerImpl) UpdateDynamicConfig(ctx context.Context, request *types.UpdateDynamicConfigRequest) (retError error) {
2✔
1664
        defer func() { log.CapturePanic(recover(), adh.GetLogger(), &retError) }()
4✔
1665
        scope, sw := adh.startRequestProfile(ctx, metrics.AdminUpdateDynamicConfigScope)
2✔
1666
        defer sw.Stop()
2✔
1667

2✔
1668
        if request == nil || request.ConfigName == "" {
3✔
1669
                return adh.error(errRequestNotSet, scope)
1✔
1670
        }
1✔
1671

1672
        keyVal, err := dc.GetKeyFromKeyName(request.ConfigName)
1✔
1673
        if err != nil {
2✔
1674
                return adh.error(err, scope)
1✔
1675
        }
1✔
1676

1677
        return adh.params.DynamicConfig.UpdateValue(keyVal, request.ConfigValues)
×
1678
}
1679

1680
func (adh *adminHandlerImpl) RestoreDynamicConfig(ctx context.Context, request *types.RestoreDynamicConfigRequest) (retError error) {
2✔
1681
        defer func() { log.CapturePanic(recover(), adh.GetLogger(), &retError) }()
4✔
1682
        scope, sw := adh.startRequestProfile(ctx, metrics.AdminRestoreDynamicConfigScope)
2✔
1683
        defer sw.Stop()
2✔
1684

2✔
1685
        if request == nil || request.ConfigName == "" {
3✔
1686
                return adh.error(errRequestNotSet, scope)
1✔
1687
        }
1✔
1688

1689
        keyVal, err := dc.GetKeyFromKeyName(request.ConfigName)
1✔
1690
        if err != nil {
2✔
1691
                return adh.error(err, scope)
1✔
1692
        }
1✔
1693

1694
        var filters map[dc.Filter]interface{}
×
1695

×
1696
        if request.Filters == nil {
×
1697
                filters = nil
×
1698
        } else {
×
1699
                filters, err = convertFilterListToMap(request.Filters)
×
1700
                if err != nil {
×
1701
                        return adh.error(errInvalidFilters, scope)
×
1702
                }
×
1703
        }
1704
        return adh.params.DynamicConfig.RestoreValue(keyVal, filters)
×
1705
}
1706

1707
func (adh *adminHandlerImpl) ListDynamicConfig(ctx context.Context, request *types.ListDynamicConfigRequest) (_ *types.ListDynamicConfigResponse, retError error) {
×
1708
        defer func() { log.CapturePanic(recover(), adh.GetLogger(), &retError) }()
×
1709
        scope, sw := adh.startRequestProfile(ctx, metrics.AdminListDynamicConfigScope)
×
1710
        defer sw.Stop()
×
1711

×
1712
        if request == nil {
×
1713
                return nil, adh.error(errRequestNotSet, scope)
×
1714
        }
×
1715

1716
        keyVal, err := dc.GetKeyFromKeyName(request.ConfigName)
×
1717
        if err != nil || request.ConfigName == "" {
×
1718
                entries, err2 := adh.params.DynamicConfig.ListValue(nil)
×
1719
                if err2 != nil {
×
1720
                        return nil, adh.error(err2, scope)
×
1721
                }
×
1722
                return &types.ListDynamicConfigResponse{
×
1723
                        Entries: entries,
×
1724
                }, nil
×
1725
        }
1726

1727
        entries, err2 := adh.params.DynamicConfig.ListValue(keyVal)
×
1728
        if err2 != nil {
×
1729
                err = adh.error(err2, scope)
×
1730
                return nil, adh.error(err, scope)
×
1731
        }
×
1732

1733
        return &types.ListDynamicConfigResponse{
×
1734
                Entries: entries,
×
1735
        }, nil
×
1736
}
1737

1738
func (adh *adminHandlerImpl) GetGlobalIsolationGroups(ctx context.Context, request *types.GetGlobalIsolationGroupsRequest) (_ *types.GetGlobalIsolationGroupsResponse, retError error) {
3✔
1739
        defer func() { log.CapturePanic(recover(), adh.GetLogger(), &retError) }()
6✔
1740
        scope, sw := adh.startRequestProfile(ctx, metrics.GetGlobalIsolationGroups)
3✔
1741
        defer sw.Stop()
3✔
1742

3✔
1743
        if request == nil {
3✔
1744
                return nil, adh.error(errRequestNotSet, scope)
×
1745
        }
×
1746
        if adh.isolationGroups == nil {
4✔
1747
                return nil, adh.error(types.BadRequestError{Message: "isolation groups are not enabled in this cluster"}, scope)
1✔
1748
        }
1✔
1749

1750
        return adh.isolationGroups.GetGlobalState(ctx)
2✔
1751
}
1752

1753
func (adh *adminHandlerImpl) UpdateGlobalIsolationGroups(ctx context.Context, request *types.UpdateGlobalIsolationGroupsRequest) (_ *types.UpdateGlobalIsolationGroupsResponse, retError error) {
3✔
1754
        defer func() { log.CapturePanic(recover(), adh.GetLogger(), &retError) }()
6✔
1755
        scope, sw := adh.startRequestProfile(ctx, metrics.UpdateGlobalIsolationGroups)
3✔
1756
        defer sw.Stop()
3✔
1757
        if request == nil {
3✔
1758
                return nil, adh.error(errRequestNotSet, scope)
×
1759
        }
×
1760
        if adh.isolationGroups == nil {
4✔
1761
                return nil, adh.error(types.BadRequestError{Message: "isolation groups are not enabled in this cluster"}, scope)
1✔
1762
        }
1✔
1763
        err := adh.isolationGroups.UpdateGlobalState(ctx, *request)
2✔
1764
        if err != nil {
3✔
1765
                return nil, err
1✔
1766
        }
1✔
1767
        return &types.UpdateGlobalIsolationGroupsResponse{}, nil
1✔
1768
}
1769

1770
func (adh *adminHandlerImpl) GetDomainIsolationGroups(ctx context.Context, request *types.GetDomainIsolationGroupsRequest) (_ *types.GetDomainIsolationGroupsResponse, retError error) {
2✔
1771
        defer func() { log.CapturePanic(recover(), adh.GetLogger(), &retError) }()
4✔
1772
        scope, sw := adh.startRequestProfile(ctx, metrics.GetDomainIsolationGroups)
2✔
1773
        defer sw.Stop()
2✔
1774

2✔
1775
        if request == nil {
2✔
1776
                return nil, adh.error(errRequestNotSet, scope)
×
1777
        }
×
1778
        if adh.isolationGroups == nil {
2✔
1779
                return nil, adh.error(types.BadRequestError{Message: "isolation groups are not enabled in this cluster"}, scope)
×
1780
        }
×
1781

1782
        return adh.isolationGroups.GetDomainState(ctx, types.GetDomainIsolationGroupsRequest{Domain: request.Domain})
2✔
1783
}
1784

1785
func (adh *adminHandlerImpl) UpdateDomainIsolationGroups(ctx context.Context, request *types.UpdateDomainIsolationGroupsRequest) (_ *types.UpdateDomainIsolationGroupsResponse, retError error) {
2✔
1786
        defer func() { log.CapturePanic(recover(), adh.GetLogger(), &retError) }()
4✔
1787
        scope, sw := adh.startRequestProfile(ctx, metrics.UpdateDomainIsolationGroups)
2✔
1788
        defer sw.Stop()
2✔
1789
        if request == nil {
2✔
1790
                return nil, adh.error(errRequestNotSet, scope)
×
1791
        }
×
1792
        if adh.isolationGroups == nil {
2✔
1793
                return nil, adh.error(types.BadRequestError{Message: "isolation groups are not enabled in this cluster"}, scope)
×
1794
        }
×
1795
        err := adh.isolationGroups.UpdateDomainState(ctx, *request)
2✔
1796
        if err != nil {
3✔
1797
                return nil, err
1✔
1798
        }
1✔
1799
        return &types.UpdateDomainIsolationGroupsResponse{}, nil
1✔
1800
}
1801

1802
func convertFromDataBlob(blob *types.DataBlob) (interface{}, error) {
1✔
1803
        switch *blob.EncodingType {
1✔
1804
        case types.EncodingTypeJSON:
1✔
1805
                var v interface{}
1✔
1806
                err := json.Unmarshal(blob.Data, &v)
1✔
1807
                return v, err
1✔
1808
        default:
×
1809
                return nil, errors.New("unsupported blob encoding")
×
1810
        }
1811
}
1812

1813
func convertFilterListToMap(filters []*types.DynamicConfigFilter) (map[dc.Filter]interface{}, error) {
1✔
1814
        newFilters := make(map[dc.Filter]interface{})
1✔
1815

1✔
1816
        for _, filter := range filters {
2✔
1817
                val, err := convertFromDataBlob(filter.Value)
1✔
1818
                if err != nil {
1✔
1819
                        return nil, err
×
1820
                }
×
1821
                newFilters[dc.ParseFilter(filter.Name)] = val
1✔
1822
        }
1823
        return newFilters, nil
1✔
1824
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc