• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

uber / cadence / 0186ed8c-939f-4cc5-9459-fac2c80653f9

17 Mar 2023 03:30AM UTC coverage: 57.113% (-0.005%) from 57.118%
0186ed8c-939f-4cc5-9459-fac2c80653f9

push

buildkite

GitHub
[history] more cautious in deciding domain state to make decisions on dropping queued tasks (#5164)

1 of 1 new or added line in 1 file covered. (100.0%)

85268 of 149297 relevant lines covered (57.11%)

2300.93 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

59.97
/service/frontend/workflowHandler.go
1
// Copyright (c) 2017-2020 Uber Technologies, Inc.
2
//
3
// Permission is hereby granted, free of charge, to any person obtaining a copy
4
// of this software and associated documentation files (the "Software"), to deal
5
// in the Software without restriction, including without limitation the rights
6
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
7
// copies of the Software, and to permit persons to whom the Software is
8
// furnished to do so, subject to the following conditions:
9
//
10
// The above copyright notice and this permission notice shall be included in
11
// all copies or substantial portions of the Software.
12
//
13
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
14
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
18
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
19
// THE SOFTWARE.
20

21
package frontend
22

23
import (
24
        "context"
25
        "encoding/json"
26
        "errors"
27
        "fmt"
28
        "sync/atomic"
29
        "time"
30

31
        "github.com/pborman/uuid"
32
        "go.uber.org/yarpc"
33
        "go.uber.org/yarpc/yarpcerrors"
34
        "golang.org/x/sync/errgroup"
35

36
        "github.com/uber/cadence/common"
37
        "github.com/uber/cadence/common/archiver"
38
        "github.com/uber/cadence/common/backoff"
39
        "github.com/uber/cadence/common/cache"
40
        "github.com/uber/cadence/common/client"
41
        "github.com/uber/cadence/common/domain"
42
        "github.com/uber/cadence/common/elasticsearch/validator"
43
        "github.com/uber/cadence/common/log"
44
        "github.com/uber/cadence/common/log/tag"
45
        "github.com/uber/cadence/common/messaging"
46
        "github.com/uber/cadence/common/metrics"
47
        "github.com/uber/cadence/common/persistence"
48
        persistenceutils "github.com/uber/cadence/common/persistence/persistence-utils"
49
        "github.com/uber/cadence/common/quotas"
50
        "github.com/uber/cadence/common/resource"
51
        "github.com/uber/cadence/common/service"
52
        "github.com/uber/cadence/common/types"
53
)
54

55
// Package-level tuning constants. Their consumers are outside this view.
const (
	// getDomainReplicationMessageBatchSize is presumably the page size used when
	// reading domain replication messages — confirm against its callers.
	getDomainReplicationMessageBatchSize = 100
	// defaultLastMessageID is an int64 sentinel (-1); presumably "no message
	// read yet" — confirm against its callers.
	defaultLastMessageID int64 = -1
)
59

60
const (
61
        // HealthStatusOK is used when this node is healthy and rpc requests are allowed
62
        HealthStatusOK HealthStatus = iota + 1
63
        // HealthStatusWarmingUp is used when the rpc handler is warming up
64
        HealthStatusWarmingUp
65
        // HealthStatusShuttingDown is used when the rpc handler is shutting down
66
        HealthStatusShuttingDown
67
)
68

69
// Compile-time assertion that *WorkflowHandler satisfies the Handler interface.
var _ Handler = (*WorkflowHandler)(nil)
70

71
// ratelimitType identifies which of the three rate-limiter categories (user,
// worker, visibility) a request is charged against, e.g. pollers use
// ratelimitTypeWorker.
type ratelimitType int

const (
	_ ratelimitType = iota // zero is not a valid category
	ratelimitTypeUser
	ratelimitTypeWorker
	ratelimitTypeVisibility
)
79

80
type (
81
        // WorkflowHandler - Thrift handler interface for workflow service
82
        WorkflowHandler struct {
83
                resource.Resource
84

85
                shuttingDown              int32
86
                healthStatus              int32
87
                tokenSerializer           common.TaskTokenSerializer
88
                userRateLimiter           quotas.Policy
89
                workerRateLimiter         quotas.Policy
90
                visibilityRateLimiter     quotas.Policy
91
                config                    *Config
92
                versionChecker            client.VersionChecker
93
                domainHandler             domain.Handler
94
                visibilityQueryValidator  *validator.VisibilityQueryValidator
95
                searchAttributesValidator *validator.SearchAttributesValidator
96
                throttleRetry             *backoff.ThrottleRetry
97
        }
98

99
        getHistoryContinuationToken struct {
100
                RunID             string
101
                FirstEventID      int64
102
                NextEventID       int64
103
                IsWorkflowRunning bool
104
                PersistenceToken  []byte
105
                TransientDecision *types.TransientDecisionInfo
106
                BranchToken       []byte
107
        }
108

109
        domainGetter interface {
110
                GetDomain() string
111
        }
112

113
        // HealthStatus is an enum that refers to the rpc handler health status
114
        HealthStatus int32
115
)
116

117
var (
118
        errDomainNotSet                               = &types.BadRequestError{Message: "Domain not set on request."}
119
        errTaskTokenNotSet                            = &types.BadRequestError{Message: "Task token not set on request."}
120
        errInvalidTaskToken                           = &types.BadRequestError{Message: "Invalid TaskToken."}
121
        errTaskListNotSet                             = &types.BadRequestError{Message: "TaskList is not set on request."}
122
        errTaskListTypeNotSet                         = &types.BadRequestError{Message: "TaskListType is not set on request."}
123
        errExecutionNotSet                            = &types.BadRequestError{Message: "Execution is not set on request."}
124
        errWorkflowIDNotSet                           = &types.BadRequestError{Message: "WorkflowId is not set on request."}
125
        errActivityIDNotSet                           = &types.BadRequestError{Message: "ActivityID is not set on request."}
126
        errSignalNameNotSet                           = &types.BadRequestError{Message: "SignalName is not set on request."}
127
        errInvalidRunID                               = &types.BadRequestError{Message: "Invalid RunId."}
128
        errInvalidNextPageToken                       = &types.BadRequestError{Message: "Invalid NextPageToken."}
129
        errNextPageTokenRunIDMismatch                 = &types.BadRequestError{Message: "RunID in the request does not match the NextPageToken."}
130
        errQueryNotSet                                = &types.BadRequestError{Message: "WorkflowQuery is not set on request."}
131
        errQueryTypeNotSet                            = &types.BadRequestError{Message: "QueryType is not set on request."}
132
        errRequestNotSet                              = &types.BadRequestError{Message: "Request is nil."}
133
        errNoPermission                               = &types.BadRequestError{Message: "No permission to do this operation."}
134
        errRequestIDNotSet                            = &types.BadRequestError{Message: "RequestId is not set on request."}
135
        errWorkflowTypeNotSet                         = &types.BadRequestError{Message: "WorkflowType is not set on request."}
136
        errInvalidRetention                           = &types.BadRequestError{Message: "RetentionDays is invalid."}
137
        errInvalidExecutionStartToCloseTimeoutSeconds = &types.BadRequestError{Message: "A valid ExecutionStartToCloseTimeoutSeconds is not set on request."}
138
        errInvalidTaskStartToCloseTimeoutSeconds      = &types.BadRequestError{Message: "A valid TaskStartToCloseTimeoutSeconds is not set on request."}
139
        errInvalidDelayStartSeconds                   = &types.BadRequestError{Message: "A valid DelayStartSeconds is not set on request."}
140
        errInvalidJitterStartSeconds                  = &types.BadRequestError{Message: "A valid JitterStartSeconds is not set on request (negative)."}
141
        errInvalidJitterStartSeconds2                 = &types.BadRequestError{Message: "A valid JitterStartSeconds is not set on request (larger than cron duration)."}
142
        errQueryDisallowedForDomain                   = &types.BadRequestError{Message: "Domain is not allowed to query, please contact cadence team to re-enable queries."}
143
        errClusterNameNotSet                          = &types.BadRequestError{Message: "Cluster name is not set."}
144
        errEmptyReplicationInfo                       = &types.BadRequestError{Message: "Replication task info is not set."}
145
        errEmptyQueueType                             = &types.BadRequestError{Message: "Queue type is not set."}
146
        errDomainInLockdown                           = &types.BadRequestError{Message: "Domain is not accepting fail overs at this time due to lockdown."}
147
        errShuttingDown                               = &types.InternalServiceError{Message: "Shutting down"}
148

149
        // err for archival
150
        errHistoryNotFound = &types.BadRequestError{Message: "Requested workflow history not found, may have passed retention period."}
151

152
        // err for string too long
153
        errDomainTooLong       = &types.BadRequestError{Message: "Domain length exceeds limit."}
154
        errWorkflowTypeTooLong = &types.BadRequestError{Message: "WorkflowType length exceeds limit."}
155
        errWorkflowIDTooLong   = &types.BadRequestError{Message: "WorkflowID length exceeds limit."}
156
        errSignalNameTooLong   = &types.BadRequestError{Message: "SignalName length exceeds limit."}
157
        errTaskListTooLong     = &types.BadRequestError{Message: "TaskList length exceeds limit."}
158
        errRequestIDTooLong    = &types.BadRequestError{Message: "RequestID length exceeds limit."}
159
        errIdentityTooLong     = &types.BadRequestError{Message: "Identity length exceeds limit."}
160

161
        frontendServiceRetryPolicy = common.CreateFrontendServiceRetryPolicy()
162
)
163

164
// NewWorkflowHandler creates a thrift handler for the cadence service
165
func NewWorkflowHandler(
166
        resource resource.Resource,
167
        config *Config,
168
        replicationMessageSink messaging.Producer,
169
        versionChecker client.VersionChecker,
170
) *WorkflowHandler {
98✔
171
        return &WorkflowHandler{
98✔
172
                Resource:        resource,
98✔
173
                config:          config,
98✔
174
                healthStatus:    int32(HealthStatusWarmingUp),
98✔
175
                tokenSerializer: common.NewJSONTaskTokenSerializer(),
98✔
176
                userRateLimiter: quotas.NewMultiStageRateLimiter(
98✔
177
                        quotas.NewDynamicRateLimiter(config.UserRPS.AsFloat64()),
98✔
178
                        quotas.NewCollection(func(domain string) quotas.Limiter {
134✔
179
                                return quotas.NewDynamicRateLimiter(quotas.PerMemberDynamic(
36✔
180
                                        service.Frontend,
36✔
181
                                        config.GlobalDomainUserRPS.AsFloat64(domain),
36✔
182
                                        config.MaxDomainUserRPSPerInstance.AsFloat64(domain),
36✔
183
                                        resource.GetMembershipResolver(),
36✔
184
                                ))
36✔
185
                        }),
36✔
186
                ),
187
                workerRateLimiter: quotas.NewMultiStageRateLimiter(
188
                        quotas.NewDynamicRateLimiter(config.WorkerRPS.AsFloat64()),
189
                        quotas.NewCollection(func(domain string) quotas.Limiter {
25✔
190
                                return quotas.NewDynamicRateLimiter(quotas.PerMemberDynamic(
25✔
191
                                        service.Frontend,
25✔
192
                                        config.GlobalDomainWorkerRPS.AsFloat64(domain),
25✔
193
                                        config.MaxDomainWorkerRPSPerInstance.AsFloat64(domain),
25✔
194
                                        resource.GetMembershipResolver(),
25✔
195
                                ))
25✔
196
                        }),
25✔
197
                ),
198
                visibilityRateLimiter: quotas.NewMultiStageRateLimiter(
199
                        quotas.NewDynamicRateLimiter(config.VisibilityRPS.AsFloat64()),
200
                        quotas.NewCollection(func(domain string) quotas.Limiter {
20✔
201
                                return quotas.NewDynamicRateLimiter(quotas.PerMemberDynamic(
20✔
202
                                        service.Frontend,
20✔
203
                                        config.GlobalDomainVisibilityRPS.AsFloat64(domain),
20✔
204
                                        config.MaxDomainVisibilityRPSPerInstance.AsFloat64(domain),
20✔
205
                                        resource.GetMembershipResolver(),
20✔
206
                                ))
20✔
207
                        }),
20✔
208
                ),
209
                versionChecker: versionChecker,
210
                domainHandler: domain.NewHandler(
211
                        config.domainConfig,
212
                        resource.GetLogger(),
213
                        resource.GetDomainManager(),
214
                        resource.GetClusterMetadata(),
215
                        domain.NewDomainReplicator(replicationMessageSink, resource.GetLogger()),
216
                        resource.GetArchivalMetadata(),
217
                        resource.GetArchiverProvider(),
218
                        resource.GetTimeSource(),
219
                ),
220
                visibilityQueryValidator: validator.NewQueryValidator(
221
                        config.ValidSearchAttributes,
222
                        config.EnableQueryAttributeValidation,
223
                ),
224
                searchAttributesValidator: validator.NewSearchAttributesValidator(
225
                        resource.GetLogger(),
226
                        config.EnableQueryAttributeValidation,
227
                        config.ValidSearchAttributes,
228
                        config.SearchAttributesNumberOfKeysLimit,
229
                        config.SearchAttributesSizeOfValueLimit,
230
                        config.SearchAttributesTotalSizeLimit,
231
                ),
232
                throttleRetry: backoff.NewThrottleRetry(
233
                        backoff.WithRetryPolicy(frontendServiceRetryPolicy),
234
                        backoff.WithRetryableError(common.IsServiceTransientError),
235
                ),
236
        }
237
}
238

239
// Start begins the handler's warm-up period. Once warmUpDuration has elapsed,
// the health status is switched from HealthStatusWarmingUp to HealthStatusOK.
// If something else (for example UpdateHealthStatus) has already changed the
// status by then, it is left as-is and the current value is logged instead.
func (wh *WorkflowHandler) Start() {
	// TODO: Get warmup duration from config. Even better, run proactive checks such as probing downstream connections.
	const warmUpDuration = 30 * time.Second

	time.AfterFunc(warmUpDuration, func() {
		wh.GetLogger().Warn("Service warmup duration has elapsed.")

		promoted := atomic.CompareAndSwapInt32(&wh.healthStatus, int32(HealthStatusWarmingUp), int32(HealthStatusOK))
		if !promoted {
			current := HealthStatus(atomic.LoadInt32(&wh.healthStatus))
			wh.GetLogger().Warn(fmt.Sprintf("Warmup time has elapsed. Service status is: %v", current.String()))
			return
		}
		wh.GetLogger().Warn("Warmup time has elapsed. Service is healthy.")
	})
}
256

257
// Stop marks the handler as shutting down. Handlers that check isShuttingDown
// then reject requests with errShuttingDown.
func (wh *WorkflowHandler) Stop() {
	const stopped int32 = 1
	atomic.StoreInt32(&wh.shuttingDown, stopped)
}
15✔
261

262
// UpdateHealthStatus sets the health status for this rpc handler.
263
// This health status will be used within the rpc health check handler
264
func (wh *WorkflowHandler) UpdateHealthStatus(status HealthStatus) {
15✔
265
        atomic.StoreInt32(&wh.healthStatus, int32(status))
15✔
266
}
15✔
267

268
func (wh *WorkflowHandler) isShuttingDown() bool {
6,551✔
269
        return atomic.LoadInt32(&wh.shuttingDown) != 0
6,551✔
270
}
6,551✔
271

272
// Health is for health check
273
func (wh *WorkflowHandler) Health(ctx context.Context) (*types.HealthStatus, error) {
×
274
        status := HealthStatus(atomic.LoadInt32(&wh.healthStatus))
×
275
        msg := status.String()
×
276

×
277
        if status != HealthStatusOK {
×
278
                wh.GetLogger().Warn(fmt.Sprintf("Service status is: %v", msg))
×
279
        }
×
280

281
        return &types.HealthStatus{
×
282
                Ok:  status == HealthStatusOK,
×
283
                Msg: msg,
×
284
        }, nil
×
285
}
286

287
// RegisterDomain creates a new domain which can be used as a container for all resources.  Domain is a top level
288
// entity within Cadence, used as a container for all resources like workflow executions, tasklists, etc.  Domain
289
// acts as a sandbox and provides isolation for all resources within the domain.  All resources belongs to exactly one
290
// domain.
291
func (wh *WorkflowHandler) RegisterDomain(ctx context.Context, registerRequest *types.RegisterDomainRequest) (retError error) {
45✔
292
        defer func() { log.CapturePanic(recover(), wh.GetLogger(), &retError) }()
90✔
293

294
        scope, sw := wh.startRequestProfile(ctx, metrics.FrontendRegisterDomainScope)
45✔
295
        defer sw.Stop()
45✔
296

45✔
297
        if wh.isShuttingDown() {
45✔
298
                return errShuttingDown
×
299
        }
×
300

301
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
45✔
302
                return wh.error(err, scope)
×
303
        }
×
304

305
        if registerRequest == nil {
45✔
306
                return errRequestNotSet
×
307
        }
×
308

309
        if registerRequest.GetWorkflowExecutionRetentionPeriodInDays() > int32(wh.config.domainConfig.MaxRetentionDays()) {
45✔
310
                return errInvalidRetention
×
311
        }
×
312

313
        if err := checkPermission(wh.config, registerRequest.SecurityToken); err != nil {
45✔
314
                return err
×
315
        }
×
316

317
        if err := checkRequiredDomainDataKVs(wh.config.domainConfig.RequiredDomainDataKeys(), registerRequest.GetData()); err != nil {
46✔
318
                return err
1✔
319
        }
1✔
320

321
        if registerRequest.GetName() == "" {
44✔
322
                return errDomainNotSet
×
323
        }
×
324

325
        err := wh.domainHandler.RegisterDomain(ctx, registerRequest)
44✔
326
        if err != nil {
45✔
327
                return wh.error(err, scope)
1✔
328
        }
1✔
329
        return nil
43✔
330
}
331

332
// ListDomains returns the information and configuration for a registered domain.
333
func (wh *WorkflowHandler) ListDomains(
334
        ctx context.Context,
335
        listRequest *types.ListDomainsRequest,
336
) (response *types.ListDomainsResponse, retError error) {
×
337
        defer func() { log.CapturePanic(recover(), wh.GetLogger(), &retError) }()
×
338

339
        scope, sw := wh.startRequestProfile(ctx, metrics.FrontendListDomainsScope)
×
340
        defer sw.Stop()
×
341

×
342
        if wh.isShuttingDown() {
×
343
                return nil, errShuttingDown
×
344
        }
×
345

346
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
×
347
                return nil, wh.error(err, scope)
×
348
        }
×
349

350
        if listRequest == nil {
×
351
                return nil, errRequestNotSet
×
352
        }
×
353

354
        resp, err := wh.domainHandler.ListDomains(ctx, listRequest)
×
355
        if err != nil {
×
356
                return resp, wh.error(err, scope)
×
357
        }
×
358
        return resp, err
×
359
}
360

361
// DescribeDomain returns the information and configuration for a registered domain.
362
func (wh *WorkflowHandler) DescribeDomain(
363
        ctx context.Context,
364
        describeRequest *types.DescribeDomainRequest,
365
) (response *types.DescribeDomainResponse, retError error) {
134✔
366
        defer func() { log.CapturePanic(recover(), wh.GetLogger(), &retError) }()
268✔
367

368
        scope, sw := wh.startRequestProfile(ctx, metrics.FrontendDescribeDomainScope)
134✔
369
        defer sw.Stop()
134✔
370

134✔
371
        if wh.isShuttingDown() {
134✔
372
                return nil, errShuttingDown
×
373
        }
×
374

375
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
134✔
376
                return nil, wh.error(err, scope)
×
377
        }
×
378

379
        if describeRequest == nil {
134✔
380
                return nil, errRequestNotSet
×
381
        }
×
382

383
        if describeRequest.GetName() == "" && describeRequest.GetUUID() == "" {
134✔
384
                return nil, errDomainNotSet
×
385
        }
×
386

387
        resp, err := wh.domainHandler.DescribeDomain(ctx, describeRequest)
134✔
388
        if err != nil {
134✔
389
                return resp, wh.error(err, scope)
×
390
        }
×
391

392
        if resp.GetFailoverInfo() != nil && resp.GetFailoverInfo().GetFailoverExpireTimestamp() > 0 {
134✔
393
                // fetch ongoing failover info from history service
×
394
                failoverResp, err := wh.GetHistoryClient().GetFailoverInfo(ctx, &types.GetFailoverInfoRequest{
×
395
                        DomainID: resp.GetDomainInfo().UUID,
×
396
                })
×
397
                if err != nil {
×
398
                        // despite the error from history, return describe domain response
×
399
                        wh.GetLogger().Error(
×
400
                                fmt.Sprintf("Failed to get failover info for domain %s", resp.DomainInfo.GetName()),
×
401
                                tag.Error(err),
×
402
                        )
×
403
                        return resp, nil
×
404
                }
×
405
                resp.FailoverInfo.CompletedShardCount = failoverResp.GetCompletedShardCount()
×
406
                resp.FailoverInfo.PendingShards = failoverResp.GetPendingShards()
×
407
        }
408
        return resp, err
134✔
409
}
410

411
// UpdateDomain is used to update the information and configuration for a registered domain.
412
func (wh *WorkflowHandler) UpdateDomain(
413
        ctx context.Context,
414
        updateRequest *types.UpdateDomainRequest,
415
) (resp *types.UpdateDomainResponse, retError error) {
9✔
416
        defer func() { log.CapturePanic(recover(), wh.GetLogger(), &retError) }()
18✔
417

418
        scope, sw := wh.startRequestProfile(ctx, metrics.FrontendUpdateDomainScope)
9✔
419
        defer sw.Stop()
9✔
420

9✔
421
        domainName := ""
9✔
422
        if updateRequest != nil {
18✔
423
                domainName = updateRequest.GetName()
9✔
424
        }
9✔
425

426
        logger := wh.GetLogger().WithTags(
9✔
427
                tag.WorkflowDomainName(domainName),
9✔
428
                tag.OperationName("DomainUpdate"))
9✔
429

9✔
430
        if updateRequest == nil {
9✔
431
                logger.Error("Nil domain update request.",
×
432
                        tag.Error(errRequestNotSet))
×
433
                return nil, errRequestNotSet
×
434
        }
×
435

436
        isFailover := isFailoverRequest(updateRequest)
9✔
437
        isGraceFailover := isGraceFailoverRequest(updateRequest)
9✔
438
        logger.Info(fmt.Sprintf(
9✔
439
                "Domain Update requested. isFailover: %v, isGraceFailover: %v, Request: %#v.",
9✔
440
                isFailover,
9✔
441
                isGraceFailover,
9✔
442
                updateRequest))
9✔
443

9✔
444
        if wh.isShuttingDown() {
9✔
445
                logger.Error("Won't apply the domain update since workflowHandler is shutting down.",
×
446
                        tag.Error(errShuttingDown))
×
447
                return nil, errShuttingDown
×
448
        }
×
449

450
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
9✔
451
                logger.Error("Won't apply the domain update since client version is not supported.",
×
452
                        tag.Error(err))
×
453
                return nil, wh.error(err, scope)
×
454
        }
×
455

456
        // don't require permission for failover request
457
        if isFailover {
11✔
458
                // reject the failover if the cluster is in lockdown
2✔
459
                if err := checkFailOverPermission(wh.config, updateRequest.GetName()); err != nil {
3✔
460
                        logger.Error("Domain failover request rejected since domain is in lockdown.",
1✔
461
                                tag.Error(err))
1✔
462
                        return nil, err
1✔
463
                }
1✔
464
        } else {
7✔
465
                if err := checkPermission(wh.config, updateRequest.SecurityToken); err != nil {
7✔
466
                        logger.Error("Domain update request rejected due to failing permissions.",
×
467
                                tag.Error(err))
×
468
                        return nil, err
×
469
                }
×
470
        }
471

472
        if isGraceFailover {
9✔
473
                if err := wh.checkOngoingFailover(
1✔
474
                        ctx,
1✔
475
                        &updateRequest.Name,
1✔
476
                ); err != nil {
1✔
477
                        logger.Error("Graceful domain failover request failed. Not able to check ongoing failovers.",
×
478
                                tag.Error(err))
×
479
                        return nil, err
×
480
                }
×
481
        }
482

483
        if updateRequest.GetName() == "" {
8✔
484
                logger.Error("Domain not set on request.",
×
485
                        tag.Error(errDomainNotSet))
×
486
                return nil, errDomainNotSet
×
487
        }
×
488
        // TODO: call remote clusters to verify domain data
489
        resp, err := wh.domainHandler.UpdateDomain(ctx, updateRequest)
8✔
490
        if err != nil {
10✔
491
                logger.Error("Domain update operation failed.",
2✔
492
                        tag.Error(err))
2✔
493
                return resp, wh.error(err, scope)
2✔
494
        }
2✔
495
        logger.Info("Domain update operation succeeded.")
6✔
496
        return resp, err
6✔
497
}
498

499
// DeprecateDomain us used to update status of a registered domain to DEPRECATED. Once the domain is deprecated
500
// it cannot be used to start new workflow executions.  Existing workflow executions will continue to run on
501
// deprecated domains.
502
func (wh *WorkflowHandler) DeprecateDomain(ctx context.Context, deprecateRequest *types.DeprecateDomainRequest) (retError error) {
×
503
        defer func() { log.CapturePanic(recover(), wh.GetLogger(), &retError) }()
×
504

505
        scope, sw := wh.startRequestProfile(ctx, metrics.FrontendDeprecateDomainScope)
×
506
        defer sw.Stop()
×
507

×
508
        if wh.isShuttingDown() {
×
509
                return errShuttingDown
×
510
        }
×
511

512
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
×
513
                return wh.error(err, scope)
×
514
        }
×
515

516
        if deprecateRequest == nil {
×
517
                return errRequestNotSet
×
518
        }
×
519

520
        if err := checkPermission(wh.config, deprecateRequest.SecurityToken); err != nil {
×
521
                return err
×
522
        }
×
523

524
        if deprecateRequest.GetName() == "" {
×
525
                return errDomainNotSet
×
526
        }
×
527

528
        err := wh.domainHandler.DeprecateDomain(ctx, deprecateRequest)
×
529
        if err != nil {
×
530
                return wh.error(err, scope)
×
531
        }
×
532
        return err
×
533
}
534

535
// PollForActivityTask - Poll for an activity task.
//
// The request is validated (domain set, long-poll context timeout, domain name
// and identity lengths, task list) and charged against the worker rate limiter.
// The domain name is then resolved to an ID, and matching is polled through
// throttleRetry under a freshly generated poller ID. If polling still fails,
// the error goes to cancelOutstandingPoll. When that returns nil, whatever
// resp the last attempt produced is returned without an error. Otherwise the
// failure is logged together with the context timeout measured from the call
// start, and then returned.
func (wh *WorkflowHandler) PollForActivityTask(
	ctx context.Context,
	pollRequest *types.PollForActivityTaskRequest,
) (resp *types.PollForActivityTaskResponse, retError error) {
	defer func() { log.CapturePanic(recover(), wh.GetLogger(), &retError) }()

	callTime := time.Now()

	scope, sw := wh.startRequestProfileWithDomain(ctx, metrics.FrontendPollForActivityTaskScope, pollRequest)
	defer sw.Stop()

	if wh.isShuttingDown() {
		return nil, errShuttingDown
	}
	if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
		return nil, wh.error(err, scope)
	}
	if pollRequest == nil {
		return nil, wh.error(errRequestNotSet, scope)
	}

	domainName := pollRequest.GetDomain()
	tags := getDomainWfIDRunIDTags(domainName, nil)
	if domainName == "" {
		return nil, wh.error(errDomainNotSet, scope, tags...)
	}

	wh.GetLogger().Debug("Received PollForActivityTask")
	longPollErr := common.ValidateLongPollContextTimeout(ctx, "PollForActivityTask", wh.GetThrottledLogger())
	if longPollErr != nil {
		return nil, wh.error(longPollErr, scope, tags...)
	}

	warnLimit := wh.config.MaxIDLengthWarnLimit()
	domainNameOK := common.ValidIDLength(
		domainName,
		scope,
		warnLimit,
		wh.config.DomainNameMaxLength(domainName),
		metrics.CadenceErrDomainNameExceededWarnLimit,
		domainName,
		wh.GetLogger(),
		tag.IDTypeDomainName,
	)
	if !domainNameOK {
		return nil, wh.error(errDomainTooLong, scope, tags...)
	}

	if err := wh.validateTaskList(pollRequest.TaskList, scope, domainName); err != nil {
		return nil, wh.error(err, scope, tags...)
	}

	identityOK := common.ValidIDLength(
		pollRequest.GetIdentity(),
		scope,
		warnLimit,
		wh.config.IdentityMaxLength(domainName),
		metrics.CadenceErrIdentityExceededWarnLimit,
		domainName,
		wh.GetLogger(),
		tag.IDTypeIdentity,
	)
	if !identityOK {
		return nil, wh.error(errIdentityTooLong, scope, tags...)
	}

	if !wh.allow(ratelimitTypeWorker, pollRequest) {
		// pollers exponentially back off up to 10s
		return nil, wh.error(createServiceBusyError(), scope, tags...)
	}

	domainID, err := wh.GetDomainCache().GetDomainID(domainName)
	if err != nil {
		return nil, wh.error(err, scope, tags...)
	}

	pollerID := uuid.New()
	pollErr := wh.throttleRetry.Do(ctx, func() error {
		var matchingErr error
		resp, matchingErr = wh.GetMatchingClient().PollForActivityTask(ctx, &types.MatchingPollForActivityTaskRequest{
			DomainUUID:  domainID,
			PollerID:    pollerID,
			PollRequest: pollRequest,
		})
		return matchingErr
	})
	if pollErr == nil {
		return resp, nil
	}

	finalErr := wh.cancelOutstandingPoll(ctx, pollErr, domainID, persistence.TaskListTypeActivity, pollRequest.TaskList, pollerID)
	if finalErr == nil {
		return resp, nil
	}

	// For all other errors log an error and return it back to client.
	ctxTimeout := "not-set"
	if ctxDeadline, ok := ctx.Deadline(); ok {
		ctxTimeout = ctxDeadline.Sub(callTime).String()
	}
	wh.GetLogger().Error("PollForActivityTask failed.",
		tag.WorkflowTaskListName(pollRequest.GetTaskList().GetName()),
		tag.Value(ctxTimeout),
		tag.Error(finalErr))
	return nil, wh.error(finalErr, scope, tags...)
}
643

644
// PollForDecisionTask - Poll for a decision task.
645
func (wh *WorkflowHandler) PollForDecisionTask(
646
        ctx context.Context,
647
        pollRequest *types.PollForDecisionTaskRequest,
648
) (resp *types.PollForDecisionTaskResponse, retError error) {
1,548✔
649
        defer func() { log.CapturePanic(recover(), wh.GetLogger(), &retError) }()
3,096✔
650

651
        callTime := time.Now()
1,548✔
652

1,548✔
653
        scope, sw := wh.startRequestProfileWithDomain(ctx, metrics.FrontendPollForDecisionTaskScope, pollRequest)
1,548✔
654
        defer sw.Stop()
1,548✔
655

1,548✔
656
        if wh.isShuttingDown() {
1,548✔
657
                return nil, errShuttingDown
×
658
        }
×
659

660
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
1,548✔
661
                return nil, wh.error(err, scope)
×
662
        }
×
663

664
        if pollRequest == nil {
1,548✔
665
                return nil, wh.error(errRequestNotSet, scope)
×
666
        }
×
667

668
        domainName := pollRequest.GetDomain()
1,548✔
669
        tags := getDomainWfIDRunIDTags(domainName, nil)
1,548✔
670

1,548✔
671
        if domainName == "" {
1,548✔
672
                return nil, wh.error(errDomainNotSet, scope, tags...)
×
673
        }
×
674

675
        wh.GetLogger().Debug("Received PollForDecisionTask")
1,548✔
676
        if err := common.ValidateLongPollContextTimeout(
1,548✔
677
                ctx,
1,548✔
678
                "PollForDecisionTask",
1,548✔
679
                wh.GetThrottledLogger(),
1,548✔
680
        ); err != nil {
1,550✔
681
                return nil, wh.error(err, scope, tags...)
2✔
682
        }
2✔
683

684
        idLengthWarnLimit := wh.config.MaxIDLengthWarnLimit()
1,546✔
685
        if !common.ValidIDLength(
1,546✔
686
                domainName,
1,546✔
687
                scope,
1,546✔
688
                idLengthWarnLimit,
1,546✔
689
                wh.config.DomainNameMaxLength(domainName),
1,546✔
690
                metrics.CadenceErrDomainNameExceededWarnLimit,
1,546✔
691
                domainName,
1,546✔
692
                wh.GetLogger(),
1,546✔
693
                tag.IDTypeDomainName) {
1,546✔
694
                return nil, wh.error(errDomainTooLong, scope, tags...)
×
695
        }
×
696

697
        if !common.ValidIDLength(
1,546✔
698
                pollRequest.GetIdentity(),
1,546✔
699
                scope,
1,546✔
700
                idLengthWarnLimit,
1,546✔
701
                wh.config.IdentityMaxLength(domainName),
1,546✔
702
                metrics.CadenceErrIdentityExceededWarnLimit,
1,546✔
703
                domainName,
1,546✔
704
                wh.GetLogger(),
1,546✔
705
                tag.IDTypeIdentity) {
1,546✔
706
                return nil, wh.error(errIdentityTooLong, scope, tags...)
×
707
        }
×
708

709
        if err := wh.validateTaskList(pollRequest.TaskList, scope, domainName); err != nil {
1,546✔
710
                return nil, wh.error(err, scope, tags...)
×
711
        }
×
712

713
        domainEntry, err := wh.GetDomainCache().GetDomain(domainName)
1,546✔
714
        if err != nil {
1,862✔
715
                return nil, wh.error(err, scope, tags...)
316✔
716
        }
316✔
717
        domainID := domainEntry.GetInfo().ID
1,230✔
718

1,230✔
719
        wh.GetLogger().Debug("Poll for decision.", tag.WorkflowDomainName(domainName), tag.WorkflowDomainID(domainID))
1,230✔
720
        if err := wh.checkBadBinary(domainEntry, pollRequest.GetBinaryChecksum()); err != nil {
1,230✔
721
                return nil, wh.error(err, scope, tags...)
×
722
        }
×
723

724
        if ok := wh.allow(ratelimitTypeWorker, pollRequest); !ok {
1,230✔
725
                // pollers exponentially back off up to 10s
×
726
                return nil, wh.error(createServiceBusyError(), scope, tags...)
×
727
        }
×
728

729
        pollerID := uuid.New()
1,230✔
730
        var matchingResp *types.MatchingPollForDecisionTaskResponse
1,230✔
731
        op := func() error {
2,460✔
732
                matchingResp, err = wh.GetMatchingClient().PollForDecisionTask(ctx, &types.MatchingPollForDecisionTaskRequest{
1,230✔
733
                        DomainUUID:  domainID,
1,230✔
734
                        PollerID:    pollerID,
1,230✔
735
                        PollRequest: pollRequest,
1,230✔
736
                })
1,230✔
737
                return err
1,230✔
738
        }
1,230✔
739

740
        err = wh.throttleRetry.Do(ctx, op)
1,230✔
741
        if err != nil {
1,296✔
742
                err = wh.cancelOutstandingPoll(ctx, err, domainID, persistence.TaskListTypeDecision, pollRequest.TaskList, pollerID)
66✔
743
                if err != nil {
66✔
744
                        // For all other errors log an error and return it back to client.
×
745
                        ctxTimeout := "not-set"
×
746
                        ctxDeadline, ok := ctx.Deadline()
×
747
                        if ok {
×
748
                                ctxTimeout = ctxDeadline.Sub(callTime).String()
×
749
                        }
×
750
                        wh.GetLogger().Error("PollForDecisionTask failed.",
×
751
                                tag.WorkflowTaskListName(pollRequest.GetTaskList().GetName()),
×
752
                                tag.Value(ctxTimeout),
×
753
                                tag.Error(err))
×
754
                        return nil, wh.error(err, scope)
×
755
                }
756

757
                // Must be cancellation error.  Does'nt matter what we return here.  Client already went away.
758
                return nil, nil
66✔
759
        }
760

761
        tags = append(tags, []tag.Tag{tag.WorkflowID(
1,164✔
762
                matchingResp.GetWorkflowExecution().GetWorkflowID()),
1,164✔
763
                tag.WorkflowRunID(matchingResp.GetWorkflowExecution().GetRunID())}...)
1,164✔
764
        resp, err = wh.createPollForDecisionTaskResponse(ctx, scope, domainID, matchingResp, matchingResp.GetBranchToken())
1,164✔
765
        if err != nil {
1,164✔
766
                return nil, wh.error(err, scope, tags...)
×
767
        }
×
768
        return resp, nil
1,164✔
769
}
770

771
func (wh *WorkflowHandler) checkBadBinary(domainEntry *cache.DomainCacheEntry, binaryChecksum string) error {
1,230✔
772
        if domainEntry.GetConfig().BadBinaries.Binaries != nil {
2,460✔
773
                badBinaries := domainEntry.GetConfig().BadBinaries.Binaries
1,230✔
774
                _, ok := badBinaries[binaryChecksum]
1,230✔
775
                if ok {
1,230✔
776
                        wh.GetMetricsClient().IncCounter(metrics.FrontendPollForDecisionTaskScope, metrics.CadenceErrBadBinaryCounter)
×
777
                        return &types.BadRequestError{
×
778
                                Message: fmt.Sprintf("binary %v already marked as bad deployment", binaryChecksum),
×
779
                        }
×
780
                }
×
781
        }
782
        return nil
1,230✔
783
}
784

785
func (wh *WorkflowHandler) cancelOutstandingPoll(ctx context.Context, err error, domainID string, taskListType int32,
786
        taskList *types.TaskList, pollerID string) error {
132✔
787
        // First check if this err is due to context cancellation.  This means client connection to frontend is closed.
132✔
788
        if ctx.Err() == context.Canceled {
264✔
789
                // Our rpc stack does not propagates context cancellation to the other service.  Lets make an explicit
132✔
790
                // call to matching to notify this poller is gone to prevent any tasks being dispatched to zombie pollers.
132✔
791
                err = wh.GetMatchingClient().CancelOutstandingPoll(context.Background(), &types.CancelOutstandingPollRequest{
132✔
792
                        DomainUUID:   domainID,
132✔
793
                        TaskListType: common.Int32Ptr(taskListType),
132✔
794
                        TaskList:     taskList,
132✔
795
                        PollerID:     pollerID,
132✔
796
                })
132✔
797
                // We can not do much if this call fails.  Just log the error and move on
132✔
798
                if err != nil {
132✔
799
                        wh.GetLogger().Warn("Failed to cancel outstanding poller.",
×
800
                                tag.WorkflowTaskListName(taskList.GetName()), tag.Error(err))
×
801
                }
×
802

803
                // Clear error as we don't want to report context cancellation error to count against our SLA
804
                return nil
132✔
805
        }
806

807
        return err
×
808
}
809

810
// RecordActivityTaskHeartbeat - Record Activity Task Heart beat.
811
func (wh *WorkflowHandler) RecordActivityTaskHeartbeat(
812
        ctx context.Context,
813
        heartbeatRequest *types.RecordActivityTaskHeartbeatRequest,
814
) (resp *types.RecordActivityTaskHeartbeatResponse, retError error) {
381✔
815
        defer func() { log.CapturePanic(recover(), wh.GetLogger(), &retError) }()
762✔
816

817
        scope := wh.getDefaultScope(ctx, metrics.FrontendRecordActivityTaskHeartbeatScope)
381✔
818

381✔
819
        if wh.isShuttingDown() {
381✔
820
                return nil, errShuttingDown
×
821
        }
×
822

823
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
381✔
824
                return nil, wh.error(err, scope)
×
825
        }
×
826

827
        if heartbeatRequest == nil {
381✔
828
                return nil, wh.error(errRequestNotSet, scope)
×
829
        }
×
830

831
        wh.GetLogger().Debug("Received RecordActivityTaskHeartbeat")
381✔
832
        if heartbeatRequest.TaskToken == nil {
381✔
833
                return nil, wh.error(errTaskTokenNotSet, scope)
×
834
        }
×
835
        taskToken, err := wh.tokenSerializer.Deserialize(heartbeatRequest.TaskToken)
381✔
836
        if err != nil {
381✔
837
                return nil, wh.error(err, scope)
×
838
        }
×
839
        if taskToken.DomainID == "" {
381✔
840
                return nil, wh.error(errDomainNotSet, scope)
×
841
        }
×
842

843
        domainName, err := wh.GetDomainCache().GetDomainName(taskToken.DomainID)
381✔
844
        if err != nil {
381✔
845
                return nil, wh.error(err, scope)
×
846
        }
×
847

848
        dw := domainWrapper{
381✔
849
                domain: domainName,
381✔
850
        }
381✔
851
        scope, sw := wh.startRequestProfileWithDomain(
381✔
852
                ctx,
381✔
853
                metrics.FrontendRecordActivityTaskHeartbeatScope,
381✔
854
                dw,
381✔
855
        )
381✔
856
        defer sw.Stop()
381✔
857

381✔
858
        // Count the request in the host RPS,
381✔
859
        // but we still accept it even if RPS is exceeded
381✔
860
        wh.allow(ratelimitTypeWorker, dw)
381✔
861

381✔
862
        tags := getDomainWfIDRunIDTags(domainName, &types.WorkflowExecution{
381✔
863
                WorkflowID: taskToken.WorkflowID,
381✔
864
                RunID:      taskToken.RunID,
381✔
865
        })
381✔
866

381✔
867
        sizeLimitError := wh.config.BlobSizeLimitError(domainName)
381✔
868
        sizeLimitWarn := wh.config.BlobSizeLimitWarn(domainName)
381✔
869

381✔
870
        if err := common.CheckEventBlobSizeLimit(
381✔
871
                len(heartbeatRequest.Details),
381✔
872
                sizeLimitWarn,
381✔
873
                sizeLimitError,
381✔
874
                taskToken.DomainID,
381✔
875
                taskToken.WorkflowID,
381✔
876
                taskToken.RunID,
381✔
877
                scope,
381✔
878
                wh.GetThrottledLogger(),
381✔
879
                tag.BlobSizeViolationOperation("RecordActivityTaskHeartbeat"),
381✔
880
        ); err != nil {
381✔
881
                // heartbeat details exceed size limit, we would fail the activity immediately with explicit error reason
×
882
                failRequest := &types.RespondActivityTaskFailedRequest{
×
883
                        TaskToken: heartbeatRequest.TaskToken,
×
884
                        Reason:    common.StringPtr(common.FailureReasonHeartbeatExceedsLimit),
×
885
                        Details:   heartbeatRequest.Details[0:sizeLimitError],
×
886
                        Identity:  heartbeatRequest.Identity,
×
887
                }
×
888
                err = wh.GetHistoryClient().RespondActivityTaskFailed(ctx, &types.HistoryRespondActivityTaskFailedRequest{
×
889
                        DomainUUID:    taskToken.DomainID,
×
890
                        FailedRequest: failRequest,
×
891
                })
×
892
                if err != nil {
×
893
                        return nil, wh.normalizeVersionedErrors(ctx, wh.error(err, scope, tags...))
×
894
                }
×
895
                resp = &types.RecordActivityTaskHeartbeatResponse{CancelRequested: true}
×
896
        } else {
381✔
897
                resp, err = wh.GetHistoryClient().RecordActivityTaskHeartbeat(ctx, &types.HistoryRecordActivityTaskHeartbeatRequest{
381✔
898
                        DomainUUID:       taskToken.DomainID,
381✔
899
                        HeartbeatRequest: heartbeatRequest,
381✔
900
                })
381✔
901
                if err != nil {
381✔
902
                        return nil, wh.normalizeVersionedErrors(ctx, wh.error(err, scope, tags...))
×
903
                }
×
904
        }
905

906
        return resp, nil
381✔
907
}
908

909
// RecordActivityTaskHeartbeatByID - Record Activity Task Heart beat.
910
func (wh *WorkflowHandler) RecordActivityTaskHeartbeatByID(
911
        ctx context.Context,
912
        heartbeatRequest *types.RecordActivityTaskHeartbeatByIDRequest,
913
) (resp *types.RecordActivityTaskHeartbeatResponse, retError error) {
×
914
        defer func() { log.CapturePanic(recover(), wh.GetLogger(), &retError) }()
×
915

916
        scope, sw := wh.startRequestProfileWithDomain(ctx, metrics.FrontendRecordActivityTaskHeartbeatByIDScope, heartbeatRequest)
×
917
        defer sw.Stop()
×
918

×
919
        if wh.isShuttingDown() {
×
920
                return nil, errShuttingDown
×
921
        }
×
922

923
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
×
924
                return nil, wh.error(err, scope)
×
925
        }
×
926

927
        if heartbeatRequest == nil {
×
928
                return nil, wh.error(errRequestNotSet, scope)
×
929
        }
×
930

931
        domainName := heartbeatRequest.GetDomain()
×
932
        tags := getDomainWfIDRunIDTags(domainName, &types.WorkflowExecution{
×
933
                WorkflowID: heartbeatRequest.GetWorkflowID(),
×
934
                RunID:      heartbeatRequest.GetRunID(),
×
935
        })
×
936

×
937
        if domainName == "" {
×
938
                return nil, wh.error(errDomainNotSet, scope, tags...)
×
939
        }
×
940

941
        // Count the request in the host RPS,
942
        // but we still accept it even if RPS is exceeded
943
        wh.allow(ratelimitTypeWorker, heartbeatRequest)
×
944

×
945
        wh.GetLogger().Debug("Received RecordActivityTaskHeartbeatByID")
×
946
        domainID, err := wh.GetDomainCache().GetDomainID(domainName)
×
947
        if err != nil {
×
948
                return nil, wh.error(err, scope, tags...)
×
949
        }
×
950
        workflowID := heartbeatRequest.GetWorkflowID()
×
951
        runID := heartbeatRequest.GetRunID() // runID is optional so can be empty
×
952
        activityID := heartbeatRequest.GetActivityID()
×
953

×
954
        if domainID == "" {
×
955
                return nil, wh.error(errDomainNotSet, scope, tags...)
×
956
        }
×
957
        if workflowID == "" {
×
958
                return nil, wh.error(errWorkflowIDNotSet, scope, tags...)
×
959
        }
×
960
        if activityID == "" {
×
961
                return nil, wh.error(errActivityIDNotSet, scope, tags...)
×
962
        }
×
963

964
        taskToken := &common.TaskToken{
×
965
                DomainID:   domainID,
×
966
                RunID:      runID,
×
967
                WorkflowID: workflowID,
×
968
                ScheduleID: common.EmptyEventID,
×
969
                ActivityID: activityID,
×
970
        }
×
971
        token, err := wh.tokenSerializer.Serialize(taskToken)
×
972
        if err != nil {
×
973
                return nil, wh.error(err, scope, tags...)
×
974
        }
×
975

976
        sizeLimitError := wh.config.BlobSizeLimitError(domainName)
×
977
        sizeLimitWarn := wh.config.BlobSizeLimitWarn(domainName)
×
978

×
979
        if err := common.CheckEventBlobSizeLimit(
×
980
                len(heartbeatRequest.Details),
×
981
                sizeLimitWarn,
×
982
                sizeLimitError,
×
983
                taskToken.DomainID,
×
984
                taskToken.WorkflowID,
×
985
                taskToken.RunID,
×
986
                scope,
×
987
                wh.GetThrottledLogger(),
×
988
                tag.BlobSizeViolationOperation("RecordActivityTaskHeartbeatByID"),
×
989
        ); err != nil {
×
990
                // heartbeat details exceed size limit, we would fail the activity immediately with explicit error reason
×
991
                failRequest := &types.RespondActivityTaskFailedRequest{
×
992
                        TaskToken: token,
×
993
                        Reason:    common.StringPtr(common.FailureReasonHeartbeatExceedsLimit),
×
994
                        Details:   heartbeatRequest.Details[0:sizeLimitError],
×
995
                        Identity:  heartbeatRequest.Identity,
×
996
                }
×
997
                err = wh.GetHistoryClient().RespondActivityTaskFailed(ctx, &types.HistoryRespondActivityTaskFailedRequest{
×
998
                        DomainUUID:    taskToken.DomainID,
×
999
                        FailedRequest: failRequest,
×
1000
                })
×
1001
                if err != nil {
×
1002
                        return nil, wh.normalizeVersionedErrors(ctx, wh.error(err, scope, tags...))
×
1003
                }
×
1004
                resp = &types.RecordActivityTaskHeartbeatResponse{CancelRequested: true}
×
1005
        } else {
×
1006
                req := &types.RecordActivityTaskHeartbeatRequest{
×
1007
                        TaskToken: token,
×
1008
                        Details:   heartbeatRequest.Details,
×
1009
                        Identity:  heartbeatRequest.Identity,
×
1010
                }
×
1011

×
1012
                resp, err = wh.GetHistoryClient().RecordActivityTaskHeartbeat(ctx, &types.HistoryRecordActivityTaskHeartbeatRequest{
×
1013
                        DomainUUID:       taskToken.DomainID,
×
1014
                        HeartbeatRequest: req,
×
1015
                })
×
1016
                if err != nil {
×
1017
                        return nil, wh.normalizeVersionedErrors(ctx, wh.error(err, scope, tags...))
×
1018
                }
×
1019
        }
1020

1021
        return resp, nil
×
1022
}
1023

1024
// RespondActivityTaskCompleted - response to an activity task
1025
func (wh *WorkflowHandler) RespondActivityTaskCompleted(
1026
        ctx context.Context,
1027
        completeRequest *types.RespondActivityTaskCompletedRequest,
1028
) (retError error) {
246✔
1029
        defer func() { log.CapturePanic(recover(), wh.GetLogger(), &retError) }()
492✔
1030

1031
        scope := wh.getDefaultScope(ctx, metrics.FrontendRespondActivityTaskCompletedScope)
246✔
1032

246✔
1033
        if wh.isShuttingDown() {
246✔
1034
                return errShuttingDown
×
1035
        }
×
1036

1037
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
246✔
1038
                return wh.error(err, scope)
×
1039
        }
×
1040

1041
        if completeRequest == nil {
246✔
1042
                return wh.error(errRequestNotSet, scope)
×
1043
        }
×
1044

1045
        if completeRequest.TaskToken == nil {
246✔
1046
                return wh.error(errTaskTokenNotSet, scope)
×
1047
        }
×
1048
        taskToken, err := wh.tokenSerializer.Deserialize(completeRequest.TaskToken)
246✔
1049
        if err != nil {
246✔
1050
                return wh.error(err, scope)
×
1051
        }
×
1052
        if taskToken.DomainID == "" {
246✔
1053
                return wh.error(errDomainNotSet, scope)
×
1054
        }
×
1055

1056
        domainName, err := wh.GetDomainCache().GetDomainName(taskToken.DomainID)
246✔
1057
        if err != nil {
246✔
1058
                return wh.error(err, scope)
×
1059
        }
×
1060

1061
        dw := domainWrapper{
246✔
1062
                domain: domainName,
246✔
1063
        }
246✔
1064
        scope, sw := wh.startRequestProfileWithDomain(
246✔
1065
                ctx,
246✔
1066
                metrics.FrontendRespondActivityTaskCompletedScope,
246✔
1067
                dw,
246✔
1068
        )
246✔
1069
        defer sw.Stop()
246✔
1070

246✔
1071
        // Count the request in the host RPS,
246✔
1072
        // but we still accept it even if RPS is exceeded
246✔
1073
        wh.allow(ratelimitTypeWorker, dw)
246✔
1074

246✔
1075
        tags := getDomainWfIDRunIDTags(domainName, &types.WorkflowExecution{
246✔
1076
                WorkflowID: taskToken.WorkflowID,
246✔
1077
                RunID:      taskToken.RunID,
246✔
1078
        })
246✔
1079

246✔
1080
        if !common.ValidIDLength(
246✔
1081
                completeRequest.GetIdentity(),
246✔
1082
                scope,
246✔
1083
                wh.config.MaxIDLengthWarnLimit(),
246✔
1084
                wh.config.IdentityMaxLength(domainName),
246✔
1085
                metrics.CadenceErrIdentityExceededWarnLimit,
246✔
1086
                domainName,
246✔
1087
                wh.GetLogger(),
246✔
1088
                tag.IDTypeIdentity) {
246✔
1089
                return wh.error(errIdentityTooLong, scope, tags...)
×
1090
        }
×
1091

1092
        sizeLimitError := wh.config.BlobSizeLimitError(domainName)
246✔
1093
        sizeLimitWarn := wh.config.BlobSizeLimitWarn(domainName)
246✔
1094

246✔
1095
        if err := common.CheckEventBlobSizeLimit(
246✔
1096
                len(completeRequest.Result),
246✔
1097
                sizeLimitWarn,
246✔
1098
                sizeLimitError,
246✔
1099
                taskToken.DomainID,
246✔
1100
                taskToken.WorkflowID,
246✔
1101
                taskToken.RunID,
246✔
1102
                scope,
246✔
1103
                wh.GetThrottledLogger(),
246✔
1104
                tag.BlobSizeViolationOperation("RespondActivityTaskCompleted"),
246✔
1105
        ); err != nil {
246✔
1106
                // result exceeds blob size limit, we would record it as failure
×
1107
                failRequest := &types.RespondActivityTaskFailedRequest{
×
1108
                        TaskToken: completeRequest.TaskToken,
×
1109
                        Reason:    common.StringPtr(common.FailureReasonCompleteResultExceedsLimit),
×
1110
                        Details:   completeRequest.Result[0:sizeLimitError],
×
1111
                        Identity:  completeRequest.Identity,
×
1112
                }
×
1113
                err = wh.GetHistoryClient().RespondActivityTaskFailed(ctx, &types.HistoryRespondActivityTaskFailedRequest{
×
1114
                        DomainUUID:    taskToken.DomainID,
×
1115
                        FailedRequest: failRequest,
×
1116
                })
×
1117
                if err != nil {
×
1118
                        return wh.normalizeVersionedErrors(ctx, wh.error(err, scope, tags...))
×
1119
                }
×
1120
        } else {
246✔
1121
                err = wh.GetHistoryClient().RespondActivityTaskCompleted(ctx, &types.HistoryRespondActivityTaskCompletedRequest{
246✔
1122
                        DomainUUID:      taskToken.DomainID,
246✔
1123
                        CompleteRequest: completeRequest,
246✔
1124
                })
246✔
1125
                if err != nil {
291✔
1126
                        return wh.normalizeVersionedErrors(ctx, wh.error(err, scope, tags...))
45✔
1127
                }
45✔
1128
        }
1129

1130
        return nil
201✔
1131
}
1132

1133
// RespondActivityTaskCompletedByID - response to an activity task
1134
func (wh *WorkflowHandler) RespondActivityTaskCompletedByID(
1135
        ctx context.Context,
1136
        completeRequest *types.RespondActivityTaskCompletedByIDRequest,
1137
) (retError error) {
75✔
1138
        defer func() { log.CapturePanic(recover(), wh.GetLogger(), &retError) }()
150✔
1139

1140
        scope, sw := wh.startRequestProfileWithDomain(ctx, metrics.FrontendRespondActivityTaskCompletedByIDScope, completeRequest)
75✔
1141
        defer sw.Stop()
75✔
1142

75✔
1143
        if wh.isShuttingDown() {
75✔
1144
                return errShuttingDown
×
1145
        }
×
1146

1147
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
75✔
1148
                return wh.error(err, scope)
×
1149
        }
×
1150

1151
        if completeRequest == nil {
75✔
1152
                return wh.error(errRequestNotSet, scope)
×
1153
        }
×
1154

1155
        domainName := completeRequest.GetDomain()
75✔
1156
        tags := getDomainWfIDRunIDTags(domainName, &types.WorkflowExecution{
75✔
1157
                WorkflowID: completeRequest.GetWorkflowID(),
75✔
1158
                RunID:      completeRequest.GetRunID(),
75✔
1159
        })
75✔
1160

75✔
1161
        if domainName == "" {
75✔
1162
                return wh.error(errDomainNotSet, scope, tags...)
×
1163
        }
×
1164

1165
        // Count the request in the host RPS,
1166
        // but we still accept it even if RPS is exceeded
1167
        wh.allow(ratelimitTypeWorker, completeRequest)
75✔
1168

75✔
1169
        domainID, err := wh.GetDomainCache().GetDomainID(domainName)
75✔
1170
        if err != nil {
75✔
1171
                return wh.error(err, scope, tags...)
×
1172
        }
×
1173
        workflowID := completeRequest.GetWorkflowID()
75✔
1174
        runID := completeRequest.GetRunID() // runID is optional so can be empty
75✔
1175
        activityID := completeRequest.GetActivityID()
75✔
1176

75✔
1177
        if domainID == "" {
75✔
1178
                return wh.error(errDomainNotSet, scope, tags...)
×
1179
        }
×
1180
        if workflowID == "" {
75✔
1181
                return wh.error(errWorkflowIDNotSet, scope, tags...)
×
1182
        }
×
1183
        if activityID == "" {
75✔
1184
                return wh.error(errActivityIDNotSet, scope, tags...)
×
1185
        }
×
1186

1187
        if !common.ValidIDLength(
75✔
1188
                completeRequest.GetIdentity(),
75✔
1189
                scope,
75✔
1190
                wh.config.MaxIDLengthWarnLimit(),
75✔
1191
                wh.config.IdentityMaxLength(domainName),
75✔
1192
                metrics.CadenceErrIdentityExceededWarnLimit,
75✔
1193
                domainName,
75✔
1194
                wh.GetLogger(),
75✔
1195
                tag.IDTypeIdentity) {
75✔
1196
                return wh.error(errIdentityTooLong, scope)
×
1197
        }
×
1198

1199
        taskToken := &common.TaskToken{
75✔
1200
                DomainID:   domainID,
75✔
1201
                RunID:      runID,
75✔
1202
                WorkflowID: workflowID,
75✔
1203
                ScheduleID: common.EmptyEventID,
75✔
1204
                ActivityID: activityID,
75✔
1205
        }
75✔
1206
        token, err := wh.tokenSerializer.Serialize(taskToken)
75✔
1207
        if err != nil {
75✔
1208
                return wh.error(err, scope)
×
1209
        }
×
1210

1211
        sizeLimitError := wh.config.BlobSizeLimitError(domainName)
75✔
1212
        sizeLimitWarn := wh.config.BlobSizeLimitWarn(domainName)
75✔
1213

75✔
1214
        if err := common.CheckEventBlobSizeLimit(
75✔
1215
                len(completeRequest.Result),
75✔
1216
                sizeLimitWarn,
75✔
1217
                sizeLimitError,
75✔
1218
                taskToken.DomainID,
75✔
1219
                taskToken.WorkflowID,
75✔
1220
                taskToken.RunID,
75✔
1221
                scope,
75✔
1222
                wh.GetThrottledLogger(),
75✔
1223
                tag.BlobSizeViolationOperation("RespondActivityTaskCompletedByID"),
75✔
1224
        ); err != nil {
75✔
1225
                // result exceeds blob size limit, we would record it as failure
×
1226
                failRequest := &types.RespondActivityTaskFailedRequest{
×
1227
                        TaskToken: token,
×
1228
                        Reason:    common.StringPtr(common.FailureReasonCompleteResultExceedsLimit),
×
1229
                        Details:   completeRequest.Result[0:sizeLimitError],
×
1230
                        Identity:  completeRequest.Identity,
×
1231
                }
×
1232
                err = wh.GetHistoryClient().RespondActivityTaskFailed(ctx, &types.HistoryRespondActivityTaskFailedRequest{
×
1233
                        DomainUUID:    taskToken.DomainID,
×
1234
                        FailedRequest: failRequest,
×
1235
                })
×
1236
                if err != nil {
×
1237
                        return wh.normalizeVersionedErrors(ctx, wh.error(err, scope, tags...))
×
1238
                }
×
1239
        } else {
75✔
1240
                req := &types.RespondActivityTaskCompletedRequest{
75✔
1241
                        TaskToken: token,
75✔
1242
                        Result:    completeRequest.Result,
75✔
1243
                        Identity:  completeRequest.Identity,
75✔
1244
                }
75✔
1245

75✔
1246
                err = wh.GetHistoryClient().RespondActivityTaskCompleted(ctx, &types.HistoryRespondActivityTaskCompletedRequest{
75✔
1247
                        DomainUUID:      taskToken.DomainID,
75✔
1248
                        CompleteRequest: req,
75✔
1249
                })
75✔
1250
                if err != nil {
75✔
1251
                        return wh.normalizeVersionedErrors(ctx, wh.error(err, scope, tags...))
×
1252
                }
×
1253
        }
1254

1255
        return nil
75✔
1256
}
1257

1258
// RespondActivityTaskFailed - response to an activity task failure
1259
func (wh *WorkflowHandler) RespondActivityTaskFailed(
1260
        ctx context.Context,
1261
        failedRequest *types.RespondActivityTaskFailedRequest,
1262
) (retError error) {
12✔
1263
        defer func() { log.CapturePanic(recover(), wh.GetLogger(), &retError) }()
24✔
1264

1265
        scope := wh.getDefaultScope(ctx, metrics.FrontendRespondActivityTaskFailedScope)
12✔
1266

12✔
1267
        if wh.isShuttingDown() {
12✔
1268
                return errShuttingDown
×
1269
        }
×
1270

1271
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
12✔
1272
                return wh.error(err, scope)
×
1273
        }
×
1274

1275
        if failedRequest == nil {
12✔
1276
                return wh.error(errRequestNotSet, scope)
×
1277
        }
×
1278

1279
        if failedRequest.TaskToken == nil {
12✔
1280
                return wh.error(errTaskTokenNotSet, scope)
×
1281
        }
×
1282
        taskToken, err := wh.tokenSerializer.Deserialize(failedRequest.TaskToken)
12✔
1283
        if err != nil {
12✔
1284
                return wh.error(err, scope)
×
1285
        }
×
1286
        if taskToken.DomainID == "" {
12✔
1287
                return wh.error(errDomainNotSet, scope)
×
1288
        }
×
1289

1290
        domainName, err := wh.GetDomainCache().GetDomainName(taskToken.DomainID)
12✔
1291
        if err != nil {
12✔
1292
                return wh.error(err, scope)
×
1293
        }
×
1294

1295
        dw := domainWrapper{
12✔
1296
                domain: domainName,
12✔
1297
        }
12✔
1298
        scope, sw := wh.startRequestProfileWithDomain(
12✔
1299
                ctx,
12✔
1300
                metrics.FrontendRespondActivityTaskFailedScope,
12✔
1301
                dw,
12✔
1302
        )
12✔
1303
        defer sw.Stop()
12✔
1304

12✔
1305
        // Count the request in the host RPS,
12✔
1306
        // but we still accept it even if RPS is exceeded
12✔
1307
        wh.allow(ratelimitTypeWorker, dw)
12✔
1308

12✔
1309
        tags := getDomainWfIDRunIDTags(domainName, &types.WorkflowExecution{
12✔
1310
                WorkflowID: taskToken.WorkflowID,
12✔
1311
                RunID:      taskToken.RunID,
12✔
1312
        })
12✔
1313

12✔
1314
        if !common.ValidIDLength(
12✔
1315
                failedRequest.GetIdentity(),
12✔
1316
                scope,
12✔
1317
                wh.config.MaxIDLengthWarnLimit(),
12✔
1318
                wh.config.IdentityMaxLength(domainName),
12✔
1319
                metrics.CadenceErrIdentityExceededWarnLimit,
12✔
1320
                domainName,
12✔
1321
                wh.GetLogger(),
12✔
1322
                tag.IDTypeIdentity) {
12✔
1323
                return wh.error(errIdentityTooLong, scope, tags...)
×
1324
        }
×
1325

1326
        sizeLimitError := wh.config.BlobSizeLimitError(domainName)
12✔
1327
        sizeLimitWarn := wh.config.BlobSizeLimitWarn(domainName)
12✔
1328

12✔
1329
        if err := common.CheckEventBlobSizeLimit(
12✔
1330
                len(failedRequest.Details),
12✔
1331
                sizeLimitWarn,
12✔
1332
                sizeLimitError,
12✔
1333
                taskToken.DomainID,
12✔
1334
                taskToken.WorkflowID,
12✔
1335
                taskToken.RunID,
12✔
1336
                scope,
12✔
1337
                wh.GetThrottledLogger(),
12✔
1338
                tag.BlobSizeViolationOperation("RespondActivityTaskFailed"),
12✔
1339
        ); err != nil {
12✔
1340
                // details exceeds blob size limit, we would truncate the details and put a specific error reason
×
1341
                failedRequest.Reason = common.StringPtr(common.FailureReasonFailureDetailsExceedsLimit)
×
1342
                failedRequest.Details = failedRequest.Details[0:sizeLimitError]
×
1343
        }
×
1344

1345
        err = wh.GetHistoryClient().RespondActivityTaskFailed(ctx, &types.HistoryRespondActivityTaskFailedRequest{
12✔
1346
                DomainUUID:    taskToken.DomainID,
12✔
1347
                FailedRequest: failedRequest,
12✔
1348
        })
12✔
1349
        if err != nil {
12✔
1350
                return wh.normalizeVersionedErrors(ctx, wh.error(err, scope, tags...))
×
1351
        }
×
1352
        return nil
12✔
1353
}
1354

1355
// RespondActivityTaskFailedByID - response to an activity task failure
1356
func (wh *WorkflowHandler) RespondActivityTaskFailedByID(
1357
        ctx context.Context,
1358
        failedRequest *types.RespondActivityTaskFailedByIDRequest,
1359
) (retError error) {
×
1360
        defer func() { log.CapturePanic(recover(), wh.GetLogger(), &retError) }()
×
1361

1362
        scope, sw := wh.startRequestProfileWithDomain(ctx, metrics.FrontendRespondActivityTaskFailedByIDScope, failedRequest)
×
1363
        defer sw.Stop()
×
1364

×
1365
        if wh.isShuttingDown() {
×
1366
                return errShuttingDown
×
1367
        }
×
1368

1369
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
×
1370
                return wh.error(err, scope)
×
1371
        }
×
1372

1373
        if failedRequest == nil {
×
1374
                return wh.error(errRequestNotSet, scope)
×
1375
        }
×
1376

1377
        domainName := failedRequest.GetDomain()
×
1378
        tags := getDomainWfIDRunIDTags(domainName, &types.WorkflowExecution{
×
1379
                WorkflowID: failedRequest.GetWorkflowID(),
×
1380
                RunID:      failedRequest.GetRunID(),
×
1381
        })
×
1382

×
1383
        if domainName == "" {
×
1384
                return wh.error(errDomainNotSet, scope, tags...)
×
1385
        }
×
1386

1387
        // Count the request in the host RPS,
1388
        // but we still accept it even if RPS is exceeded
1389
        wh.allow(ratelimitTypeWorker, failedRequest)
×
1390

×
1391
        domainID, err := wh.GetDomainCache().GetDomainID(domainName)
×
1392
        if err != nil {
×
1393
                return wh.error(err, scope, tags...)
×
1394
        }
×
1395
        workflowID := failedRequest.GetWorkflowID()
×
1396
        runID := failedRequest.GetRunID() // runID is optional so can be empty
×
1397
        activityID := failedRequest.GetActivityID()
×
1398

×
1399
        if domainID == "" {
×
1400
                return wh.error(errDomainNotSet, scope, tags...)
×
1401
        }
×
1402
        if workflowID == "" {
×
1403
                return wh.error(errWorkflowIDNotSet, scope, tags...)
×
1404
        }
×
1405
        if activityID == "" {
×
1406
                return wh.error(errActivityIDNotSet, scope, tags...)
×
1407
        }
×
1408

1409
        if !common.ValidIDLength(
×
1410
                failedRequest.GetIdentity(),
×
1411
                scope,
×
1412
                wh.config.MaxIDLengthWarnLimit(),
×
1413
                wh.config.IdentityMaxLength(failedRequest.GetDomain()),
×
1414
                metrics.CadenceErrIdentityExceededWarnLimit,
×
1415
                domainName,
×
1416
                wh.GetLogger(),
×
1417
                tag.IDTypeIdentity) {
×
1418
                return wh.error(errIdentityTooLong, scope, tags...)
×
1419
        }
×
1420

1421
        taskToken := &common.TaskToken{
×
1422
                DomainID:   domainID,
×
1423
                RunID:      runID,
×
1424
                WorkflowID: workflowID,
×
1425
                ScheduleID: common.EmptyEventID,
×
1426
                ActivityID: activityID,
×
1427
        }
×
1428
        token, err := wh.tokenSerializer.Serialize(taskToken)
×
1429
        if err != nil {
×
1430
                return wh.error(err, scope, tags...)
×
1431
        }
×
1432

1433
        sizeLimitError := wh.config.BlobSizeLimitError(domainName)
×
1434
        sizeLimitWarn := wh.config.BlobSizeLimitWarn(domainName)
×
1435

×
1436
        if err := common.CheckEventBlobSizeLimit(
×
1437
                len(failedRequest.Details),
×
1438
                sizeLimitWarn,
×
1439
                sizeLimitError,
×
1440
                taskToken.DomainID,
×
1441
                taskToken.WorkflowID,
×
1442
                taskToken.RunID,
×
1443
                scope,
×
1444
                wh.GetThrottledLogger(),
×
1445
                tag.BlobSizeViolationOperation("RespondActivityTaskFailedByID"),
×
1446
        ); err != nil {
×
1447
                // details exceeds blob size limit, we would truncate the details and put a specific error reason
×
1448
                failedRequest.Reason = common.StringPtr(common.FailureReasonFailureDetailsExceedsLimit)
×
1449
                failedRequest.Details = failedRequest.Details[0:sizeLimitError]
×
1450
        }
×
1451

1452
        req := &types.RespondActivityTaskFailedRequest{
×
1453
                TaskToken: token,
×
1454
                Reason:    failedRequest.Reason,
×
1455
                Details:   failedRequest.Details,
×
1456
                Identity:  failedRequest.Identity,
×
1457
        }
×
1458

×
1459
        err = wh.GetHistoryClient().RespondActivityTaskFailed(ctx, &types.HistoryRespondActivityTaskFailedRequest{
×
1460
                DomainUUID:    taskToken.DomainID,
×
1461
                FailedRequest: req,
×
1462
        })
×
1463
        if err != nil {
×
1464
                return wh.normalizeVersionedErrors(ctx, wh.error(err, scope, tags...))
×
1465
        }
×
1466
        return nil
×
1467
}
1468

1469
// RespondActivityTaskCanceled - called to cancel an activity task
1470
func (wh *WorkflowHandler) RespondActivityTaskCanceled(
1471
        ctx context.Context,
1472
        cancelRequest *types.RespondActivityTaskCanceledRequest,
1473
) (retError error) {
×
1474
        defer func() { log.CapturePanic(recover(), wh.GetLogger(), &retError) }()
×
1475

1476
        scope := wh.getDefaultScope(ctx, metrics.FrontendRespondActivityTaskCanceledScope)
×
1477

×
1478
        if wh.isShuttingDown() {
×
1479
                return errShuttingDown
×
1480
        }
×
1481

1482
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
×
1483
                return wh.error(err, scope)
×
1484
        }
×
1485

1486
        if cancelRequest == nil {
×
1487
                return wh.error(errRequestNotSet, scope)
×
1488
        }
×
1489

1490
        if cancelRequest.TaskToken == nil {
×
1491
                return wh.error(errTaskTokenNotSet, scope)
×
1492
        }
×
1493

1494
        taskToken, err := wh.tokenSerializer.Deserialize(cancelRequest.TaskToken)
×
1495
        if err != nil {
×
1496
                return wh.error(err, scope)
×
1497
        }
×
1498

1499
        if taskToken.DomainID == "" {
×
1500
                return wh.error(errDomainNotSet, scope)
×
1501
        }
×
1502

1503
        domainName, err := wh.GetDomainCache().GetDomainName(taskToken.DomainID)
×
1504
        if err != nil {
×
1505
                return wh.error(err, scope)
×
1506
        }
×
1507

1508
        dw := domainWrapper{
×
1509
                domain: domainName,
×
1510
        }
×
1511
        scope, sw := wh.startRequestProfileWithDomain(
×
1512
                ctx,
×
1513
                metrics.FrontendRespondActivityTaskCanceledScope,
×
1514
                dw,
×
1515
        )
×
1516
        defer sw.Stop()
×
1517

×
1518
        // Count the request in the host RPS,
×
1519
        // but we still accept it even if RPS is exceeded
×
1520
        wh.allow(ratelimitTypeWorker, dw)
×
1521

×
1522
        tags := getDomainWfIDRunIDTags(domainName, &types.WorkflowExecution{
×
1523
                WorkflowID: taskToken.WorkflowID,
×
1524
                RunID:      taskToken.RunID,
×
1525
        })
×
1526

×
1527
        if !common.ValidIDLength(
×
1528
                cancelRequest.GetIdentity(),
×
1529
                scope,
×
1530
                wh.config.MaxIDLengthWarnLimit(),
×
1531
                wh.config.IdentityMaxLength(domainName),
×
1532
                metrics.CadenceErrIdentityExceededWarnLimit,
×
1533
                domainName,
×
1534
                wh.GetLogger(),
×
1535
                tag.IDTypeIdentity) {
×
1536
                return wh.error(errIdentityTooLong, scope, tags...)
×
1537
        }
×
1538

1539
        sizeLimitError := wh.config.BlobSizeLimitError(domainName)
×
1540
        sizeLimitWarn := wh.config.BlobSizeLimitWarn(domainName)
×
1541

×
1542
        if err := common.CheckEventBlobSizeLimit(
×
1543
                len(cancelRequest.Details),
×
1544
                sizeLimitWarn,
×
1545
                sizeLimitError,
×
1546
                taskToken.DomainID,
×
1547
                taskToken.WorkflowID,
×
1548
                taskToken.RunID,
×
1549
                scope,
×
1550
                wh.GetThrottledLogger(),
×
1551
                tag.BlobSizeViolationOperation("RespondActivityTaskCanceled"),
×
1552
        ); err != nil {
×
1553
                // details exceeds blob size limit, we would record it as failure
×
1554
                failRequest := &types.RespondActivityTaskFailedRequest{
×
1555
                        TaskToken: cancelRequest.TaskToken,
×
1556
                        Reason:    common.StringPtr(common.FailureReasonCancelDetailsExceedsLimit),
×
1557
                        Details:   cancelRequest.Details[0:sizeLimitError],
×
1558
                        Identity:  cancelRequest.Identity,
×
1559
                }
×
1560
                err = wh.GetHistoryClient().RespondActivityTaskFailed(ctx, &types.HistoryRespondActivityTaskFailedRequest{
×
1561
                        DomainUUID:    taskToken.DomainID,
×
1562
                        FailedRequest: failRequest,
×
1563
                })
×
1564
                if err != nil {
×
1565
                        return wh.normalizeVersionedErrors(ctx, wh.error(err, scope, tags...))
×
1566
                }
×
1567
        } else {
×
1568
                err = wh.GetHistoryClient().RespondActivityTaskCanceled(ctx, &types.HistoryRespondActivityTaskCanceledRequest{
×
1569
                        DomainUUID:    taskToken.DomainID,
×
1570
                        CancelRequest: cancelRequest,
×
1571
                })
×
1572
                if err != nil {
×
1573
                        return wh.normalizeVersionedErrors(ctx, wh.error(err, scope, tags...))
×
1574
                }
×
1575
        }
1576

1577
        return nil
×
1578
}
1579

1580
// RespondActivityTaskCanceledByID - called to cancel an activity task
1581
func (wh *WorkflowHandler) RespondActivityTaskCanceledByID(
1582
        ctx context.Context,
1583
        cancelRequest *types.RespondActivityTaskCanceledByIDRequest,
1584
) (retError error) {
×
1585
        defer func() { log.CapturePanic(recover(), wh.GetLogger(), &retError) }()
×
1586

1587
        scope, sw := wh.startRequestProfileWithDomain(ctx, metrics.FrontendRespondActivityTaskCanceledScope, cancelRequest)
×
1588
        defer sw.Stop()
×
1589

×
1590
        if wh.isShuttingDown() {
×
1591
                return errShuttingDown
×
1592
        }
×
1593

1594
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
×
1595
                return wh.error(err, scope)
×
1596
        }
×
1597

1598
        if cancelRequest == nil {
×
1599
                return wh.error(errRequestNotSet, scope)
×
1600
        }
×
1601

1602
        domainName := cancelRequest.GetDomain()
×
1603
        tags := getDomainWfIDRunIDTags(domainName, &types.WorkflowExecution{
×
1604
                WorkflowID: cancelRequest.GetWorkflowID(),
×
1605
                RunID:      cancelRequest.GetRunID(),
×
1606
        })
×
1607

×
1608
        if domainName == "" {
×
1609
                return wh.error(errDomainNotSet, scope, tags...)
×
1610
        }
×
1611

1612
        // Count the request in the host RPS,
1613
        // but we still accept it even if RPS is exceeded
1614
        wh.allow(ratelimitTypeWorker, cancelRequest)
×
1615

×
1616
        domainID, err := wh.GetDomainCache().GetDomainID(domainName)
×
1617
        if err != nil {
×
1618
                return wh.error(err, scope, tags...)
×
1619
        }
×
1620
        workflowID := cancelRequest.GetWorkflowID()
×
1621
        runID := cancelRequest.GetRunID() // runID is optional so can be empty
×
1622
        activityID := cancelRequest.GetActivityID()
×
1623

×
1624
        if domainID == "" {
×
1625
                return wh.error(errDomainNotSet, scope, tags...)
×
1626
        }
×
1627
        if workflowID == "" {
×
1628
                return wh.error(errWorkflowIDNotSet, scope, tags...)
×
1629
        }
×
1630
        if activityID == "" {
×
1631
                return wh.error(errActivityIDNotSet, scope, tags...)
×
1632
        }
×
1633

1634
        if !common.ValidIDLength(
×
1635
                cancelRequest.GetIdentity(),
×
1636
                scope,
×
1637
                wh.config.MaxIDLengthWarnLimit(),
×
1638
                wh.config.IdentityMaxLength(cancelRequest.GetDomain()),
×
1639
                metrics.CadenceErrIdentityExceededWarnLimit,
×
1640
                domainName,
×
1641
                wh.GetLogger(),
×
1642
                tag.IDTypeIdentity) {
×
1643
                return wh.error(errIdentityTooLong, scope, tags...)
×
1644
        }
×
1645

1646
        taskToken := &common.TaskToken{
×
1647
                DomainID:   domainID,
×
1648
                RunID:      runID,
×
1649
                WorkflowID: workflowID,
×
1650
                ScheduleID: common.EmptyEventID,
×
1651
                ActivityID: activityID,
×
1652
        }
×
1653
        token, err := wh.tokenSerializer.Serialize(taskToken)
×
1654
        if err != nil {
×
1655
                return wh.error(err, scope, tags...)
×
1656
        }
×
1657

1658
        sizeLimitError := wh.config.BlobSizeLimitError(domainName)
×
1659
        sizeLimitWarn := wh.config.BlobSizeLimitWarn(domainName)
×
1660

×
1661
        if err := common.CheckEventBlobSizeLimit(
×
1662
                len(cancelRequest.Details),
×
1663
                sizeLimitWarn,
×
1664
                sizeLimitError,
×
1665
                taskToken.DomainID,
×
1666
                taskToken.WorkflowID,
×
1667
                taskToken.RunID,
×
1668
                scope,
×
1669
                wh.GetThrottledLogger(),
×
1670
                tag.BlobSizeViolationOperation("RespondActivityTaskCanceledByID"),
×
1671
        ); err != nil {
×
1672
                // details exceeds blob size limit, we would record it as failure
×
1673
                failRequest := &types.RespondActivityTaskFailedRequest{
×
1674
                        TaskToken: token,
×
1675
                        Reason:    common.StringPtr(common.FailureReasonCancelDetailsExceedsLimit),
×
1676
                        Details:   cancelRequest.Details[0:sizeLimitError],
×
1677
                        Identity:  cancelRequest.Identity,
×
1678
                }
×
1679
                err = wh.GetHistoryClient().RespondActivityTaskFailed(ctx, &types.HistoryRespondActivityTaskFailedRequest{
×
1680
                        DomainUUID:    taskToken.DomainID,
×
1681
                        FailedRequest: failRequest,
×
1682
                })
×
1683
                if err != nil {
×
1684
                        return wh.normalizeVersionedErrors(ctx, wh.error(err, scope, tags...))
×
1685
                }
×
1686
        } else {
×
1687
                req := &types.RespondActivityTaskCanceledRequest{
×
1688
                        TaskToken: token,
×
1689
                        Details:   cancelRequest.Details,
×
1690
                        Identity:  cancelRequest.Identity,
×
1691
                }
×
1692

×
1693
                err = wh.GetHistoryClient().RespondActivityTaskCanceled(ctx, &types.HistoryRespondActivityTaskCanceledRequest{
×
1694
                        DomainUUID:    taskToken.DomainID,
×
1695
                        CancelRequest: req,
×
1696
                })
×
1697
                if err != nil {
×
1698
                        return wh.normalizeVersionedErrors(ctx, wh.error(err, scope, tags...))
×
1699
                }
×
1700
        }
1701

1702
        return nil
×
1703
}
1704

1705
// RespondDecisionTaskCompleted - response to a decision task
1706
func (wh *WorkflowHandler) RespondDecisionTaskCompleted(
1707
        ctx context.Context,
1708
        completeRequest *types.RespondDecisionTaskCompletedRequest,
1709
) (resp *types.RespondDecisionTaskCompletedResponse, retError error) {
932✔
1710
        defer func() { log.CapturePanic(recover(), wh.GetLogger(), &retError) }()
1,864✔
1711

1712
        scope := wh.getDefaultScope(ctx, metrics.FrontendRespondDecisionTaskCompletedScope)
932✔
1713

932✔
1714
        if wh.isShuttingDown() {
932✔
1715
                return nil, errShuttingDown
×
1716
        }
×
1717

1718
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
932✔
1719
                return nil, wh.error(err, scope)
×
1720
        }
×
1721

1722
        if completeRequest == nil {
932✔
1723
                return nil, wh.error(errRequestNotSet, scope)
×
1724
        }
×
1725

1726
        if completeRequest.TaskToken == nil {
932✔
1727
                return nil, wh.error(errTaskTokenNotSet, scope)
×
1728
        }
×
1729
        taskToken, err := wh.tokenSerializer.Deserialize(completeRequest.TaskToken)
932✔
1730
        if err != nil {
932✔
1731
                return nil, wh.error(err, scope)
×
1732
        }
×
1733
        if taskToken.DomainID == "" {
932✔
1734
                return nil, wh.error(errDomainNotSet, scope)
×
1735
        }
×
1736

1737
        domainName, err := wh.GetDomainCache().GetDomainName(taskToken.DomainID)
932✔
1738
        if err != nil {
932✔
1739
                return nil, wh.error(err, scope)
×
1740
        }
×
1741

1742
        dw := domainWrapper{
932✔
1743
                domain: domainName,
932✔
1744
        }
932✔
1745
        scope, sw := wh.startRequestProfileWithDomain(
932✔
1746
                ctx,
932✔
1747
                metrics.FrontendRespondDecisionTaskCompletedScope,
932✔
1748
                dw,
932✔
1749
        )
932✔
1750
        defer sw.Stop()
932✔
1751

932✔
1752
        // Count the request in the host RPS,
932✔
1753
        // but we still accept it even if RPS is exceeded
932✔
1754
        wh.allow(ratelimitTypeWorker, dw)
932✔
1755

932✔
1756
        tags := getDomainWfIDRunIDTags(domainName, &types.WorkflowExecution{
932✔
1757
                WorkflowID: taskToken.WorkflowID,
932✔
1758
                RunID:      taskToken.RunID,
932✔
1759
        })
932✔
1760

932✔
1761
        if !common.ValidIDLength(
932✔
1762
                completeRequest.GetIdentity(),
932✔
1763
                scope,
932✔
1764
                wh.config.MaxIDLengthWarnLimit(),
932✔
1765
                wh.config.IdentityMaxLength(domainName),
932✔
1766
                metrics.CadenceErrIdentityExceededWarnLimit,
932✔
1767
                domainName,
932✔
1768
                wh.GetLogger(),
932✔
1769
                tag.IDTypeIdentity) {
932✔
1770
                return nil, wh.error(errIdentityTooLong, scope, tags...)
×
1771
        }
×
1772

1773
        if err := common.CheckDecisionResultLimit(
932✔
1774
                len(completeRequest.Decisions),
932✔
1775
                wh.config.DecisionResultCountLimit(domainName),
932✔
1776
                scope); err != nil {
932✔
1777
                return nil, wh.error(err, scope)
×
1778
        }
×
1779

1780
        histResp, err := wh.GetHistoryClient().RespondDecisionTaskCompleted(ctx, &types.HistoryRespondDecisionTaskCompletedRequest{
932✔
1781
                DomainUUID:      taskToken.DomainID,
932✔
1782
                CompleteRequest: completeRequest},
932✔
1783
        )
932✔
1784
        if err != nil {
941✔
1785
                return nil, wh.normalizeVersionedErrors(ctx, wh.error(err, scope, tags...))
9✔
1786
        }
9✔
1787

1788
        completedResp := &types.RespondDecisionTaskCompletedResponse{}
923✔
1789
        completedResp.ActivitiesToDispatchLocally = histResp.ActivitiesToDispatchLocally
923✔
1790
        if completeRequest.GetReturnNewDecisionTask() && histResp != nil && histResp.StartedResponse != nil {
983✔
1791
                taskToken := &common.TaskToken{
60✔
1792
                        DomainID:        taskToken.DomainID,
60✔
1793
                        WorkflowID:      taskToken.WorkflowID,
60✔
1794
                        RunID:           taskToken.RunID,
60✔
1795
                        ScheduleID:      histResp.StartedResponse.GetScheduledEventID(),
60✔
1796
                        ScheduleAttempt: histResp.StartedResponse.GetAttempt(),
60✔
1797
                }
60✔
1798
                token, _ := wh.tokenSerializer.Serialize(taskToken)
60✔
1799
                workflowExecution := &types.WorkflowExecution{
60✔
1800
                        WorkflowID: taskToken.WorkflowID,
60✔
1801
                        RunID:      taskToken.RunID,
60✔
1802
                }
60✔
1803
                matchingResp := common.CreateMatchingPollForDecisionTaskResponse(histResp.StartedResponse, workflowExecution, token)
60✔
1804

60✔
1805
                newDecisionTask, err := wh.createPollForDecisionTaskResponse(ctx, scope, taskToken.DomainID, matchingResp, matchingResp.GetBranchToken())
60✔
1806
                if err != nil {
60✔
1807
                        return nil, wh.error(err, scope, tags...)
×
1808
                }
×
1809
                completedResp.DecisionTask = newDecisionTask
60✔
1810
        }
1811

1812
        return completedResp, nil
923✔
1813
}
1814

1815
// RespondDecisionTaskFailed - failed response to a decision task
1816
func (wh *WorkflowHandler) RespondDecisionTaskFailed(
1817
        ctx context.Context,
1818
        failedRequest *types.RespondDecisionTaskFailedRequest,
1819
) (retError error) {
159✔
1820
        defer func() { log.CapturePanic(recover(), wh.GetLogger(), &retError) }()
318✔
1821

1822
        scope := wh.getDefaultScope(ctx, metrics.FrontendRespondDecisionTaskFailedScope)
159✔
1823

159✔
1824
        if wh.isShuttingDown() {
159✔
1825
                return errShuttingDown
×
1826
        }
×
1827

1828
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
159✔
1829
                return wh.error(err, scope)
×
1830
        }
×
1831

1832
        if failedRequest == nil {
159✔
1833
                return wh.error(errRequestNotSet, scope)
×
1834
        }
×
1835

1836
        if failedRequest.TaskToken == nil {
159✔
1837
                return wh.error(errTaskTokenNotSet, scope)
×
1838
        }
×
1839
        taskToken, err := wh.tokenSerializer.Deserialize(failedRequest.TaskToken)
159✔
1840
        if err != nil {
159✔
1841
                return wh.error(err, scope)
×
1842
        }
×
1843
        if taskToken.DomainID == "" {
159✔
1844
                return wh.error(errDomainNotSet, scope)
×
1845
        }
×
1846

1847
        domainName, err := wh.GetDomainCache().GetDomainName(taskToken.DomainID)
159✔
1848
        if err != nil {
159✔
1849
                return wh.error(err, scope)
×
1850
        }
×
1851

1852
        dw := domainWrapper{
159✔
1853
                domain: domainName,
159✔
1854
        }
159✔
1855
        scope, sw := wh.startRequestProfileWithDomain(
159✔
1856
                ctx,
159✔
1857
                metrics.FrontendRespondDecisionTaskFailedScope,
159✔
1858
                dw,
159✔
1859
        )
159✔
1860
        defer sw.Stop()
159✔
1861

159✔
1862
        // Count the request in the host RPS,
159✔
1863
        // but we still accept it even if RPS is exceeded
159✔
1864
        wh.allow(ratelimitTypeWorker, dw)
159✔
1865

159✔
1866
        tags := getDomainWfIDRunIDTags(domainName, &types.WorkflowExecution{
159✔
1867
                WorkflowID: taskToken.WorkflowID,
159✔
1868
                RunID:      taskToken.RunID,
159✔
1869
        })
159✔
1870

159✔
1871
        if !common.ValidIDLength(
159✔
1872
                failedRequest.GetIdentity(),
159✔
1873
                scope,
159✔
1874
                wh.config.MaxIDLengthWarnLimit(),
159✔
1875
                wh.config.IdentityMaxLength(domainName),
159✔
1876
                metrics.CadenceErrIdentityExceededWarnLimit,
159✔
1877
                domainName,
159✔
1878
                wh.GetLogger(),
159✔
1879
                tag.IDTypeIdentity) {
159✔
1880
                return wh.error(errIdentityTooLong, scope, tags...)
×
1881
        }
×
1882

1883
        sizeLimitError := wh.config.BlobSizeLimitError(domainName)
159✔
1884
        sizeLimitWarn := wh.config.BlobSizeLimitWarn(domainName)
159✔
1885

159✔
1886
        if err := common.CheckEventBlobSizeLimit(
159✔
1887
                len(failedRequest.Details),
159✔
1888
                sizeLimitWarn,
159✔
1889
                sizeLimitError,
159✔
1890
                taskToken.DomainID,
159✔
1891
                taskToken.WorkflowID,
159✔
1892
                taskToken.RunID,
159✔
1893
                scope,
159✔
1894
                wh.GetThrottledLogger(),
159✔
1895
                tag.BlobSizeViolationOperation("RespondDecisionTaskFailed"),
159✔
1896
        ); err != nil {
159✔
1897
                // details exceed, we would just truncate the size for decision task failed as the details is not used anywhere by client code
×
1898
                failedRequest.Details = failedRequest.Details[0:sizeLimitError]
×
1899
        }
×
1900

1901
        err = wh.GetHistoryClient().RespondDecisionTaskFailed(ctx, &types.HistoryRespondDecisionTaskFailedRequest{
159✔
1902
                DomainUUID:    taskToken.DomainID,
159✔
1903
                FailedRequest: failedRequest,
159✔
1904
        })
159✔
1905
        if err != nil {
159✔
1906
                return wh.normalizeVersionedErrors(ctx, wh.error(err, scope, tags...))
×
1907
        }
×
1908
        return nil
159✔
1909
}
1910

1911
// RespondQueryTaskCompleted - response to a query task
1912
func (wh *WorkflowHandler) RespondQueryTaskCompleted(
1913
        ctx context.Context,
1914
        completeRequest *types.RespondQueryTaskCompletedRequest,
1915
) (retError error) {
30✔
1916
        defer func() { log.CapturePanic(recover(), wh.GetLogger(), &retError) }()
60✔
1917

1918
        scope := wh.getDefaultScope(ctx, metrics.FrontendRespondQueryTaskCompletedScope)
30✔
1919

30✔
1920
        if wh.isShuttingDown() {
30✔
1921
                return errShuttingDown
×
1922
        }
×
1923

1924
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
30✔
1925
                return wh.error(err, scope)
×
1926
        }
×
1927

1928
        if completeRequest == nil {
30✔
1929
                return wh.error(errRequestNotSet, scope)
×
1930
        }
×
1931

1932
        if completeRequest.TaskToken == nil {
30✔
1933
                return wh.error(errTaskTokenNotSet, scope)
×
1934
        }
×
1935
        queryTaskToken, err := wh.tokenSerializer.DeserializeQueryTaskToken(completeRequest.TaskToken)
30✔
1936
        if err != nil {
30✔
1937
                return wh.error(err, scope)
×
1938
        }
×
1939
        if queryTaskToken.DomainID == "" || queryTaskToken.TaskList == "" || queryTaskToken.TaskID == "" {
30✔
1940
                return wh.error(errInvalidTaskToken, scope)
×
1941
        }
×
1942

1943
        domainName, err := wh.GetDomainCache().GetDomainName(queryTaskToken.DomainID)
30✔
1944
        if err != nil {
30✔
1945
                return wh.error(err, scope)
×
1946
        }
×
1947

1948
        dw := domainWrapper{
30✔
1949
                domain: domainName,
30✔
1950
        }
30✔
1951
        scope, sw := wh.startRequestProfileWithDomain(
30✔
1952
                ctx,
30✔
1953
                metrics.FrontendRespondQueryTaskCompletedScope,
30✔
1954
                dw,
30✔
1955
        )
30✔
1956
        defer sw.Stop()
30✔
1957

30✔
1958
        // Count the request in the host RPS,
30✔
1959
        // but we still accept it even if RPS is exceeded
30✔
1960
        wh.allow(ratelimitTypeWorker, dw)
30✔
1961

30✔
1962
        sizeLimitError := wh.config.BlobSizeLimitError(domainName)
30✔
1963
        sizeLimitWarn := wh.config.BlobSizeLimitWarn(domainName)
30✔
1964

30✔
1965
        if err := common.CheckEventBlobSizeLimit(
30✔
1966
                len(completeRequest.GetQueryResult()),
30✔
1967
                sizeLimitWarn,
30✔
1968
                sizeLimitError,
30✔
1969
                queryTaskToken.DomainID,
30✔
1970
                "",
30✔
1971
                "",
30✔
1972
                scope,
30✔
1973
                wh.GetThrottledLogger(),
30✔
1974
                tag.BlobSizeViolationOperation("RespondQueryTaskCompleted"),
30✔
1975
        ); err != nil {
30✔
1976
                completeRequest = &types.RespondQueryTaskCompletedRequest{
×
1977
                        TaskToken:     completeRequest.TaskToken,
×
1978
                        CompletedType: types.QueryTaskCompletedTypeFailed.Ptr(),
×
1979
                        QueryResult:   nil,
×
1980
                        ErrorMessage:  err.Error(),
×
1981
                }
×
1982
        }
×
1983

1984
        call := yarpc.CallFromContext(ctx)
30✔
1985

30✔
1986
        completeRequest.WorkerVersionInfo = &types.WorkerVersionInfo{
30✔
1987
                Impl:           call.Header(common.ClientImplHeaderName),
30✔
1988
                FeatureVersion: call.Header(common.FeatureVersionHeaderName),
30✔
1989
        }
30✔
1990
        matchingRequest := &types.MatchingRespondQueryTaskCompletedRequest{
30✔
1991
                DomainUUID:       queryTaskToken.DomainID,
30✔
1992
                TaskList:         &types.TaskList{Name: queryTaskToken.TaskList},
30✔
1993
                TaskID:           queryTaskToken.TaskID,
30✔
1994
                CompletedRequest: completeRequest,
30✔
1995
        }
30✔
1996

30✔
1997
        err = wh.GetMatchingClient().RespondQueryTaskCompleted(ctx, matchingRequest)
30✔
1998
        if err != nil {
30✔
1999
                return wh.error(err, scope)
×
2000
        }
×
2001
        return nil
30✔
2002
}
2003

2004
// StartWorkflowExecution - Creates a new workflow execution
2005
func (wh *WorkflowHandler) StartWorkflowExecution(
2006
        ctx context.Context,
2007
        startRequest *types.StartWorkflowExecutionRequest,
2008
) (resp *types.StartWorkflowExecutionResponse, retError error) {
453✔
2009
        defer func() { log.CapturePanic(recover(), wh.GetLogger(), &retError) }()
906✔
2010

2011
        scope, sw := wh.startRequestProfileWithDomain(ctx, metrics.FrontendStartWorkflowExecutionScope, startRequest)
453✔
2012
        defer sw.Stop()
453✔
2013

453✔
2014
        if wh.isShuttingDown() {
453✔
2015
                return nil, errShuttingDown
×
2016
        }
×
2017

2018
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
453✔
2019
                return nil, wh.error(err, scope)
×
2020
        }
×
2021

2022
        if startRequest == nil {
454✔
2023
                return nil, wh.error(errRequestNotSet, scope)
1✔
2024
        }
1✔
2025

2026
        domainName := startRequest.GetDomain()
452✔
2027
        wfExecution := &types.WorkflowExecution{
452✔
2028
                WorkflowID: startRequest.GetWorkflowID(),
452✔
2029
        }
452✔
2030
        tags := getDomainWfIDRunIDTags(domainName, wfExecution)
452✔
2031

452✔
2032
        if domainName == "" {
453✔
2033
                return nil, wh.error(errDomainNotSet, scope, tags...)
1✔
2034
        }
1✔
2035

2036
        if ok := wh.allow(ratelimitTypeUser, startRequest); !ok {
451✔
2037
                return nil, wh.error(createServiceBusyError(), scope, tags...)
×
2038
        }
×
2039

2040
        idLengthWarnLimit := wh.config.MaxIDLengthWarnLimit()
451✔
2041
        if !common.ValidIDLength(
451✔
2042
                domainName,
451✔
2043
                scope,
451✔
2044
                idLengthWarnLimit,
451✔
2045
                wh.config.DomainNameMaxLength(domainName),
451✔
2046
                metrics.CadenceErrDomainNameExceededWarnLimit,
451✔
2047
                domainName,
451✔
2048
                wh.GetLogger(),
451✔
2049
                tag.IDTypeDomainName) {
451✔
2050
                return nil, wh.error(errDomainTooLong, scope, tags...)
×
2051
        }
×
2052

2053
        if startRequest.GetWorkflowID() == "" {
452✔
2054
                return nil, wh.error(errWorkflowIDNotSet, scope, tags...)
1✔
2055
        }
1✔
2056

2057
        if !common.ValidIDLength(
450✔
2058
                startRequest.GetWorkflowID(),
450✔
2059
                scope,
450✔
2060
                idLengthWarnLimit,
450✔
2061
                wh.config.WorkflowIDMaxLength(domainName),
450✔
2062
                metrics.CadenceErrWorkflowIDExceededWarnLimit,
450✔
2063
                domainName,
450✔
2064
                wh.GetLogger(),
450✔
2065
                tag.IDTypeWorkflowID) {
450✔
2066
                return nil, wh.error(errWorkflowIDTooLong, scope, tags...)
×
2067
        }
×
2068

2069
        if err := common.ValidateRetryPolicy(startRequest.RetryPolicy); err != nil {
450✔
2070
                return nil, wh.error(err, scope, tags...)
×
2071
        }
×
2072

2073
        if startRequest.GetCronSchedule() != "" {
468✔
2074
                if _, err := backoff.ValidateSchedule(startRequest.GetCronSchedule()); err != nil {
18✔
2075
                        return nil, wh.error(err, scope, tags...)
×
2076
                }
×
2077
        }
2078

2079
        wh.GetLogger().Debug(
450✔
2080
                "Received StartWorkflowExecution. WorkflowID",
450✔
2081
                tag.WorkflowID(startRequest.GetWorkflowID()))
450✔
2082

450✔
2083
        if startRequest.WorkflowType == nil || startRequest.WorkflowType.GetName() == "" {
451✔
2084
                return nil, wh.error(errWorkflowTypeNotSet, scope, tags...)
1✔
2085
        }
1✔
2086

2087
        if !common.ValidIDLength(
449✔
2088
                startRequest.WorkflowType.GetName(),
449✔
2089
                scope,
449✔
2090
                idLengthWarnLimit,
449✔
2091
                wh.config.WorkflowTypeMaxLength(domainName),
449✔
2092
                metrics.CadenceErrWorkflowTypeExceededWarnLimit,
449✔
2093
                domainName,
449✔
2094
                wh.GetLogger(),
449✔
2095
                tag.IDTypeWorkflowType) {
449✔
2096
                return nil, wh.error(errWorkflowTypeTooLong, scope, tags...)
×
2097
        }
×
2098

2099
        if err := wh.validateTaskList(startRequest.TaskList, scope, domainName); err != nil {
450✔
2100
                return nil, wh.error(err, scope, tags...)
1✔
2101
        }
1✔
2102

2103
        if startRequest.GetExecutionStartToCloseTimeoutSeconds() <= 0 {
449✔
2104
                return nil, wh.error(errInvalidExecutionStartToCloseTimeoutSeconds, scope, tags...)
1✔
2105
        }
1✔
2106

2107
        if startRequest.GetTaskStartToCloseTimeoutSeconds() <= 0 {
448✔
2108
                return nil, wh.error(errInvalidTaskStartToCloseTimeoutSeconds, scope, tags...)
1✔
2109
        }
1✔
2110

2111
        if startRequest.GetDelayStartSeconds() < 0 {
447✔
2112
                return nil, wh.error(errInvalidDelayStartSeconds, scope, tags...)
1✔
2113
        }
1✔
2114

2115
        if startRequest.GetJitterStartSeconds() < 0 {
445✔
2116
                return nil, wh.error(errInvalidJitterStartSeconds, scope, tags...)
×
2117
        }
×
2118

2119
        jitter := startRequest.GetJitterStartSeconds()
445✔
2120
        cron := startRequest.GetCronSchedule()
445✔
2121
        if jitter > 0 && cron != "" {
445✔
2122
                // Calculate the cron duration and ensure that jitter is not greater than the cron duration,
×
2123
                // because that would be confusing to users.
×
2124

×
2125
                // Request using start/end time zero value, which will get us an exact answer (i.e. its not in the
×
2126
                // middle of a minute)
×
2127
                backoffSeconds, err := backoff.GetBackoffForNextScheduleInSeconds(cron, time.Time{}, time.Time{}, jitter)
×
2128
                if err != nil {
×
2129
                        tags = append(tags, tag.WorkflowCronSchedule(cron))
×
2130
                        return nil, wh.error(err, scope, tags...)
×
2131
                }
×
2132
                if jitter > backoffSeconds {
×
2133
                        return nil, wh.error(errInvalidJitterStartSeconds2, scope, tags...)
×
2134
                }
×
2135
        }
2136

2137
        if startRequest.GetRequestID() == "" {
446✔
2138
                return nil, wh.error(errRequestIDNotSet, scope, tags...)
1✔
2139
        }
1✔
2140

2141
        if !common.ValidIDLength(
444✔
2142
                startRequest.GetRequestID(),
444✔
2143
                scope,
444✔
2144
                idLengthWarnLimit,
444✔
2145
                wh.config.RequestIDMaxLength(domainName),
444✔
2146
                metrics.CadenceErrRequestIDExceededWarnLimit,
444✔
2147
                domainName,
444✔
2148
                wh.GetLogger(),
444✔
2149
                tag.IDTypeRequestID) {
444✔
2150
                return nil, wh.error(errRequestIDTooLong, scope, tags...)
×
2151
        }
×
2152

2153
        if err := wh.searchAttributesValidator.ValidateSearchAttributes(startRequest.SearchAttributes, domainName); err != nil {
444✔
2154
                return nil, wh.error(err, scope, tags...)
×
2155
        }
×
2156

2157
        wh.GetLogger().Debug("Start workflow execution request domain", tag.WorkflowDomainName(domainName))
444✔
2158
        domainID, err := wh.GetDomainCache().GetDomainID(domainName)
444✔
2159
        if err != nil {
444✔
2160
                return nil, wh.error(err, scope, tags...)
×
2161
        }
×
2162

2163
        sizeLimitError := wh.config.BlobSizeLimitError(domainName)
444✔
2164
        sizeLimitWarn := wh.config.BlobSizeLimitWarn(domainName)
444✔
2165
        actualSize := len(startRequest.Input)
444✔
2166
        if startRequest.Memo != nil {
456✔
2167
                actualSize += common.GetSizeOfMapStringToByteArray(startRequest.Memo.GetFields())
12✔
2168
        }
12✔
2169
        if err := common.CheckEventBlobSizeLimit(
444✔
2170
                actualSize,
444✔
2171
                sizeLimitWarn,
444✔
2172
                sizeLimitError,
444✔
2173
                domainID,
444✔
2174
                startRequest.GetWorkflowID(),
444✔
2175
                "",
444✔
2176
                scope,
444✔
2177
                wh.GetThrottledLogger(),
444✔
2178
                tag.BlobSizeViolationOperation("StartWorkflowExecution"),
444✔
2179
        ); err != nil {
444✔
2180
                return nil, wh.error(err, scope, tags...)
×
2181
        }
×
2182

2183
        wh.GetLogger().Debug("Start workflow execution request domainID", tag.WorkflowDomainID(domainID))
444✔
2184
        historyRequest, err := common.CreateHistoryStartWorkflowRequest(
444✔
2185
                domainID, startRequest, time.Now())
444✔
2186
        if err != nil {
444✔
2187
                return nil, err
×
2188
        }
×
2189

2190
        resp, err = wh.GetHistoryClient().StartWorkflowExecution(ctx, historyRequest)
444✔
2191
        if err != nil {
462✔
2192
                return nil, wh.error(err, scope, tags...)
18✔
2193
        }
18✔
2194
        return resp, nil
426✔
2195
}
2196

2197
// GetWorkflowExecutionHistory - retrieves the history of workflow execution
//
// Outline of the implementation below:
//   - validate the request (shutdown state, client version, nil request, domain, user rate limit,
//     execution) and resolve the domain ID;
//   - normalize MaximumPageSize: a value <= 0 is replaced by the per-domain HistoryMaxPageSize
//     config, and anything above common.GetHistoryMaxPageSize is clamped to it (with a throttled
//     warning). Note that this mutates getRequest in place;
//   - unless SkipArchival is set, return wh.getArchivedHistory when archival reads are enabled and
//     wh.historyArchived reports the history as archived (historyArchived is evaluated regardless
//     of whether archival reads are enabled);
//   - otherwise set up the paging token, either decoded from NextPageToken or built from a fresh
//     PollMutableState call, and read one page via wh.getRawHistory (when SendRawWorkflowHistory is
//     enabled for the domain and the client supports raw history) or wh.getHistory;
//   - return the page together with the serialized continuation token. The token is set to nil
//     when there is nothing more to fetch.
func (wh *WorkflowHandler) GetWorkflowExecutionHistory(
	ctx context.Context,
	getRequest *types.GetWorkflowExecutionHistoryRequest,
) (resp *types.GetWorkflowExecutionHistoryResponse, retError error) {
	defer func() { log.CapturePanic(recover(), wh.GetLogger(), &retError) }()

	scope, sw := wh.startRequestProfileWithDomain(ctx, metrics.FrontendGetWorkflowExecutionHistoryScope, getRequest)
	defer sw.Stop()

	if wh.isShuttingDown() {
		return nil, errShuttingDown
	}

	if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
		return nil, wh.error(err, scope)
	}

	if getRequest == nil {
		return nil, wh.error(errRequestNotSet, scope)
	}

	domainName := getRequest.GetDomain()
	wfExecution := getRequest.GetExecution()
	tags := getDomainWfIDRunIDTags(domainName, wfExecution)

	if domainName == "" {
		return nil, wh.error(errDomainNotSet, scope, tags...)
	}

	if ok := wh.allow(ratelimitTypeUser, getRequest); !ok {
		return nil, wh.error(createServiceBusyError(), scope, tags...)
	}

	if err := validateExecution(wfExecution); err != nil {
		return nil, wh.error(err, scope, tags...)
	}

	domainID, err := wh.GetDomainCache().GetDomainID(domainName)
	if err != nil {
		return nil, wh.error(err, scope, tags...)
	}

	// Unset/invalid page size falls back to the per-domain configured default.
	if getRequest.GetMaximumPageSize() <= 0 {
		getRequest.MaximumPageSize = int32(wh.config.HistoryMaxPageSize(getRequest.GetDomain()))
	}
	// force limit page size if exceed
	if getRequest.GetMaximumPageSize() > common.GetHistoryMaxPageSize {
		wh.GetThrottledLogger().Warn("GetHistory page size is larger than threshold",
			tag.WorkflowID(getRequest.Execution.GetWorkflowID()),
			tag.WorkflowRunID(getRequest.Execution.GetRunID()),
			tag.WorkflowDomainID(domainID),
			tag.WorkflowSize(int64(getRequest.GetMaximumPageSize())))
		getRequest.MaximumPageSize = common.GetHistoryMaxPageSize
	}

	if !getRequest.GetSkipArchival() {
		enableArchivalRead := wh.GetArchivalMetadata().GetHistoryConfig().ReadEnabled()
		// NOTE(review): historyArchived is called even when archival reads are disabled; its cost
		// is not visible from this file.
		historyArchived := wh.historyArchived(ctx, getRequest, domainID)
		if enableArchivalRead && historyArchived {
			return wh.getArchivedHistory(ctx, getRequest, domainID, scope, tags...)
		}
	}

	// queryHistory calls PollMutableState on the history service and returns the following 6 things:
	// 1. branch token
	// 2. the workflow run ID
	// 3. the last first event ID (the event ID of the last batch of events in the history)
	// 4. the next event ID
	// 5. whether the workflow is still running (close state is WorkflowCloseStatusNone)
	// 6. error if any
	queryHistory := func(
		domainUUID string,
		execution *types.WorkflowExecution,
		expectedNextEventID int64,
		currentBranchToken []byte,
	) ([]byte, string, int64, int64, bool, error) {
		response, err := wh.GetHistoryClient().PollMutableState(ctx, &types.PollMutableStateRequest{
			DomainUUID:          domainUUID,
			Execution:           execution,
			ExpectedNextEventID: expectedNextEventID,
			CurrentBranchToken:  currentBranchToken,
		})

		if err != nil {
			return nil, "", 0, 0, false, err
		}
		isWorkflowRunning := response.GetWorkflowCloseState() == persistence.WorkflowCloseStatusNone

		return response.CurrentBranchToken,
			response.Execution.GetRunID(),
			response.GetLastFirstEventID(),
			response.GetNextEventID(),
			isWorkflowRunning,
			nil
	}

	isLongPoll := getRequest.GetWaitForNewEvent()
	isCloseEventOnly := getRequest.GetHistoryEventFilterType() == types.HistoryEventFilterTypeCloseEvent
	// execution aliases getRequest.Execution; the RunID assignments below modify the request's
	// execution in place.
	execution := getRequest.Execution
	token := &getHistoryContinuationToken{}

	// NOTE(review): runID, nextEventID and isWorkflowRunning are only populated when queryHistory
	// is called. When a NextPageToken is supplied but no re-query happens, nextEventID stays 0,
	// isWorkflowRunning stays false, and lastFirstEventID stays common.FirstEventID.
	var runID string
	lastFirstEventID := common.FirstEventID
	var nextEventID int64
	var isWorkflowRunning bool

	// process the token for paging
	queryNextEventID := common.EndEventID
	if getRequest.NextPageToken != nil {
		token, err = deserializeHistoryToken(getRequest.NextPageToken)
		if err != nil {
			return nil, wh.error(errInvalidNextPageToken, scope, tags...)
		}
		// A token issued for one run cannot be used to page a different run.
		if execution.RunID != "" && execution.GetRunID() != token.RunID {
			return nil, wh.error(errNextPageTokenRunIDMismatch, scope, tags...)
		}

		execution.RunID = token.RunID

		// we need to update the current next event ID and whether workflow is running
		// (only when the previous page was fully consumed, this is a long poll, and the workflow was
		// still running when the token was issued)
		if len(token.PersistenceToken) == 0 && isLongPoll && token.IsWorkflowRunning {
			logger := wh.GetLogger().WithTags(
				tag.WorkflowDomainName(getRequest.GetDomain()),
				tag.WorkflowID(getRequest.Execution.GetWorkflowID()),
				tag.WorkflowRunID(getRequest.Execution.GetRunID()),
			)
			// TODO: for now we only log the invalid timeout (this is done inside the helper function) in case
			// this change breaks existing customers. Once we are sure no one is calling this API with very short timeout
			// we can return the error.
			_ = common.ValidateLongPollContextTimeout(ctx, "GetWorkflowExecutionHistory", logger)

			if !isCloseEventOnly {
				queryNextEventID = token.NextEventID
			}
			token.BranchToken, _, lastFirstEventID, nextEventID, isWorkflowRunning, err =
				queryHistory(domainID, execution, queryNextEventID, token.BranchToken)
			if err != nil {
				return nil, wh.error(err, scope, tags...)
			}
			// The new page spans from the previous page's end to the newly observed next event ID.
			token.FirstEventID = token.NextEventID
			token.NextEventID = nextEventID
			token.IsWorkflowRunning = isWorkflowRunning
		}
	} else {
		// First page: start from the first event (or, for close-event-only, keep EndEventID as the
		// expected next event ID passed to PollMutableState).
		if !isCloseEventOnly {
			queryNextEventID = common.FirstEventID
		}
		token.BranchToken, runID, lastFirstEventID, nextEventID, isWorkflowRunning, err =
			queryHistory(domainID, execution, queryNextEventID, nil)
		if err != nil {
			return nil, wh.error(err, scope, tags...)
		}

		execution.RunID = runID

		token.RunID = runID
		token.FirstEventID = common.FirstEventID
		token.NextEventID = nextEventID
		token.IsWorkflowRunning = isWorkflowRunning
		token.PersistenceToken = nil
	}

	// Raw (serialized) history is returned only if enabled for the domain AND the calling client,
	// identified by its YARPC impl/feature-version headers, supports it.
	call := yarpc.CallFromContext(ctx)
	clientFeatureVersion := call.Header(common.FeatureVersionHeaderName)
	clientImpl := call.Header(common.ClientImplHeaderName)
	supportsRawHistoryQuery := wh.versionChecker.SupportsRawHistoryQuery(clientImpl, clientFeatureVersion) == nil
	isRawHistoryEnabled := wh.config.SendRawWorkflowHistory(domainName) && supportsRawHistoryQuery

	history := &types.History{}
	history.Events = []*types.HistoryEvent{}
	var historyBlob []*types.DataBlob

	// helper function to just getHistory: reads one page into historyBlob (raw) or history
	// (decoded), and stores the persistence paging token into token.PersistenceToken. It assigns
	// the enclosing function's err variable.
	getHistory := func(firstEventID, nextEventID int64, nextPageToken []byte) error {
		if isRawHistoryEnabled {
			historyBlob, token.PersistenceToken, err = wh.getRawHistory(
				ctx,
				scope,
				domainID,
				domainName,
				*execution,
				firstEventID,
				nextEventID,
				getRequest.GetMaximumPageSize(),
				nextPageToken,
				token.TransientDecision,
				token.BranchToken,
			)
		} else {
			history, token.PersistenceToken, err = wh.getHistory(
				ctx,
				scope,
				domainID,
				domainName,
				*execution,
				firstEventID,
				nextEventID,
				getRequest.GetMaximumPageSize(),
				nextPageToken,
				token.TransientDecision,
				token.BranchToken,
			)
		}
		if err != nil {
			return err
		}
		return nil
	}

	if isCloseEventOnly {
		if !isWorkflowRunning {
			// Read the last batch [lastFirstEventID, nextEventID) and keep only its final entry,
			// expected to be the close event of the finished workflow.
			if err := getHistory(lastFirstEventID, nextEventID, nil); err != nil {
				return nil, wh.error(err, scope, tags...)
			}
			if isRawHistoryEnabled {
				// since getHistory func will not return empty history, so the below is safe
				historyBlob = historyBlob[len(historyBlob)-1:]
			} else {
				// since getHistory func will not return empty history, so the below is safe
				history.Events = history.Events[len(history.Events)-1:]
			}
			token = nil
		} else if isLongPoll {
			// set the persistence token to be nil so next time we will query history for updates
			token.PersistenceToken = nil
		} else {
			// Still running and not long-polling: return no events and no continuation.
			token = nil
		}
	} else {
		// return all events
		if token.FirstEventID >= token.NextEventID {
			// currently there is no new event
			history.Events = []*types.HistoryEvent{}
			if !isWorkflowRunning {
				token = nil
			}
		} else {
			if err := getHistory(token.FirstEventID, token.NextEventID, token.PersistenceToken); err != nil {
				return nil, wh.error(err, scope, tags...)
			}
			// here, for long poll on history events, we need to intercept the paging token from
			// persistence (e.g. Cassandra) and keep returning a continuation token while the
			// workflow is running, so the next call can wait for new events
			if len(token.PersistenceToken) == 0 && (!token.IsWorkflowRunning || !isLongPoll) {
				// meaning, there is no more history to be returned
				token = nil
			}
		}
	}

	nextToken, err := serializeHistoryToken(token)
	if err != nil {
		return nil, wh.error(err, scope, tags...)
	}
	return &types.GetWorkflowExecutionHistoryResponse{
		History:       history,
		RawHistory:    historyBlob,
		NextPageToken: nextToken,
		Archived:      false,
	}, nil
}
2458

2459
func (wh *WorkflowHandler) withSignalName(
2460
        ctx context.Context,
2461
        domainName string,
2462
        signalName string,
2463
) context.Context {
724✔
2464
        if wh.config.EmitSignalNameMetricsTag(domainName) {
725✔
2465
                return metrics.TagContext(ctx, metrics.SignalNameTag(signalName))
1✔
2466
        }
1✔
2467
        return ctx
723✔
2468
}
2469

2470
// SignalWorkflowExecution is used to send a signal event to running workflow execution.  This results in
2471
// WorkflowExecutionSignaled event recorded in the history and a decision task being created for the execution.
2472
func (wh *WorkflowHandler) SignalWorkflowExecution(
2473
        ctx context.Context,
2474
        signalRequest *types.SignalWorkflowExecutionRequest,
2475
) (retError error) {
724✔
2476
        defer func() { log.CapturePanic(recover(), wh.GetLogger(), &retError) }()
1,448✔
2477

2478
        ctx = wh.withSignalName(ctx, signalRequest.GetDomain(), signalRequest.GetSignalName())
724✔
2479
        scope, sw := wh.startRequestProfileWithDomain(ctx, metrics.FrontendSignalWorkflowExecutionScope, signalRequest)
724✔
2480
        defer sw.Stop()
724✔
2481

724✔
2482
        if wh.isShuttingDown() {
724✔
2483
                return errShuttingDown
×
2484
        }
×
2485

2486
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
724✔
2487
                return wh.error(err, scope)
×
2488
        }
×
2489

2490
        if signalRequest == nil {
724✔
2491
                return wh.error(errRequestNotSet, scope)
×
2492
        }
×
2493

2494
        domainName := signalRequest.GetDomain()
724✔
2495
        wfExecution := signalRequest.GetWorkflowExecution()
724✔
2496
        tags := getDomainWfIDRunIDTags(domainName, wfExecution)
724✔
2497

724✔
2498
        if domainName == "" {
725✔
2499
                return wh.error(errDomainNotSet, scope, tags...)
1✔
2500
        }
1✔
2501

2502
        if ok := wh.allow(ratelimitTypeUser, signalRequest); !ok {
723✔
2503
                return wh.error(createServiceBusyError(), scope, tags...)
×
2504
        }
×
2505

2506
        if err := validateExecution(wfExecution); err != nil {
723✔
2507
                return wh.error(err, scope, tags...)
×
2508
        }
×
2509

2510
        idLengthWarnLimit := wh.config.MaxIDLengthWarnLimit()
723✔
2511
        if !common.ValidIDLength(
723✔
2512
                domainName,
723✔
2513
                scope,
723✔
2514
                idLengthWarnLimit,
723✔
2515
                wh.config.DomainNameMaxLength(domainName),
723✔
2516
                metrics.CadenceErrDomainNameExceededWarnLimit,
723✔
2517
                domainName,
723✔
2518
                wh.GetLogger(),
723✔
2519
                tag.IDTypeDomainName) {
723✔
2520
                return wh.error(errDomainTooLong, scope, tags...)
×
2521
        }
×
2522

2523
        if signalRequest.GetSignalName() == "" {
723✔
2524
                return wh.error(errSignalNameNotSet, scope, tags...)
×
2525
        }
×
2526

2527
        if !common.ValidIDLength(
723✔
2528
                signalRequest.GetSignalName(),
723✔
2529
                scope,
723✔
2530
                idLengthWarnLimit,
723✔
2531
                wh.config.SignalNameMaxLength(domainName),
723✔
2532
                metrics.CadenceErrSignalNameExceededWarnLimit,
723✔
2533
                domainName,
723✔
2534
                wh.GetLogger(),
723✔
2535
                tag.IDTypeSignalName) {
723✔
2536
                return wh.error(errSignalNameTooLong, scope, tags...)
×
2537
        }
×
2538

2539
        if !common.ValidIDLength(
723✔
2540
                signalRequest.GetRequestID(),
723✔
2541
                scope,
723✔
2542
                idLengthWarnLimit,
723✔
2543
                wh.config.RequestIDMaxLength(domainName),
723✔
2544
                metrics.CadenceErrRequestIDExceededWarnLimit,
723✔
2545
                domainName,
723✔
2546
                wh.GetLogger(),
723✔
2547
                tag.IDTypeRequestID) {
723✔
2548
                return wh.error(errRequestIDTooLong, scope, tags...)
×
2549
        }
×
2550

2551
        domainID, err := wh.GetDomainCache().GetDomainID(domainName)
723✔
2552
        if err != nil {
723✔
2553
                return wh.error(err, scope, tags...)
×
2554
        }
×
2555

2556
        sizeLimitError := wh.config.BlobSizeLimitError(domainName)
723✔
2557
        sizeLimitWarn := wh.config.BlobSizeLimitWarn(domainName)
723✔
2558
        if err := common.CheckEventBlobSizeLimit(
723✔
2559
                len(signalRequest.Input),
723✔
2560
                sizeLimitWarn,
723✔
2561
                sizeLimitError,
723✔
2562
                domainID,
723✔
2563
                signalRequest.GetWorkflowExecution().GetWorkflowID(),
723✔
2564
                signalRequest.GetWorkflowExecution().GetRunID(),
723✔
2565
                scope,
723✔
2566
                wh.GetThrottledLogger(),
723✔
2567
                tag.BlobSizeViolationOperation("SignalWorkflowExecution"),
723✔
2568
        ); err != nil {
723✔
2569
                return wh.error(err, scope, tags...)
×
2570
        }
×
2571

2572
        err = wh.GetHistoryClient().SignalWorkflowExecution(ctx, &types.HistorySignalWorkflowExecutionRequest{
723✔
2573
                DomainUUID:    domainID,
723✔
2574
                SignalRequest: signalRequest,
723✔
2575
        })
723✔
2576
        if err != nil {
732✔
2577
                return wh.normalizeVersionedErrors(ctx, wh.error(err, scope, tags...))
9✔
2578
        }
9✔
2579

2580
        return nil
714✔
2581
}
2582

2583
// SignalWithStartWorkflowExecution is used to ensure sending a signal event to a workflow execution.
2584
// If workflow is running, this results in WorkflowExecutionSignaled event recorded in the history
2585
// and a decision task being created for the execution.
2586
// If workflow is not running or not found, this results in WorkflowExecutionStarted and WorkflowExecutionSignaled
2587
// event recorded in history, and a decision task being created for the execution
2588
func (wh *WorkflowHandler) SignalWithStartWorkflowExecution(
2589
        ctx context.Context,
2590
        signalWithStartRequest *types.SignalWithStartWorkflowExecutionRequest,
2591
) (resp *types.StartWorkflowExecutionResponse, retError error) {
33✔
2592
        defer func() { log.CapturePanic(recover(), wh.GetLogger(), &retError) }()
66✔
2593

2594
        scope, sw := wh.startRequestProfileWithDomain(ctx, metrics.FrontendSignalWithStartWorkflowExecutionScope, signalWithStartRequest)
33✔
2595
        defer sw.Stop()
33✔
2596

33✔
2597
        if wh.isShuttingDown() {
33✔
2598
                return nil, errShuttingDown
×
2599
        }
×
2600

2601
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
33✔
2602
                return nil, wh.error(err, scope)
×
2603
        }
×
2604

2605
        if signalWithStartRequest == nil {
33✔
2606
                return nil, wh.error(errRequestNotSet, scope)
×
2607
        }
×
2608

2609
        domainName := signalWithStartRequest.GetDomain()
33✔
2610
        wfExecution := &types.WorkflowExecution{
33✔
2611
                WorkflowID: signalWithStartRequest.GetWorkflowID(),
33✔
2612
        }
33✔
2613
        tags := getDomainWfIDRunIDTags(domainName, wfExecution)
33✔
2614

33✔
2615
        if domainName == "" {
33✔
2616
                return nil, wh.error(errDomainNotSet, scope, tags...)
×
2617
        }
×
2618

2619
        if ok := wh.allow(ratelimitTypeUser, signalWithStartRequest); !ok {
33✔
2620
                return nil, wh.error(createServiceBusyError(), scope, tags...)
×
2621
        }
×
2622

2623
        if signalWithStartRequest.GetWorkflowID() == "" {
33✔
2624
                return nil, wh.error(errWorkflowIDNotSet, scope, tags...)
×
2625
        }
×
2626

2627
        idLengthWarnLimit := wh.config.MaxIDLengthWarnLimit()
33✔
2628
        if !common.ValidIDLength(
33✔
2629
                domainName,
33✔
2630
                scope,
33✔
2631
                idLengthWarnLimit,
33✔
2632
                wh.config.DomainNameMaxLength(domainName),
33✔
2633
                metrics.CadenceErrDomainNameExceededWarnLimit,
33✔
2634
                domainName,
33✔
2635
                wh.GetLogger(),
33✔
2636
                tag.IDTypeDomainName) {
33✔
2637
                return nil, wh.error(errDomainTooLong, scope, tags...)
×
2638
        }
×
2639

2640
        if !common.ValidIDLength(
33✔
2641
                signalWithStartRequest.GetWorkflowID(),
33✔
2642
                scope,
33✔
2643
                idLengthWarnLimit,
33✔
2644
                wh.config.WorkflowIDMaxLength(domainName),
33✔
2645
                metrics.CadenceErrWorkflowIDExceededWarnLimit,
33✔
2646
                domainName,
33✔
2647
                wh.GetLogger(),
33✔
2648
                tag.IDTypeWorkflowID) {
33✔
2649
                return nil, wh.error(errWorkflowIDTooLong, scope, tags...)
×
2650
        }
×
2651

2652
        if signalWithStartRequest.GetSignalName() == "" {
33✔
2653
                return nil, wh.error(errSignalNameNotSet, scope, tags...)
×
2654
        }
×
2655

2656
        if !common.ValidIDLength(
33✔
2657
                signalWithStartRequest.GetSignalName(),
33✔
2658
                scope,
33✔
2659
                idLengthWarnLimit,
33✔
2660
                wh.config.SignalNameMaxLength(domainName),
33✔
2661
                metrics.CadenceErrSignalNameExceededWarnLimit,
33✔
2662
                domainName,
33✔
2663
                wh.GetLogger(),
33✔
2664
                tag.IDTypeSignalName) {
33✔
2665
                return nil, wh.error(errSignalNameTooLong, scope, tags...)
×
2666
        }
×
2667

2668
        if signalWithStartRequest.WorkflowType == nil || signalWithStartRequest.WorkflowType.GetName() == "" {
33✔
2669
                return nil, wh.error(errWorkflowTypeNotSet, scope, tags...)
×
2670
        }
×
2671

2672
        if !common.ValidIDLength(
33✔
2673
                signalWithStartRequest.WorkflowType.GetName(),
33✔
2674
                scope,
33✔
2675
                idLengthWarnLimit,
33✔
2676
                wh.config.WorkflowTypeMaxLength(domainName),
33✔
2677
                metrics.CadenceErrWorkflowTypeExceededWarnLimit,
33✔
2678
                domainName,
33✔
2679
                wh.GetLogger(),
33✔
2680
                tag.IDTypeWorkflowType) {
33✔
2681
                return nil, wh.error(errWorkflowTypeTooLong, scope, tags...)
×
2682
        }
×
2683

2684
        if err := wh.validateTaskList(signalWithStartRequest.TaskList, scope, domainName); err != nil {
33✔
2685
                return nil, wh.error(err, scope, tags...)
×
2686
        }
×
2687

2688
        if !common.ValidIDLength(
33✔
2689
                signalWithStartRequest.GetRequestID(),
33✔
2690
                scope,
33✔
2691
                idLengthWarnLimit,
33✔
2692
                wh.config.RequestIDMaxLength(domainName),
33✔
2693
                metrics.CadenceErrRequestIDExceededWarnLimit,
33✔
2694
                domainName,
33✔
2695
                wh.GetLogger(),
33✔
2696
                tag.IDTypeRequestID) {
33✔
2697
                return nil, wh.error(errRequestIDTooLong, scope, tags...)
×
2698
        }
×
2699

2700
        if signalWithStartRequest.GetExecutionStartToCloseTimeoutSeconds() <= 0 {
33✔
2701
                return nil, wh.error(errInvalidExecutionStartToCloseTimeoutSeconds, scope, tags...)
×
2702
        }
×
2703

2704
        if signalWithStartRequest.GetTaskStartToCloseTimeoutSeconds() <= 0 {
33✔
2705
                return nil, wh.error(errInvalidTaskStartToCloseTimeoutSeconds, scope, tags...)
×
2706
        }
×
2707

2708
        if err := common.ValidateRetryPolicy(signalWithStartRequest.RetryPolicy); err != nil {
33✔
2709
                return nil, wh.error(err, scope, tags...)
×
2710
        }
×
2711

2712
        if signalWithStartRequest.GetCronSchedule() != "" {
33✔
2713
                if _, err := backoff.ValidateSchedule(signalWithStartRequest.GetCronSchedule()); err != nil {
×
2714
                        return nil, wh.error(err, scope, tags...)
×
2715
                }
×
2716
        }
2717

2718
        if err := wh.searchAttributesValidator.ValidateSearchAttributes(signalWithStartRequest.SearchAttributes, domainName); err != nil {
33✔
2719
                return nil, wh.error(err, scope, tags...)
×
2720
        }
×
2721

2722
        domainID, err := wh.GetDomainCache().GetDomainID(domainName)
33✔
2723
        if err != nil {
33✔
2724
                return nil, wh.error(err, scope, tags...)
×
2725
        }
×
2726

2727
        sizeLimitError := wh.config.BlobSizeLimitError(domainName)
33✔
2728
        sizeLimitWarn := wh.config.BlobSizeLimitWarn(domainName)
33✔
2729
        if err := common.CheckEventBlobSizeLimit(
33✔
2730
                len(signalWithStartRequest.SignalInput),
33✔
2731
                sizeLimitWarn,
33✔
2732
                sizeLimitError,
33✔
2733
                domainID,
33✔
2734
                signalWithStartRequest.GetWorkflowID(),
33✔
2735
                "",
33✔
2736
                scope,
33✔
2737
                wh.GetThrottledLogger(),
33✔
2738
                tag.BlobSizeViolationOperation("SignalWithStartWorkflowExecution"),
33✔
2739
        ); err != nil {
33✔
2740
                return nil, wh.error(err, scope, tags...)
×
2741
        }
×
2742
        actualSize := len(signalWithStartRequest.Input) + common.GetSizeOfMapStringToByteArray(signalWithStartRequest.Memo.GetFields())
33✔
2743
        if err := common.CheckEventBlobSizeLimit(
33✔
2744
                actualSize,
33✔
2745
                sizeLimitWarn,
33✔
2746
                sizeLimitError,
33✔
2747
                domainID,
33✔
2748
                signalWithStartRequest.GetWorkflowID(),
33✔
2749
                "",
33✔
2750
                scope,
33✔
2751
                wh.GetThrottledLogger(),
33✔
2752
                tag.BlobSizeViolationOperation("SignalWithStartWorkflowExecution"),
33✔
2753
        ); err != nil {
33✔
2754
                return nil, wh.error(err, scope, tags...)
×
2755
        }
×
2756

2757
        resp, err = wh.GetHistoryClient().SignalWithStartWorkflowExecution(ctx, &types.HistorySignalWithStartWorkflowExecutionRequest{
33✔
2758
                DomainUUID:             domainID,
33✔
2759
                SignalWithStartRequest: signalWithStartRequest,
33✔
2760
        })
33✔
2761
        if err != nil {
39✔
2762
                return nil, wh.error(err, scope, tags...)
6✔
2763
        }
6✔
2764

2765
        return resp, nil
27✔
2766
}
2767

2768
// TerminateWorkflowExecution terminates an existing workflow execution by recording WorkflowExecutionTerminated event
2769
// in the history and immediately terminating the execution instance.
2770
func (wh *WorkflowHandler) TerminateWorkflowExecution(
2771
        ctx context.Context,
2772
        terminateRequest *types.TerminateWorkflowExecutionRequest,
2773
) (retError error) {
48✔
2774
        defer func() { log.CapturePanic(recover(), wh.GetLogger(), &retError) }()
96✔
2775

2776
        scope, sw := wh.startRequestProfileWithDomain(ctx, metrics.FrontendTerminateWorkflowExecutionScope, terminateRequest)
48✔
2777
        defer sw.Stop()
48✔
2778

48✔
2779
        if wh.isShuttingDown() {
48✔
2780
                return errShuttingDown
×
2781
        }
×
2782

2783
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
48✔
2784
                return wh.error(err, scope)
×
2785
        }
×
2786

2787
        if terminateRequest == nil {
48✔
2788
                return wh.error(errRequestNotSet, scope)
×
2789
        }
×
2790

2791
        domainName := terminateRequest.GetDomain()
48✔
2792
        wfExecution := terminateRequest.GetWorkflowExecution()
48✔
2793
        tags := getDomainWfIDRunIDTags(domainName, wfExecution)
48✔
2794

48✔
2795
        if terminateRequest.GetDomain() == "" {
48✔
2796
                return wh.error(errDomainNotSet, scope, tags...)
×
2797
        }
×
2798

2799
        if ok := wh.allow(ratelimitTypeUser, terminateRequest); !ok {
48✔
2800
                return wh.error(createServiceBusyError(), scope, tags...)
×
2801
        }
×
2802

2803
        if err := validateExecution(wfExecution); err != nil {
48✔
2804
                return wh.error(err, scope, tags...)
×
2805
        }
×
2806

2807
        domainID, err := wh.GetDomainCache().GetDomainID(domainName)
48✔
2808
        if err != nil {
48✔
2809
                return wh.error(err, scope, tags...)
×
2810
        }
×
2811

2812
        err = wh.GetHistoryClient().TerminateWorkflowExecution(ctx, &types.HistoryTerminateWorkflowExecutionRequest{
48✔
2813
                DomainUUID:       domainID,
48✔
2814
                TerminateRequest: terminateRequest,
48✔
2815
        })
48✔
2816
        if err != nil {
48✔
2817
                return wh.normalizeVersionedErrors(ctx, wh.error(err, scope, tags...))
×
2818
        }
×
2819

2820
        return nil
48✔
2821
}
2822

2823
// ResetWorkflowExecution reset an existing workflow execution to the nextFirstEventID
2824
// in the history and immediately terminating the current execution instance.
2825
func (wh *WorkflowHandler) ResetWorkflowExecution(
2826
        ctx context.Context,
2827
        resetRequest *types.ResetWorkflowExecutionRequest,
2828
) (resp *types.ResetWorkflowExecutionResponse, retError error) {
15✔
2829
        defer func() { log.CapturePanic(recover(), wh.GetLogger(), &retError) }()
30✔
2830

2831
        scope, sw := wh.startRequestProfileWithDomain(ctx, metrics.FrontendResetWorkflowExecutionScope, resetRequest)
15✔
2832
        defer sw.Stop()
15✔
2833

15✔
2834
        if wh.isShuttingDown() {
15✔
2835
                return nil, errShuttingDown
×
2836
        }
×
2837

2838
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
15✔
2839
                return nil, wh.error(err, scope)
×
2840
        }
×
2841

2842
        if resetRequest == nil {
15✔
2843
                return nil, wh.error(errRequestNotSet, scope)
×
2844
        }
×
2845

2846
        domainName := resetRequest.GetDomain()
15✔
2847
        wfExecution := resetRequest.GetWorkflowExecution()
15✔
2848
        tags := getDomainWfIDRunIDTags(domainName, wfExecution)
15✔
2849

15✔
2850
        if domainName == "" {
15✔
2851
                return nil, wh.error(errDomainNotSet, scope, tags...)
×
2852
        }
×
2853

2854
        if ok := wh.allow(ratelimitTypeUser, resetRequest); !ok {
15✔
2855
                return nil, wh.error(createServiceBusyError(), scope, tags...)
×
2856
        }
×
2857

2858
        if err := validateExecution(wfExecution); err != nil {
15✔
2859
                return nil, wh.error(err, scope, tags...)
×
2860
        }
×
2861

2862
        domainID, err := wh.GetDomainCache().GetDomainID(resetRequest.GetDomain())
15✔
2863
        if err != nil {
15✔
2864
                return nil, wh.error(err, scope, tags...)
×
2865
        }
×
2866

2867
        resp, err = wh.GetHistoryClient().ResetWorkflowExecution(ctx, &types.HistoryResetWorkflowExecutionRequest{
15✔
2868
                DomainUUID:   domainID,
15✔
2869
                ResetRequest: resetRequest,
15✔
2870
        })
15✔
2871
        if err != nil {
15✔
2872
                return nil, wh.error(err, scope, tags...)
×
2873
        }
×
2874

2875
        return resp, nil
15✔
2876
}
2877

2878
// RequestCancelWorkflowExecution - requests to cancel a workflow execution
2879
func (wh *WorkflowHandler) RequestCancelWorkflowExecution(
2880
        ctx context.Context,
2881
        cancelRequest *types.RequestCancelWorkflowExecutionRequest,
2882
) (retError error) {
6✔
2883
        defer func() { log.CapturePanic(recover(), wh.GetLogger(), &retError) }()
12✔
2884

2885
        scope, sw := wh.startRequestProfileWithDomain(ctx, metrics.FrontendRequestCancelWorkflowExecutionScope, cancelRequest)
6✔
2886
        defer sw.Stop()
6✔
2887

6✔
2888
        if wh.isShuttingDown() {
6✔
2889
                return errShuttingDown
×
2890
        }
×
2891

2892
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
6✔
2893
                return wh.error(err, scope)
×
2894
        }
×
2895

2896
        if cancelRequest == nil {
6✔
2897
                return wh.error(errRequestNotSet, scope)
×
2898
        }
×
2899

2900
        domainName := cancelRequest.GetDomain()
6✔
2901
        wfExecution := cancelRequest.GetWorkflowExecution()
6✔
2902
        tags := getDomainWfIDRunIDTags(domainName, wfExecution)
6✔
2903

6✔
2904
        if domainName == "" {
6✔
2905
                return wh.error(errDomainNotSet, scope, tags...)
×
2906
        }
×
2907

2908
        if ok := wh.allow(ratelimitTypeUser, cancelRequest); !ok {
6✔
2909
                return wh.error(createServiceBusyError(), scope, tags...)
×
2910
        }
×
2911

2912
        if err := validateExecution(wfExecution); err != nil {
6✔
2913
                return wh.error(err, scope, tags...)
×
2914
        }
×
2915

2916
        domainID, err := wh.GetDomainCache().GetDomainID(cancelRequest.GetDomain())
6✔
2917
        if err != nil {
6✔
2918
                return wh.error(err, scope, tags...)
×
2919
        }
×
2920

2921
        err = wh.GetHistoryClient().RequestCancelWorkflowExecution(ctx, &types.HistoryRequestCancelWorkflowExecutionRequest{
6✔
2922
                DomainUUID:    domainID,
6✔
2923
                CancelRequest: cancelRequest,
6✔
2924
        })
6✔
2925
        if err != nil {
9✔
2926
                return wh.normalizeVersionedErrors(ctx, wh.error(err, scope, tags...))
3✔
2927
        }
3✔
2928

2929
        return nil
3✔
2930
}
2931

2932
// ListOpenWorkflowExecutions - retrieves info for open workflow executions in a domain
2933
func (wh *WorkflowHandler) ListOpenWorkflowExecutions(
2934
        ctx context.Context,
2935
        listRequest *types.ListOpenWorkflowExecutionsRequest,
2936
) (resp *types.ListOpenWorkflowExecutionsResponse, retError error) {
103✔
2937
        defer func() { log.CapturePanic(recover(), wh.GetLogger(), &retError) }()
206✔
2938

2939
        scope, sw := wh.startRequestProfileWithDomain(ctx, metrics.FrontendListOpenWorkflowExecutionsScope, listRequest)
103✔
2940
        defer sw.Stop()
103✔
2941

103✔
2942
        if wh.isShuttingDown() {
103✔
2943
                return nil, errShuttingDown
×
2944
        }
×
2945

2946
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
103✔
2947
                return nil, wh.error(err, scope)
×
2948
        }
×
2949

2950
        if listRequest == nil {
103✔
2951
                return nil, wh.error(errRequestNotSet, scope)
×
2952
        }
×
2953

2954
        if listRequest.GetDomain() == "" {
103✔
2955
                return nil, wh.error(errDomainNotSet, scope)
×
2956
        }
×
2957

2958
        if ok := wh.allow(ratelimitTypeVisibility, listRequest); !ok {
103✔
2959
                return nil, wh.error(createServiceBusyError(), scope)
×
2960
        }
×
2961

2962
        if listRequest.StartTimeFilter == nil {
103✔
2963
                return nil, wh.error(&types.BadRequestError{Message: "StartTimeFilter is required"}, scope)
×
2964
        }
×
2965

2966
        if listRequest.StartTimeFilter.EarliestTime == nil {
103✔
2967
                return nil, wh.error(&types.BadRequestError{Message: "EarliestTime in StartTimeFilter is required"}, scope)
×
2968
        }
×
2969

2970
        if listRequest.StartTimeFilter.LatestTime == nil {
103✔
2971
                return nil, wh.error(&types.BadRequestError{Message: "LatestTime in StartTimeFilter is required"}, scope)
×
2972
        }
×
2973

2974
        if listRequest.StartTimeFilter.GetEarliestTime() > listRequest.StartTimeFilter.GetLatestTime() {
103✔
2975
                return nil, wh.error(&types.BadRequestError{Message: "EarliestTime in StartTimeFilter should not be larger than LatestTime"}, scope)
×
2976
        }
×
2977

2978
        if listRequest.ExecutionFilter != nil && listRequest.TypeFilter != nil {
103✔
2979
                return nil, wh.error(&types.BadRequestError{
×
2980
                        Message: "Only one of ExecutionFilter or TypeFilter is allowed"}, scope)
×
2981
        }
×
2982

2983
        if listRequest.GetMaximumPageSize() <= 0 {
164✔
2984
                listRequest.MaximumPageSize = int32(wh.config.VisibilityMaxPageSize(listRequest.GetDomain()))
61✔
2985
        }
61✔
2986

2987
        if wh.isListRequestPageSizeTooLarge(listRequest.GetMaximumPageSize(), listRequest.GetDomain()) {
103✔
2988
                return nil, wh.error(&types.BadRequestError{
×
2989
                        Message: fmt.Sprintf("Pagesize is larger than allow %d", wh.config.ESIndexMaxResultWindow())}, scope)
×
2990
        }
×
2991

2992
        domain := listRequest.GetDomain()
103✔
2993
        domainID, err := wh.GetDomainCache().GetDomainID(domain)
103✔
2994
        if err != nil {
103✔
2995
                return nil, wh.error(err, scope)
×
2996
        }
×
2997

2998
        baseReq := persistence.ListWorkflowExecutionsRequest{
103✔
2999
                DomainUUID:    domainID,
103✔
3000
                Domain:        domain,
103✔
3001
                PageSize:      int(listRequest.GetMaximumPageSize()),
103✔
3002
                NextPageToken: listRequest.NextPageToken,
103✔
3003
                EarliestTime:  listRequest.StartTimeFilter.GetEarliestTime(),
103✔
3004
                LatestTime:    listRequest.StartTimeFilter.GetLatestTime(),
103✔
3005
        }
103✔
3006

103✔
3007
        var persistenceResp *persistence.ListWorkflowExecutionsResponse
103✔
3008
        if listRequest.ExecutionFilter != nil {
200✔
3009
                if wh.config.DisableListVisibilityByFilter(domain) {
98✔
3010
                        err = errNoPermission
1✔
3011
                } else {
97✔
3012
                        persistenceResp, err = wh.GetVisibilityManager().ListOpenWorkflowExecutionsByWorkflowID(
96✔
3013
                                ctx,
96✔
3014
                                &persistence.ListWorkflowExecutionsByWorkflowIDRequest{
96✔
3015
                                        ListWorkflowExecutionsRequest: baseReq,
96✔
3016
                                        WorkflowID:                    listRequest.ExecutionFilter.GetWorkflowID(),
96✔
3017
                                })
96✔
3018
                }
96✔
3019
                wh.GetLogger().Debug("List open workflow with filter",
97✔
3020
                        tag.WorkflowDomainName(listRequest.GetDomain()), tag.WorkflowListWorkflowFilterByID)
97✔
3021
        } else if listRequest.TypeFilter != nil {
7✔
3022
                if wh.config.DisableListVisibilityByFilter(domain) {
2✔
3023
                        err = errNoPermission
1✔
3024
                } else {
1✔
3025
                        persistenceResp, err = wh.GetVisibilityManager().ListOpenWorkflowExecutionsByType(
×
3026
                                ctx,
×
3027
                                &persistence.ListWorkflowExecutionsByTypeRequest{
×
3028
                                        ListWorkflowExecutionsRequest: baseReq,
×
3029
                                        WorkflowTypeName:              listRequest.TypeFilter.GetName(),
×
3030
                                },
×
3031
                        )
×
3032
                }
×
3033
                wh.GetLogger().Debug("List open workflow with filter",
1✔
3034
                        tag.WorkflowDomainName(listRequest.GetDomain()), tag.WorkflowListWorkflowFilterByType)
1✔
3035
        } else {
5✔
3036
                persistenceResp, err = wh.GetVisibilityManager().ListOpenWorkflowExecutions(ctx, &baseReq)
5✔
3037
        }
5✔
3038

3039
        if err != nil {
105✔
3040
                return nil, wh.error(err, scope)
2✔
3041
        }
2✔
3042

3043
        resp = &types.ListOpenWorkflowExecutionsResponse{}
101✔
3044
        resp.Executions = persistenceResp.Executions
101✔
3045
        resp.NextPageToken = persistenceResp.NextPageToken
101✔
3046
        return resp, nil
101✔
3047
}
3048

3049
// ListArchivedWorkflowExecutions - retrieves archived info for closed workflow executions in a domain
3050
func (wh *WorkflowHandler) ListArchivedWorkflowExecutions(
3051
        ctx context.Context,
3052
        listRequest *types.ListArchivedWorkflowExecutionsRequest,
3053
) (resp *types.ListArchivedWorkflowExecutionsResponse, retError error) {
15✔
3054
        defer func() { log.CapturePanic(recover(), wh.GetLogger(), &retError) }()
30✔
3055

3056
        scope, sw := wh.startRequestProfileWithDomain(ctx, metrics.FrontendListArchivedWorkflowExecutionsScope, listRequest)
15✔
3057
        defer sw.Stop()
15✔
3058

15✔
3059
        if wh.isShuttingDown() {
15✔
3060
                return nil, errShuttingDown
×
3061
        }
×
3062

3063
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
15✔
3064
                return nil, wh.error(err, scope)
×
3065
        }
×
3066

3067
        if listRequest == nil {
15✔
3068
                return nil, wh.error(errRequestNotSet, scope)
×
3069
        }
×
3070

3071
        if listRequest.GetDomain() == "" {
16✔
3072
                return nil, wh.error(errDomainNotSet, scope)
1✔
3073
        }
1✔
3074

3075
        if ok := wh.allow(ratelimitTypeVisibility, listRequest); !ok {
14✔
3076
                return nil, wh.error(createServiceBusyError(), scope)
×
3077
        }
×
3078

3079
        if listRequest.GetPageSize() <= 0 {
14✔
3080
                listRequest.PageSize = int32(wh.config.VisibilityMaxPageSize(listRequest.GetDomain()))
×
3081
        }
×
3082

3083
        maxPageSize := wh.config.VisibilityArchivalQueryMaxPageSize()
14✔
3084
        if int(listRequest.GetPageSize()) > maxPageSize {
14✔
3085
                return nil, wh.error(&types.BadRequestError{
×
3086
                        Message: fmt.Sprintf("Pagesize is larger than allowed %d", maxPageSize)}, scope)
×
3087
        }
×
3088

3089
        if !wh.GetArchivalMetadata().GetVisibilityConfig().ClusterConfiguredForArchival() {
15✔
3090
                return nil, wh.error(&types.BadRequestError{Message: "Cluster is not configured for visibility archival"}, scope)
1✔
3091
        }
1✔
3092

3093
        if !wh.GetArchivalMetadata().GetVisibilityConfig().ReadEnabled() {
13✔
3094
                return nil, wh.error(&types.BadRequestError{Message: "Cluster is not configured for reading archived visibility records"}, scope)
×
3095
        }
×
3096

3097
        entry, err := wh.GetDomainCache().GetDomain(listRequest.GetDomain())
13✔
3098
        if err != nil {
14✔
3099
                return nil, wh.error(err, scope)
1✔
3100
        }
1✔
3101

3102
        if entry.GetConfig().VisibilityArchivalStatus != types.ArchivalStatusEnabled {
14✔
3103
                return nil, wh.error(&types.BadRequestError{Message: "Domain is not configured for visibility archival"}, scope)
2✔
3104
        }
2✔
3105

3106
        URI, err := archiver.NewURI(entry.GetConfig().VisibilityArchivalURI)
10✔
3107
        if err != nil {
10✔
3108
                return nil, wh.error(err, scope)
×
3109
        }
×
3110

3111
        visibilityArchiver, err := wh.GetArchiverProvider().GetVisibilityArchiver(URI.Scheme(), service.Frontend)
10✔
3112
        if err != nil {
10✔
3113
                return nil, wh.error(err, scope)
×
3114
        }
×
3115

3116
        archiverRequest := &archiver.QueryVisibilityRequest{
10✔
3117
                DomainID:      entry.GetInfo().ID,
10✔
3118
                PageSize:      int(listRequest.GetPageSize()),
10✔
3119
                NextPageToken: listRequest.NextPageToken,
10✔
3120
                Query:         listRequest.GetQuery(),
10✔
3121
        }
10✔
3122

10✔
3123
        archiverResponse, err := visibilityArchiver.Query(ctx, URI, archiverRequest)
10✔
3124
        if err != nil {
10✔
3125
                return nil, wh.error(err, scope)
×
3126
        }
×
3127

3128
        // special handling of ExecutionTime for cron or retry
3129
        for _, execution := range archiverResponse.Executions {
25✔
3130
                if execution.GetExecutionTime() == 0 {
30✔
3131
                        execution.ExecutionTime = common.Int64Ptr(execution.GetStartTime())
15✔
3132
                }
15✔
3133
        }
3134

3135
        return &types.ListArchivedWorkflowExecutionsResponse{
10✔
3136
                Executions:    archiverResponse.Executions,
10✔
3137
                NextPageToken: archiverResponse.NextPageToken,
10✔
3138
        }, nil
10✔
3139
}
3140

3141
// ListClosedWorkflowExecutions - retrieves info for closed workflow executions in a domain
3142
func (wh *WorkflowHandler) ListClosedWorkflowExecutions(
3143
        ctx context.Context,
3144
        listRequest *types.ListClosedWorkflowExecutionsRequest,
3145
) (resp *types.ListClosedWorkflowExecutionsResponse, retError error) {
27✔
3146
        defer func() { log.CapturePanic(recover(), wh.GetLogger(), &retError) }()
54✔
3147

3148
        scope, sw := wh.startRequestProfileWithDomain(ctx, metrics.FrontendListClosedWorkflowExecutionsScope, listRequest)
27✔
3149
        defer sw.Stop()
27✔
3150

27✔
3151
        if wh.isShuttingDown() {
27✔
3152
                return nil, errShuttingDown
×
3153
        }
×
3154

3155
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
27✔
3156
                return nil, wh.error(err, scope)
×
3157
        }
×
3158

3159
        if listRequest == nil {
27✔
3160
                return nil, wh.error(errRequestNotSet, scope)
×
3161
        }
×
3162

3163
        if listRequest.GetDomain() == "" {
27✔
3164
                return nil, wh.error(errDomainNotSet, scope)
×
3165
        }
×
3166

3167
        if ok := wh.allow(ratelimitTypeVisibility, listRequest); !ok {
27✔
3168
                return nil, wh.error(createServiceBusyError(), scope)
×
3169
        }
×
3170

3171
        if listRequest.StartTimeFilter == nil {
27✔
3172
                return nil, wh.error(&types.BadRequestError{Message: "StartTimeFilter is required"}, scope)
×
3173
        }
×
3174

3175
        if listRequest.StartTimeFilter.EarliestTime == nil {
27✔
3176
                return nil, wh.error(&types.BadRequestError{Message: "EarliestTime in StartTimeFilter is required"}, scope)
×
3177
        }
×
3178

3179
        if listRequest.StartTimeFilter.LatestTime == nil {
27✔
3180
                return nil, wh.error(&types.BadRequestError{Message: "LatestTime in StartTimeFilter is required"}, scope)
×
3181
        }
×
3182

3183
        if listRequest.StartTimeFilter.GetEarliestTime() > listRequest.StartTimeFilter.GetLatestTime() {
27✔
3184
                return nil, wh.error(&types.BadRequestError{Message: "EarliestTime in StartTimeFilter should not be larger than LatestTime"}, scope)
×
3185
        }
×
3186

3187
        filterCount := 0
27✔
3188
        if listRequest.TypeFilter != nil {
28✔
3189
                filterCount++
1✔
3190
        }
1✔
3191
        if listRequest.StatusFilter != nil {
28✔
3192
                filterCount++
1✔
3193
        }
1✔
3194

3195
        if filterCount > 1 {
27✔
3196
                return nil, wh.error(&types.BadRequestError{
×
3197
                        Message: "Only one of ExecutionFilter, TypeFilter or StatusFilter is allowed"}, scope)
×
3198
        } // If ExecutionFilter is provided with one of TypeFilter or StatusFilter, use ExecutionFilter and ignore other filter
×
3199

3200
        if listRequest.GetMaximumPageSize() <= 0 {
28✔
3201
                listRequest.MaximumPageSize = int32(wh.config.VisibilityMaxPageSize(listRequest.GetDomain()))
1✔
3202
        }
1✔
3203

3204
        if wh.isListRequestPageSizeTooLarge(listRequest.GetMaximumPageSize(), listRequest.GetDomain()) {
27✔
3205
                return nil, wh.error(&types.BadRequestError{
×
3206
                        Message: fmt.Sprintf("Pagesize is larger than allow %d", wh.config.ESIndexMaxResultWindow())}, scope)
×
3207
        }
×
3208

3209
        domain := listRequest.GetDomain()
27✔
3210
        domainID, err := wh.GetDomainCache().GetDomainID(domain)
27✔
3211
        if err != nil {
27✔
3212
                return nil, wh.error(err, scope)
×
3213
        }
×
3214

3215
        baseReq := persistence.ListWorkflowExecutionsRequest{
27✔
3216
                DomainUUID:    domainID,
27✔
3217
                Domain:        domain,
27✔
3218
                PageSize:      int(listRequest.GetMaximumPageSize()),
27✔
3219
                NextPageToken: listRequest.NextPageToken,
27✔
3220
                EarliestTime:  listRequest.StartTimeFilter.GetEarliestTime(),
27✔
3221
                LatestTime:    listRequest.StartTimeFilter.GetLatestTime(),
27✔
3222
        }
27✔
3223

27✔
3224
        var persistenceResp *persistence.ListWorkflowExecutionsResponse
27✔
3225
        if listRequest.ExecutionFilter != nil {
43✔
3226
                if wh.config.DisableListVisibilityByFilter(domain) {
17✔
3227
                        err = errNoPermission
1✔
3228
                } else {
16✔
3229
                        persistenceResp, err = wh.GetVisibilityManager().ListClosedWorkflowExecutionsByWorkflowID(
15✔
3230
                                ctx,
15✔
3231
                                &persistence.ListWorkflowExecutionsByWorkflowIDRequest{
15✔
3232
                                        ListWorkflowExecutionsRequest: baseReq,
15✔
3233
                                        WorkflowID:                    listRequest.ExecutionFilter.GetWorkflowID(),
15✔
3234
                                },
15✔
3235
                        )
15✔
3236
                }
15✔
3237
                wh.GetLogger().Debug("List closed workflow with filter",
16✔
3238
                        tag.WorkflowDomainName(listRequest.GetDomain()), tag.WorkflowListWorkflowFilterByID)
16✔
3239
        } else if listRequest.TypeFilter != nil {
12✔
3240
                if wh.config.DisableListVisibilityByFilter(domain) {
2✔
3241
                        err = errNoPermission
1✔
3242
                } else {
1✔
3243
                        persistenceResp, err = wh.GetVisibilityManager().ListClosedWorkflowExecutionsByType(
×
3244
                                ctx,
×
3245
                                &persistence.ListWorkflowExecutionsByTypeRequest{
×
3246
                                        ListWorkflowExecutionsRequest: baseReq,
×
3247
                                        WorkflowTypeName:              listRequest.TypeFilter.GetName(),
×
3248
                                },
×
3249
                        )
×
3250
                }
×
3251
                wh.GetLogger().Debug("List closed workflow with filter",
1✔
3252
                        tag.WorkflowDomainName(listRequest.GetDomain()), tag.WorkflowListWorkflowFilterByType)
1✔
3253
        } else if listRequest.StatusFilter != nil {
11✔
3254
                if wh.config.DisableListVisibilityByFilter(domain) {
2✔
3255
                        err = errNoPermission
1✔
3256
                } else {
1✔
3257
                        persistenceResp, err = wh.GetVisibilityManager().ListClosedWorkflowExecutionsByStatus(
×
3258
                                ctx,
×
3259
                                &persistence.ListClosedWorkflowExecutionsByStatusRequest{
×
3260
                                        ListWorkflowExecutionsRequest: baseReq,
×
3261
                                        Status:                        listRequest.GetStatusFilter(),
×
3262
                                },
×
3263
                        )
×
3264
                }
×
3265
                wh.GetLogger().Debug("List closed workflow with filter",
1✔
3266
                        tag.WorkflowDomainName(listRequest.GetDomain()), tag.WorkflowListWorkflowFilterByStatus)
1✔
3267
        } else {
9✔
3268
                persistenceResp, err = wh.GetVisibilityManager().ListClosedWorkflowExecutions(ctx, &baseReq)
9✔
3269
        }
9✔
3270

3271
        if err != nil {
30✔
3272
                return nil, wh.error(err, scope)
3✔
3273
        }
3✔
3274

3275
        resp = &types.ListClosedWorkflowExecutionsResponse{}
24✔
3276
        resp.Executions = persistenceResp.Executions
24✔
3277
        resp.NextPageToken = persistenceResp.NextPageToken
24✔
3278
        return resp, nil
24✔
3279
}
3280

3281
// ListWorkflowExecutions - retrieves info for workflow executions in a domain
3282
func (wh *WorkflowHandler) ListWorkflowExecutions(
3283
        ctx context.Context,
3284
        listRequest *types.ListWorkflowExecutionsRequest,
3285
) (resp *types.ListWorkflowExecutionsResponse, retError error) {
143✔
3286
        defer func() { log.CapturePanic(recover(), wh.GetLogger(), &retError) }()
286✔
3287

3288
        scope, sw := wh.startRequestProfileWithDomain(ctx, metrics.FrontendListWorkflowExecutionsScope, listRequest)
143✔
3289
        defer sw.Stop()
143✔
3290

143✔
3291
        if wh.isShuttingDown() {
143✔
3292
                return nil, errShuttingDown
×
3293
        }
×
3294

3295
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
143✔
3296
                return nil, wh.error(err, scope)
×
3297
        }
×
3298

3299
        if listRequest == nil {
143✔
3300
                return nil, wh.error(errRequestNotSet, scope)
×
3301
        }
×
3302

3303
        if listRequest.GetDomain() == "" {
143✔
3304
                return nil, wh.error(errDomainNotSet, scope)
×
3305
        }
×
3306

3307
        if ok := wh.allow(ratelimitTypeVisibility, listRequest); !ok {
143✔
3308
                return nil, wh.error(createServiceBusyError(), scope)
×
3309
        }
×
3310

3311
        if listRequest.GetPageSize() <= 0 {
143✔
3312
                listRequest.PageSize = int32(wh.config.VisibilityMaxPageSize(listRequest.GetDomain()))
×
3313
        }
×
3314

3315
        if wh.isListRequestPageSizeTooLarge(listRequest.GetPageSize(), listRequest.GetDomain()) {
143✔
3316
                return nil, wh.error(&types.BadRequestError{
×
3317
                        Message: fmt.Sprintf("Pagesize is larger than allow %d", wh.config.ESIndexMaxResultWindow())}, scope)
×
3318
        }
×
3319

3320
        validatedQuery, err := wh.visibilityQueryValidator.ValidateQuery(listRequest.GetQuery())
143✔
3321
        if err != nil {
146✔
3322
                return nil, wh.error(err, scope)
3✔
3323
        }
3✔
3324

3325
        domain := listRequest.GetDomain()
140✔
3326
        domainID, err := wh.GetDomainCache().GetDomainID(domain)
140✔
3327
        if err != nil {
140✔
3328
                return nil, wh.error(err, scope)
×
3329
        }
×
3330

3331
        req := &persistence.ListWorkflowExecutionsByQueryRequest{
140✔
3332
                DomainUUID:    domainID,
140✔
3333
                Domain:        domain,
140✔
3334
                PageSize:      int(listRequest.GetPageSize()),
140✔
3335
                NextPageToken: listRequest.NextPageToken,
140✔
3336
                Query:         validatedQuery,
140✔
3337
        }
140✔
3338
        persistenceResp, err := wh.GetVisibilityManager().ListWorkflowExecutions(ctx, req)
140✔
3339
        if err != nil {
140✔
3340
                return nil, wh.error(err, scope)
×
3341
        }
×
3342

3343
        resp = &types.ListWorkflowExecutionsResponse{}
140✔
3344
        resp.Executions = persistenceResp.Executions
140✔
3345
        resp.NextPageToken = persistenceResp.NextPageToken
140✔
3346
        return resp, nil
140✔
3347
}
3348

3349
// RestartWorkflowExecution - retrieves info for an existing workflow then restarts it
3350
func (wh *WorkflowHandler) RestartWorkflowExecution(ctx context.Context, request *types.RestartWorkflowExecutionRequest) (resp *types.RestartWorkflowExecutionResponse, retError error) {
1✔
3351
        defer func() { log.CapturePanic(recover(), wh.GetLogger(), &retError) }()
2✔
3352

3353
        scope, sw := wh.startRequestProfileWithDomain(ctx, metrics.FrontendRestartWorkflowExecutionScope, request)
1✔
3354
        defer sw.Stop()
1✔
3355

1✔
3356
        if wh.isShuttingDown() {
1✔
3357
                return nil, errShuttingDown
×
3358
        }
×
3359

3360
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
1✔
3361
                return nil, wh.error(err, scope)
×
3362
        }
×
3363

3364
        if request == nil {
1✔
3365
                return nil, wh.error(errRequestNotSet, scope)
×
3366
        }
×
3367

3368
        domainName := request.GetDomain()
1✔
3369
        wfExecution := request.GetWorkflowExecution()
1✔
3370
        tags := getDomainWfIDRunIDTags(domainName, wfExecution)
1✔
3371

1✔
3372
        if request.GetDomain() == "" {
1✔
3373
                return nil, wh.error(errDomainNotSet, scope, tags...)
×
3374
        }
×
3375

3376
        if ok := wh.allow(ratelimitTypeUser, request); !ok {
1✔
3377
                return nil, wh.error(createServiceBusyError(), scope, tags...)
×
3378
        }
×
3379

3380
        if err := validateExecution(wfExecution); err != nil {
1✔
3381
                return nil, wh.error(err, scope, tags...)
×
3382
        }
×
3383

3384
        domainID, err := wh.GetDomainCache().GetDomainID(domainName)
1✔
3385
        if err != nil {
1✔
3386
                return nil, wh.error(err, scope, tags...)
×
3387
        }
×
3388

3389
        history, err := wh.GetWorkflowExecutionHistory(ctx, &types.GetWorkflowExecutionHistoryRequest{
1✔
3390
                Domain: domainName,
1✔
3391
                Execution: &types.WorkflowExecution{
1✔
3392
                        WorkflowID: wfExecution.WorkflowID,
1✔
3393
                        RunID:      wfExecution.RunID,
1✔
3394
                },
1✔
3395
                SkipArchival: true,
1✔
3396
        })
1✔
3397
        if err != nil {
1✔
3398
                return nil, wh.error(errHistoryNotFound, scope, tags...)
×
3399
        }
×
3400
        startRequest := constructRestartWorkflowRequest(history.History.Events[0].WorkflowExecutionStartedEventAttributes,
1✔
3401
                domainName, request.Identity, wfExecution.WorkflowID)
1✔
3402
        req, err := common.CreateHistoryStartWorkflowRequest(domainID, startRequest, time.Now())
1✔
3403
        if err != nil {
1✔
3404
                return nil, err
×
3405
        }
×
3406
        startResp, err := wh.GetHistoryClient().StartWorkflowExecution(ctx, req)
1✔
3407
        if err != nil {
1✔
3408
                return nil, wh.normalizeVersionedErrors(ctx, wh.error(err, scope, tags...))
×
3409
        }
×
3410
        resp = &types.RestartWorkflowExecutionResponse{
1✔
3411
                RunID: startResp.RunID,
1✔
3412
        }
1✔
3413

1✔
3414
        return resp, nil
1✔
3415
}
3416

3417
// ScanWorkflowExecutions - retrieves info for large amount of workflow executions in a domain without order
3418
func (wh *WorkflowHandler) ScanWorkflowExecutions(
3419
        ctx context.Context,
3420
        listRequest *types.ListWorkflowExecutionsRequest,
3421
) (resp *types.ListWorkflowExecutionsResponse, retError error) {
30✔
3422
        defer func() { log.CapturePanic(recover(), wh.GetLogger(), &retError) }()
60✔
3423

3424
        scope, sw := wh.startRequestProfileWithDomain(ctx, metrics.FrontendScanWorkflowExecutionsScope, listRequest)
30✔
3425
        defer sw.Stop()
30✔
3426

30✔
3427
        if wh.isShuttingDown() {
30✔
3428
                return nil, errShuttingDown
×
3429
        }
×
3430

3431
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
30✔
3432
                return nil, wh.error(err, scope)
×
3433
        }
×
3434

3435
        if listRequest == nil {
30✔
3436
                return nil, wh.error(errRequestNotSet, scope)
×
3437
        }
×
3438

3439
        if listRequest.GetDomain() == "" {
30✔
3440
                return nil, wh.error(errDomainNotSet, scope)
×
3441
        }
×
3442

3443
        if ok := wh.allow(ratelimitTypeUser, listRequest); !ok {
30✔
3444
                return nil, wh.error(createServiceBusyError(), scope)
×
3445
        }
×
3446

3447
        if listRequest.GetPageSize() <= 0 {
30✔
3448
                listRequest.PageSize = int32(wh.config.VisibilityMaxPageSize(listRequest.GetDomain()))
×
3449
        }
×
3450

3451
        if wh.isListRequestPageSizeTooLarge(listRequest.GetPageSize(), listRequest.GetDomain()) {
30✔
3452
                return nil, wh.error(&types.BadRequestError{
×
3453
                        Message: fmt.Sprintf("Pagesize is larger than allow %d", wh.config.ESIndexMaxResultWindow())}, scope)
×
3454
        }
×
3455

3456
        validatedQuery, err := wh.visibilityQueryValidator.ValidateQuery(listRequest.GetQuery())
30✔
3457
        if err != nil {
31✔
3458
                return nil, wh.error(err, scope)
1✔
3459
        }
1✔
3460

3461
        domain := listRequest.GetDomain()
29✔
3462
        domainID, err := wh.GetDomainCache().GetDomainID(domain)
29✔
3463
        if err != nil {
29✔
3464
                return nil, wh.error(err, scope)
×
3465
        }
×
3466

3467
        req := &persistence.ListWorkflowExecutionsByQueryRequest{
29✔
3468
                DomainUUID:    domainID,
29✔
3469
                Domain:        domain,
29✔
3470
                PageSize:      int(listRequest.GetPageSize()),
29✔
3471
                NextPageToken: listRequest.NextPageToken,
29✔
3472
                Query:         validatedQuery,
29✔
3473
        }
29✔
3474
        persistenceResp, err := wh.GetVisibilityManager().ScanWorkflowExecutions(ctx, req)
29✔
3475
        if err != nil {
29✔
3476
                return nil, wh.error(err, scope)
×
3477
        }
×
3478

3479
        resp = &types.ListWorkflowExecutionsResponse{}
29✔
3480
        resp.Executions = persistenceResp.Executions
29✔
3481
        resp.NextPageToken = persistenceResp.NextPageToken
29✔
3482
        return resp, nil
29✔
3483
}
3484

3485
// CountWorkflowExecutions - count number of workflow executions in a domain
3486
func (wh *WorkflowHandler) CountWorkflowExecutions(
3487
        ctx context.Context,
3488
        countRequest *types.CountWorkflowExecutionsRequest,
3489
) (resp *types.CountWorkflowExecutionsResponse, retError error) {
15✔
3490
        defer func() { log.CapturePanic(recover(), wh.GetLogger(), &retError) }()
30✔
3491

3492
        scope, sw := wh.startRequestProfileWithDomain(ctx, metrics.FrontendCountWorkflowExecutionsScope, countRequest)
15✔
3493
        defer sw.Stop()
15✔
3494

15✔
3495
        if wh.isShuttingDown() {
15✔
3496
                return nil, errShuttingDown
×
3497
        }
×
3498

3499
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
15✔
3500
                return nil, wh.error(err, scope)
×
3501
        }
×
3502

3503
        if countRequest == nil {
16✔
3504
                return nil, wh.error(errRequestNotSet, scope)
1✔
3505
        }
1✔
3506

3507
        if countRequest.GetDomain() == "" {
14✔
3508
                return nil, wh.error(errDomainNotSet, scope)
×
3509
        }
×
3510

3511
        if ok := wh.allow(ratelimitTypeUser, countRequest); !ok {
14✔
3512
                return nil, wh.error(createServiceBusyError(), scope)
×
3513
        }
×
3514

3515
        validatedQuery, err := wh.visibilityQueryValidator.ValidateQuery(countRequest.GetQuery())
14✔
3516
        if err != nil {
15✔
3517
                return nil, wh.error(err, scope)
1✔
3518
        }
1✔
3519

3520
        domain := countRequest.GetDomain()
13✔
3521
        domainID, err := wh.GetDomainCache().GetDomainID(domain)
13✔
3522
        if err != nil {
13✔
3523
                return nil, wh.error(err, scope)
×
3524
        }
×
3525

3526
        req := &persistence.CountWorkflowExecutionsRequest{
13✔
3527
                DomainUUID: domainID,
13✔
3528
                Domain:     domain,
13✔
3529
                Query:      validatedQuery,
13✔
3530
        }
13✔
3531
        persistenceResp, err := wh.GetVisibilityManager().CountWorkflowExecutions(ctx, req)
13✔
3532
        if err != nil {
13✔
3533
                return nil, wh.error(err, scope)
×
3534
        }
×
3535

3536
        resp = &types.CountWorkflowExecutionsResponse{
13✔
3537
                Count: persistenceResp.Count,
13✔
3538
        }
13✔
3539
        return resp, nil
13✔
3540
}
3541

3542
// GetSearchAttributes return valid indexed keys
3543
func (wh *WorkflowHandler) GetSearchAttributes(ctx context.Context) (resp *types.GetSearchAttributesResponse, retError error) {
1✔
3544
        defer func() { log.CapturePanic(recover(), wh.GetLogger(), &retError) }()
2✔
3545

3546
        scope, sw := wh.startRequestProfile(ctx, metrics.FrontendGetSearchAttributesScope)
1✔
3547
        defer sw.Stop()
1✔
3548

1✔
3549
        if wh.isShuttingDown() {
1✔
3550
                return nil, errShuttingDown
×
3551
        }
×
3552

3553
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
1✔
3554
                return nil, wh.error(err, scope)
×
3555
        }
×
3556

3557
        keys := wh.config.ValidSearchAttributes()
1✔
3558
        resp = &types.GetSearchAttributesResponse{
1✔
3559
                Keys: wh.convertIndexedKeyToThrift(keys),
1✔
3560
        }
1✔
3561
        return resp, nil
1✔
3562
}
3563

3564
// ResetStickyTaskList reset the volatile information in mutable state of a given workflow.
3565
func (wh *WorkflowHandler) ResetStickyTaskList(
3566
        ctx context.Context,
3567
        resetRequest *types.ResetStickyTaskListRequest,
3568
) (resp *types.ResetStickyTaskListResponse, retError error) {
3✔
3569
        defer func() { log.CapturePanic(recover(), wh.GetLogger(), &retError) }()
6✔
3570

3571
        scope, sw := wh.startRequestProfileWithDomain(ctx, metrics.FrontendResetStickyTaskListScope, resetRequest)
3✔
3572
        defer sw.Stop()
3✔
3573

3✔
3574
        if wh.isShuttingDown() {
3✔
3575
                return nil, errShuttingDown
×
3576
        }
×
3577

3578
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
3✔
3579
                return nil, wh.error(err, scope)
×
3580
        }
×
3581

3582
        if resetRequest == nil {
3✔
3583
                return nil, wh.error(errRequestNotSet, scope)
×
3584
        }
×
3585

3586
        domainName := resetRequest.GetDomain()
3✔
3587
        wfExecution := resetRequest.GetExecution()
3✔
3588
        tags := getDomainWfIDRunIDTags(domainName, wfExecution)
3✔
3589

3✔
3590
        if domainName == "" {
3✔
3591
                return nil, wh.error(errDomainNotSet, scope, tags...)
×
3592
        }
×
3593

3594
        // Count the request in the host RPS,
3595
        // but we still accept it even if RPS is exceeded
3596
        wh.allow(ratelimitTypeWorker, resetRequest)
3✔
3597

3✔
3598
        if err := validateExecution(wfExecution); err != nil {
3✔
3599
                return nil, wh.error(err, scope, tags...)
×
3600
        }
×
3601

3602
        domainID, err := wh.GetDomainCache().GetDomainID(resetRequest.GetDomain())
3✔
3603
        if err != nil {
3✔
3604
                return nil, wh.error(err, scope, tags...)
×
3605
        }
×
3606

3607
        _, err = wh.GetHistoryClient().ResetStickyTaskList(ctx, &types.HistoryResetStickyTaskListRequest{
3✔
3608
                DomainUUID: domainID,
3✔
3609
                Execution:  resetRequest.Execution,
3✔
3610
        })
3✔
3611
        if err != nil {
3✔
3612
                return nil, wh.normalizeVersionedErrors(ctx, wh.error(err, scope, tags...))
×
3613
        }
×
3614
        return &types.ResetStickyTaskListResponse{}, nil
3✔
3615
}
3616

3617
// QueryWorkflow returns query result for a specified workflow execution
3618
func (wh *WorkflowHandler) QueryWorkflow(
3619
        ctx context.Context,
3620
        queryRequest *types.QueryWorkflowRequest,
3621
) (resp *types.QueryWorkflowResponse, retError error) {
45✔
3622
        defer func() { log.CapturePanic(recover(), wh.GetLogger(), &retError) }()
90✔
3623

3624
        scope, sw := wh.startRequestProfileWithDomain(ctx, metrics.FrontendQueryWorkflowScope, queryRequest)
45✔
3625
        defer sw.Stop()
45✔
3626

45✔
3627
        if wh.isShuttingDown() {
45✔
3628
                return nil, errShuttingDown
×
3629
        }
×
3630

3631
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
45✔
3632
                return nil, wh.error(err, scope)
×
3633
        }
×
3634

3635
        if queryRequest == nil {
45✔
3636
                return nil, wh.error(errRequestNotSet, scope)
×
3637
        }
×
3638

3639
        domainName := queryRequest.GetDomain()
45✔
3640
        wfExecution := queryRequest.GetExecution()
45✔
3641
        tags := getDomainWfIDRunIDTags(domainName, wfExecution)
45✔
3642

45✔
3643
        if domainName == "" {
45✔
3644
                return nil, wh.error(errDomainNotSet, scope, tags...)
×
3645
        }
×
3646

3647
        if ok := wh.allow(ratelimitTypeUser, queryRequest); !ok {
45✔
3648
                return nil, wh.error(createServiceBusyError(), scope, tags...)
×
3649
        }
×
3650

3651
        if err := validateExecution(wfExecution); err != nil {
45✔
3652
                return nil, wh.error(err, scope, tags...)
×
3653
        }
×
3654

3655
        if wh.config.DisallowQuery(domainName) {
45✔
3656
                return nil, wh.error(errQueryDisallowedForDomain, scope, tags...)
×
3657
        }
×
3658

3659
        if queryRequest.Query == nil {
45✔
3660
                return nil, wh.error(errQueryNotSet, scope, tags...)
×
3661
        }
×
3662

3663
        if queryRequest.Query.GetQueryType() == "" {
45✔
3664
                return nil, wh.error(errQueryTypeNotSet, scope, tags...)
×
3665
        }
×
3666

3667
        domainID, err := wh.GetDomainCache().GetDomainID(domainName)
45✔
3668
        if err != nil {
45✔
3669
                return nil, wh.error(err, scope, tags...)
×
3670
        }
×
3671

3672
        sizeLimitError := wh.config.BlobSizeLimitError(domainName)
45✔
3673
        sizeLimitWarn := wh.config.BlobSizeLimitWarn(domainName)
45✔
3674

45✔
3675
        if err := common.CheckEventBlobSizeLimit(
45✔
3676
                len(queryRequest.GetQuery().GetQueryArgs()),
45✔
3677
                sizeLimitWarn,
45✔
3678
                sizeLimitError,
45✔
3679
                domainID,
45✔
3680
                queryRequest.GetExecution().GetWorkflowID(),
45✔
3681
                queryRequest.GetExecution().GetRunID(),
45✔
3682
                scope,
45✔
3683
                wh.GetThrottledLogger(),
45✔
3684
                tag.BlobSizeViolationOperation("QueryWorkflow")); err != nil {
45✔
3685
                return nil, wh.error(err, scope, tags...)
×
3686
        }
×
3687

3688
        req := &types.HistoryQueryWorkflowRequest{
45✔
3689
                DomainUUID: domainID,
45✔
3690
                Request:    queryRequest,
45✔
3691
        }
45✔
3692
        hResponse, err := wh.GetHistoryClient().QueryWorkflow(ctx, req)
45✔
3693
        if err != nil {
57✔
3694
                return nil, wh.error(err, scope, tags...)
12✔
3695
        }
12✔
3696
        return hResponse.GetResponse(), nil
33✔
3697
}
3698

3699
// DescribeWorkflowExecution returns information about the specified workflow execution.
3700
func (wh *WorkflowHandler) DescribeWorkflowExecution(
3701
        ctx context.Context,
3702
        request *types.DescribeWorkflowExecutionRequest,
3703
) (resp *types.DescribeWorkflowExecutionResponse, retError error) {
93✔
3704
        defer func() { log.CapturePanic(recover(), wh.GetLogger(), &retError) }()
186✔
3705

3706
        scope, sw := wh.startRequestProfileWithDomain(ctx, metrics.FrontendDescribeWorkflowExecutionScope, request)
93✔
3707
        defer sw.Stop()
93✔
3708

93✔
3709
        if wh.isShuttingDown() {
93✔
3710
                return nil, errShuttingDown
×
3711
        }
×
3712

3713
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
93✔
3714
                return nil, wh.error(err, scope)
×
3715
        }
×
3716

3717
        if request == nil {
93✔
3718
                return nil, wh.error(errRequestNotSet, scope)
×
3719
        }
×
3720

3721
        domainName := request.GetDomain()
93✔
3722
        wfExecution := request.GetExecution()
93✔
3723
        tags := getDomainWfIDRunIDTags(domainName, wfExecution)
93✔
3724

93✔
3725
        if domainName == "" {
93✔
3726
                return nil, wh.error(errDomainNotSet, scope, tags...)
×
3727
        }
×
3728

3729
        if ok := wh.allow(ratelimitTypeUser, request); !ok {
93✔
3730
                return nil, wh.error(createServiceBusyError(), scope, tags...)
×
3731
        }
×
3732

3733
        if err := validateExecution(wfExecution); err != nil {
93✔
3734
                return nil, wh.error(err, scope, tags...)
×
3735
        }
×
3736

3737
        domainID, err := wh.GetDomainCache().GetDomainID(request.GetDomain())
93✔
3738
        if err != nil {
93✔
3739
                return nil, wh.error(err, scope, tags...)
×
3740
        }
×
3741

3742
        response, err := wh.GetHistoryClient().DescribeWorkflowExecution(ctx, &types.HistoryDescribeWorkflowExecutionRequest{
93✔
3743
                DomainUUID: domainID,
93✔
3744
                Request:    request,
93✔
3745
        })
93✔
3746

93✔
3747
        if err != nil {
93✔
3748
                return nil, wh.error(err, scope, tags...)
×
3749
        }
×
3750

3751
        return response, nil
93✔
3752
}
3753

3754
// DescribeTaskList returns information about the target tasklist, right now this API returns the
3755
// pollers which polled this tasklist in last few minutes. If includeTaskListStatus field is true,
3756
// it will also return status of tasklist's ackManager (readLevel, ackLevel, backlogCountHint and taskIDBlock).
3757
func (wh *WorkflowHandler) DescribeTaskList(
3758
        ctx context.Context,
3759
        request *types.DescribeTaskListRequest,
3760
) (resp *types.DescribeTaskListResponse, retError error) {
18✔
3761
        defer func() { log.CapturePanic(recover(), wh.GetLogger(), &retError) }()
36✔
3762

3763
        scope, sw := wh.startRequestProfileWithDomain(ctx, metrics.FrontendDescribeTaskListScope, request)
18✔
3764
        defer sw.Stop()
18✔
3765

18✔
3766
        if wh.isShuttingDown() {
18✔
3767
                return nil, errShuttingDown
×
3768
        }
×
3769

3770
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
18✔
3771
                return nil, wh.error(err, scope)
×
3772
        }
×
3773

3774
        if request == nil {
18✔
3775
                return nil, wh.error(errRequestNotSet, scope)
×
3776
        }
×
3777

3778
        if request.GetDomain() == "" {
18✔
3779
                return nil, wh.error(errDomainNotSet, scope)
×
3780
        }
×
3781

3782
        if ok := wh.allow(ratelimitTypeUser, request); !ok {
18✔
3783
                return nil, wh.error(createServiceBusyError(), scope)
×
3784
        }
×
3785

3786
        domainID, err := wh.GetDomainCache().GetDomainID(request.GetDomain())
18✔
3787
        if err != nil {
18✔
3788
                return nil, wh.error(err, scope)
×
3789
        }
×
3790

3791
        if err := wh.validateTaskList(request.TaskList, scope, request.GetDomain()); err != nil {
18✔
3792
                return nil, wh.error(err, scope)
×
3793
        }
×
3794

3795
        if request.TaskListType == nil {
18✔
3796
                return nil, wh.error(errTaskListTypeNotSet, scope)
×
3797
        }
×
3798

3799
        response, err := wh.GetMatchingClient().DescribeTaskList(ctx, &types.MatchingDescribeTaskListRequest{
18✔
3800
                DomainUUID:  domainID,
18✔
3801
                DescRequest: request,
18✔
3802
        })
18✔
3803
        if err != nil {
18✔
3804
                return nil, wh.error(err, scope)
×
3805
        }
×
3806

3807
        return response, nil
18✔
3808
}
3809

3810
// ListTaskListPartitions returns all the partition and host for a taskList
3811
func (wh *WorkflowHandler) ListTaskListPartitions(
3812
        ctx context.Context,
3813
        request *types.ListTaskListPartitionsRequest,
3814
) (resp *types.ListTaskListPartitionsResponse, retError error) {
×
3815
        defer func() { log.CapturePanic(recover(), wh.GetLogger(), &retError) }()
×
3816

3817
        scope, sw := wh.startRequestProfileWithDomain(ctx, metrics.FrontendListTaskListPartitionsScope, request)
×
3818
        defer sw.Stop()
×
3819

×
3820
        if wh.isShuttingDown() {
×
3821
                return nil, errShuttingDown
×
3822
        }
×
3823

3824
        if request == nil {
×
3825
                return nil, wh.error(errRequestNotSet, scope)
×
3826
        }
×
3827

3828
        if request.GetDomain() == "" {
×
3829
                return nil, wh.error(errDomainNotSet, scope)
×
3830
        }
×
3831

3832
        if ok := wh.allow(ratelimitTypeUser, request); !ok {
×
3833
                return nil, wh.error(createServiceBusyError(), scope)
×
3834
        }
×
3835

3836
        if err := wh.validateTaskList(request.TaskList, scope, request.GetDomain()); err != nil {
×
3837
                return nil, wh.error(err, scope)
×
3838
        }
×
3839

3840
        resp, err := wh.GetMatchingClient().ListTaskListPartitions(ctx, &types.MatchingListTaskListPartitionsRequest{
×
3841
                Domain:   request.Domain,
×
3842
                TaskList: request.TaskList,
×
3843
        })
×
3844
        return resp, err
×
3845
}
3846

3847
// GetTaskListsByDomain returns all the partition and host for a taskList
3848
func (wh *WorkflowHandler) GetTaskListsByDomain(
3849
        ctx context.Context,
3850
        request *types.GetTaskListsByDomainRequest,
3851
) (resp *types.GetTaskListsByDomainResponse, retError error) {
×
3852
        defer func() { log.CapturePanic(recover(), wh.GetLogger(), &retError) }()
×
3853

3854
        scope, sw := wh.startRequestProfileWithDomain(ctx, metrics.FrontendGetTaskListsByDomainScope, request)
×
3855
        defer sw.Stop()
×
3856

×
3857
        if wh.isShuttingDown() {
×
3858
                return nil, errShuttingDown
×
3859
        }
×
3860

3861
        if request == nil {
×
3862
                return nil, wh.error(errRequestNotSet, scope)
×
3863
        }
×
3864

3865
        if request.GetDomain() == "" {
×
3866
                return nil, wh.error(errDomainNotSet, scope)
×
3867
        }
×
3868

3869
        if ok := wh.allow(ratelimitTypeUser, request); !ok {
×
3870
                return nil, wh.error(createServiceBusyError(), scope)
×
3871
        }
×
3872

3873
        resp, err := wh.GetMatchingClient().GetTaskListsByDomain(ctx, &types.GetTaskListsByDomainRequest{
×
3874
                Domain: request.Domain,
×
3875
        })
×
3876
        return resp, err
×
3877
}
3878

3879
// RefreshWorkflowTasks re-generates the workflow tasks
3880
func (wh *WorkflowHandler) RefreshWorkflowTasks(
3881
        ctx context.Context,
3882
        request *types.RefreshWorkflowTasksRequest,
3883
) (err error) {
×
3884
        defer func() { log.CapturePanic(recover(), wh.GetLogger(), &err) }()
×
3885
        scope, sw := wh.startRequestProfile(ctx, metrics.AdminRefreshWorkflowTasksScope)
×
3886
        defer sw.Stop()
×
3887

×
3888
        if request == nil {
×
3889
                return wh.error(errRequestNotSet, scope)
×
3890
        }
×
3891
        if err := validateExecution(request.Execution); err != nil {
×
3892
                return wh.error(err, scope)
×
3893
        }
×
3894
        domainEntry, err := wh.GetDomainCache().GetDomain(request.GetDomain())
×
3895
        if err != nil {
×
3896
                return wh.error(err, scope)
×
3897
        }
×
3898

3899
        err = wh.GetHistoryClient().RefreshWorkflowTasks(ctx, &types.HistoryRefreshWorkflowTasksRequest{
×
3900
                DomainUIID: domainEntry.GetInfo().ID,
×
3901
                Request:    request,
×
3902
        })
×
3903
        if err != nil {
×
3904
                return wh.error(err, scope)
×
3905
        }
×
3906
        return nil
×
3907
}
3908

3909
func (wh *WorkflowHandler) getRawHistory(
3910
        ctx context.Context,
3911
        scope metrics.Scope,
3912
        domainID string,
3913
        domainName string,
3914
        execution types.WorkflowExecution,
3915
        firstEventID int64,
3916
        nextEventID int64,
3917
        pageSize int32,
3918
        nextPageToken []byte,
3919
        transientDecision *types.TransientDecisionInfo,
3920
        branchToken []byte,
3921
) ([]*types.DataBlob, []byte, error) {
2✔
3922
        rawHistory := []*types.DataBlob{}
2✔
3923
        shardID := common.WorkflowIDToHistoryShard(execution.WorkflowID, wh.config.NumHistoryShards)
2✔
3924

2✔
3925
        resp, err := wh.GetHistoryManager().ReadRawHistoryBranch(ctx, &persistence.ReadHistoryBranchRequest{
2✔
3926
                BranchToken:   branchToken,
2✔
3927
                MinEventID:    firstEventID,
2✔
3928
                MaxEventID:    nextEventID,
2✔
3929
                PageSize:      int(pageSize),
2✔
3930
                NextPageToken: nextPageToken,
2✔
3931
                ShardID:       common.IntPtr(shardID),
2✔
3932
                DomainName:    domainName,
2✔
3933
        })
2✔
3934
        if err != nil {
2✔
3935
                return nil, nil, err
×
3936
        }
×
3937

3938
        var encoding *types.EncodingType
2✔
3939
        for _, data := range resp.HistoryEventBlobs {
4✔
3940
                switch data.Encoding {
2✔
3941
                case common.EncodingTypeJSON:
×
3942
                        encoding = types.EncodingTypeJSON.Ptr()
×
3943
                case common.EncodingTypeThriftRW:
2✔
3944
                        encoding = types.EncodingTypeThriftRW.Ptr()
2✔
3945
                default:
×
3946
                        panic(fmt.Sprintf("Invalid encoding type for raw history, encoding type: %s", data.Encoding))
×
3947
                }
3948
                rawHistory = append(rawHistory, &types.DataBlob{
2✔
3949
                        EncodingType: encoding,
2✔
3950
                        Data:         data.Data,
2✔
3951
                })
2✔
3952
        }
3953

3954
        if len(resp.NextPageToken) == 0 && transientDecision != nil {
4✔
3955
                if err := wh.validateTransientDecisionEvents(nextEventID, transientDecision); err != nil {
2✔
3956
                        scope.IncCounter(metrics.CadenceErrIncompleteHistoryCounter)
×
3957
                        wh.GetLogger().Error("getHistory error",
×
3958
                                tag.WorkflowDomainID(domainID),
×
3959
                                tag.WorkflowID(execution.GetWorkflowID()),
×
3960
                                tag.WorkflowRunID(execution.GetRunID()),
×
3961
                                tag.Error(err))
×
3962
                }
×
3963
                blob, err := wh.GetPayloadSerializer().SerializeBatchEvents(
2✔
3964
                        []*types.HistoryEvent{transientDecision.ScheduledEvent, transientDecision.StartedEvent}, common.EncodingTypeThriftRW)
2✔
3965
                if err != nil {
2✔
3966
                        return nil, nil, err
×
3967
                }
×
3968
                rawHistory = append(rawHistory, &types.DataBlob{
2✔
3969
                        EncodingType: types.EncodingTypeThriftRW.Ptr(),
2✔
3970
                        Data:         blob.Data,
2✔
3971
                })
2✔
3972
        }
3973

3974
        return rawHistory, resp.NextPageToken, nil
2✔
3975
}
3976

3977
func (wh *WorkflowHandler) getHistory(
3978
        ctx context.Context,
3979
        scope metrics.Scope,
3980
        domainID string,
3981
        domainName string,
3982
        execution types.WorkflowExecution,
3983
        firstEventID, nextEventID int64,
3984
        pageSize int32,
3985
        nextPageToken []byte,
3986
        transientDecision *types.TransientDecisionInfo,
3987
        branchToken []byte,
3988
) (*types.History, []byte, error) {
1,588✔
3989

1,588✔
3990
        var size int
1,588✔
3991

1,588✔
3992
        isFirstPage := len(nextPageToken) == 0
1,588✔
3993
        shardID := common.WorkflowIDToHistoryShard(execution.WorkflowID, wh.config.NumHistoryShards)
1,588✔
3994
        var err error
1,588✔
3995
        historyEvents, size, nextPageToken, err := persistenceutils.ReadFullPageV2Events(ctx, wh.GetHistoryManager(), &persistence.ReadHistoryBranchRequest{
1,588✔
3996
                BranchToken:   branchToken,
1,588✔
3997
                MinEventID:    firstEventID,
1,588✔
3998
                MaxEventID:    nextEventID,
1,588✔
3999
                PageSize:      int(pageSize),
1,588✔
4000
                NextPageToken: nextPageToken,
1,588✔
4001
                ShardID:       common.IntPtr(shardID),
1,588✔
4002
                DomainName:    domainName,
1,588✔
4003
        })
1,588✔
4004

1,588✔
4005
        if err != nil {
1,588✔
4006
                return nil, nil, err
×
4007
        }
×
4008

4009
        scope.RecordTimer(metrics.HistorySize, time.Duration(size))
1,588✔
4010

1,588✔
4011
        isLastPage := len(nextPageToken) == 0
1,588✔
4012
        if err := verifyHistoryIsComplete(
1,588✔
4013
                historyEvents,
1,588✔
4014
                firstEventID,
1,588✔
4015
                nextEventID-1,
1,588✔
4016
                isFirstPage,
1,588✔
4017
                isLastPage,
1,588✔
4018
                int(pageSize)); err != nil {
1,588✔
4019
                scope.IncCounter(metrics.CadenceErrIncompleteHistoryCounter)
×
4020
                wh.GetLogger().Error("getHistory: incomplete history",
×
4021
                        tag.WorkflowDomainID(domainID),
×
4022
                        tag.WorkflowID(execution.GetWorkflowID()),
×
4023
                        tag.WorkflowRunID(execution.GetRunID()),
×
4024
                        tag.Error(err))
×
4025
                return nil, nil, err
×
4026
        }
×
4027

4028
        if len(nextPageToken) == 0 && transientDecision != nil {
1,758✔
4029
                if err := wh.validateTransientDecisionEvents(nextEventID, transientDecision); err != nil {
170✔
4030
                        scope.IncCounter(metrics.CadenceErrIncompleteHistoryCounter)
×
4031
                        wh.GetLogger().Error("getHistory error",
×
4032
                                tag.WorkflowDomainID(domainID),
×
4033
                                tag.WorkflowID(execution.GetWorkflowID()),
×
4034
                                tag.WorkflowRunID(execution.GetRunID()),
×
4035
                                tag.Error(err))
×
4036
                }
×
4037
                // Append the transient decision events once we are done enumerating everything from the events table
4038
                historyEvents = append(historyEvents, transientDecision.ScheduledEvent, transientDecision.StartedEvent)
170✔
4039
        }
4040

4041
        executionHistory := &types.History{}
1,588✔
4042
        executionHistory.Events = historyEvents
1,588✔
4043
        return executionHistory, nextPageToken, nil
1,588✔
4044
}
4045

4046
func (wh *WorkflowHandler) validateTransientDecisionEvents(
4047
        expectedNextEventID int64,
4048
        decision *types.TransientDecisionInfo,
4049
) error {
172✔
4050

172✔
4051
        if decision.ScheduledEvent.ID == expectedNextEventID &&
172✔
4052
                decision.StartedEvent.ID == expectedNextEventID+1 {
344✔
4053
                return nil
172✔
4054
        }
172✔
4055

4056
        return fmt.Errorf(
×
4057
                "invalid transient decision: "+
×
4058
                        "expectedScheduledEventID=%v expectedStartedEventID=%v but have scheduledEventID=%v startedEventID=%v",
×
4059
                expectedNextEventID,
×
4060
                expectedNextEventID+1,
×
4061
                decision.ScheduledEvent.ID,
×
4062
                decision.StartedEvent.ID)
×
4063
}
4064

4065
// startRequestProfile initiates recording of request metrics
4066
func (wh *WorkflowHandler) startRequestProfile(ctx context.Context, scope int) (metrics.Scope, metrics.Stopwatch) {
186✔
4067
        metricsScope := wh.GetMetricsClient().Scope(scope).Tagged(metrics.DomainUnknownTag()).Tagged(metrics.GetContextTags(ctx)...)
186✔
4068
        // timer should be emitted with the all tag
186✔
4069
        sw := metricsScope.StartTimer(metrics.CadenceLatency)
186✔
4070
        metricsScope.IncCounter(metrics.CadenceRequests)
186✔
4071
        return metricsScope, sw
186✔
4072
}
186✔
4073

4074
// startRequestProfileWithDomain initiates recording of request metrics and returns a domain tagged scope
4075
func (wh *WorkflowHandler) startRequestProfileWithDomain(ctx context.Context, scope int, d domainGetter) (metrics.Scope, metrics.Stopwatch) {
6,368✔
4076
        metricsScope := getMetricsScopeWithDomain(scope, d, wh.GetMetricsClient()).Tagged(metrics.GetContextTags(ctx)...)
6,368✔
4077
        sw := metricsScope.StartTimer(metrics.CadenceLatency)
6,368✔
4078
        metricsScope.IncCounter(metrics.CadenceRequests)
6,368✔
4079
        return metricsScope, sw
6,368✔
4080
}
6,368✔
4081

4082
// getDefaultScope returns a default scope to use for request metrics
4083
func (wh *WorkflowHandler) getDefaultScope(ctx context.Context, scope int) metrics.Scope {
1,760✔
4084
        return wh.GetMetricsClient().Scope(scope).Tagged(metrics.DomainUnknownTag()).Tagged(metrics.GetContextTags(ctx)...)
1,760✔
4085
}
1,760✔
4086

4087
// frontendInternalServiceError builds the error handed back to callers for
// internal failures by formatting fmtStr with args (fmt.Errorf semantics).
func frontendInternalServiceError(fmtStr string, args ...interface{}) error {
	// NOTE: For internal error, we can't return thrift error from cadence-frontend.
	// Because in uber internal metrics, thrift errors are counted as user errors.
	// A plain Go error is returned instead.
	formatted := fmt.Errorf(fmtStr, args...)
	return formatted
}
8✔
4092

4093
func (wh *WorkflowHandler) error(err error, scope metrics.Scope, tagsForErrorLog ...tag.Tag) error {
770✔
4094
        switch err := err.(type) {
770✔
4095
        case *types.InternalServiceError:
3✔
4096
                wh.GetLogger().WithTags(tagsForErrorLog...).Error("Internal service error", tag.Error(err))
3✔
4097
                scope.IncCounter(metrics.CadenceFailures)
3✔
4098
                return frontendInternalServiceError("cadence internal error, msg: %v", err.Message)
3✔
4099
        case *types.BadRequestError:
31✔
4100
                scope.IncCounter(metrics.CadenceErrBadRequestCounter)
31✔
4101
                return err
31✔
4102
        case *types.DomainNotActiveError:
×
4103
                scope.IncCounter(metrics.CadenceErrBadRequestCounter)
×
4104
                return err
×
4105
        case *types.ServiceBusyError:
×
4106
                scope.IncCounter(metrics.CadenceErrServiceBusyCounter)
×
4107
                return err
×
4108
        case *types.EntityNotExistsError:
668✔
4109
                scope.IncCounter(metrics.CadenceErrEntityNotExistsCounter)
668✔
4110
                return err
668✔
4111
        case *types.WorkflowExecutionAlreadyCompletedError:
24✔
4112
                scope.IncCounter(metrics.CadenceErrWorkflowExecutionAlreadyCompletedCounter)
24✔
4113
                return err
24✔
4114
        case *types.WorkflowExecutionAlreadyStartedError:
24✔
4115
                scope.IncCounter(metrics.CadenceErrExecutionAlreadyStartedCounter)
24✔
4116
                return err
24✔
4117
        case *types.DomainAlreadyExistsError:
×
4118
                scope.IncCounter(metrics.CadenceErrDomainAlreadyExistsCounter)
×
4119
                return err
×
4120
        case *types.CancellationAlreadyRequestedError:
3✔
4121
                scope.IncCounter(metrics.CadenceErrCancellationAlreadyRequestedCounter)
3✔
4122
                return err
3✔
4123
        case *types.QueryFailedError:
9✔
4124
                scope.IncCounter(metrics.CadenceErrQueryFailedCounter)
9✔
4125
                return err
9✔
4126
        case *types.LimitExceededError:
×
4127
                scope.IncCounter(metrics.CadenceErrLimitExceededCounter)
×
4128
                return err
×
4129
        case *types.ClientVersionNotSupportedError:
×
4130
                scope.IncCounter(metrics.CadenceErrClientVersionNotSupportedCounter)
×
4131
                return err
×
4132
        case *yarpcerrors.Status:
3✔
4133
                if err.Code() == yarpcerrors.CodeDeadlineExceeded {
6✔
4134
                        wh.GetLogger().WithTags(tagsForErrorLog...).Error("Frontend request timedout", tag.Error(err))
3✔
4135
                        scope.IncCounter(metrics.CadenceErrContextTimeoutCounter)
3✔
4136
                        return err
3✔
4137
                }
3✔
4138
        }
4139
        if errors.Is(err, context.DeadlineExceeded) {
5✔
4140
                wh.GetLogger().WithTags(tagsForErrorLog...).Error("Frontend request timedout", tag.Error(err))
×
4141
                scope.IncCounter(metrics.CadenceErrContextTimeoutCounter)
×
4142
                return err
×
4143
        }
×
4144
        wh.GetLogger().WithTags(tagsForErrorLog...).Error("Uncategorized error", tag.Error(err))
5✔
4145
        scope.IncCounter(metrics.CadenceFailures)
5✔
4146
        return frontendInternalServiceError("cadence internal uncategorized error, msg: %v", err.Error())
5✔
4147
}
4148

4149
func (wh *WorkflowHandler) validateTaskList(t *types.TaskList, scope metrics.Scope, domain string) error {
2,806✔
4150
        if t == nil || t.GetName() == "" {
2,807✔
4151
                return errTaskListNotSet
1✔
4152
        }
1✔
4153

4154
        if !common.ValidIDLength(
2,805✔
4155
                t.GetName(),
2,805✔
4156
                scope,
2,805✔
4157
                wh.config.MaxIDLengthWarnLimit(),
2,805✔
4158
                wh.config.TaskListNameMaxLength(domain),
2,805✔
4159
                metrics.CadenceErrTaskListNameExceededWarnLimit,
2,805✔
4160
                domain,
2,805✔
4161
                wh.GetLogger(),
2,805✔
4162
                tag.IDTypeTaskListName) {
2,805✔
4163
                return errTaskListTooLong
×
4164
        }
×
4165
        return nil
2,805✔
4166
}
4167

4168
func validateExecution(w *types.WorkflowExecution) error {
1,390✔
4169
        if w == nil {
1,390✔
4170
                return errExecutionNotSet
×
4171
        }
×
4172
        if w.GetWorkflowID() == "" {
1,390✔
4173
                return errWorkflowIDNotSet
×
4174
        }
×
4175
        if w.GetRunID() != "" && uuid.Parse(w.GetRunID()) == nil {
1,390✔
4176
                return errInvalidRunID
×
4177
        }
×
4178
        return nil
1,390✔
4179
}
4180

4181
func (wh *WorkflowHandler) createPollForDecisionTaskResponse(
4182
        ctx context.Context,
4183
        scope metrics.Scope,
4184
        domainID string,
4185
        matchingResp *types.MatchingPollForDecisionTaskResponse,
4186
        branchToken []byte,
4187
) (*types.PollForDecisionTaskResponse, error) {
1,224✔
4188

1,224✔
4189
        if matchingResp.WorkflowExecution == nil {
1,282✔
4190
                // this will happen if there is no decision task to be send to worker / caller
58✔
4191
                return &types.PollForDecisionTaskResponse{}, nil
58✔
4192
        }
58✔
4193

4194
        var history *types.History
1,166✔
4195
        var continuation []byte
1,166✔
4196
        var err error
1,166✔
4197

1,166✔
4198
        if matchingResp.GetStickyExecutionEnabled() && matchingResp.Query != nil {
1,172✔
4199
                // meaning sticky query, we should not return any events to worker
6✔
4200
                // since query task only check the current status
6✔
4201
                history = &types.History{
6✔
4202
                        Events: []*types.HistoryEvent{},
6✔
4203
                }
6✔
4204
        } else {
1,166✔
4205
                // here we have 3 cases:
1,160✔
4206
                // 1. sticky && non query task
1,160✔
4207
                // 2. non sticky &&  non query task
1,160✔
4208
                // 3. non sticky && query task
1,160✔
4209
                // for 1, partial history have to be send back
1,160✔
4210
                // for 2 and 3, full history have to be send back
1,160✔
4211

1,160✔
4212
                var persistenceToken []byte
1,160✔
4213

1,160✔
4214
                firstEventID := common.FirstEventID
1,160✔
4215
                nextEventID := matchingResp.GetNextEventID()
1,160✔
4216
                if matchingResp.GetStickyExecutionEnabled() {
1,256✔
4217
                        firstEventID = matchingResp.GetPreviousStartedEventID() + 1
96✔
4218
                }
96✔
4219
                domainName, dErr := wh.GetDomainCache().GetDomainName(domainID)
1,160✔
4220
                if dErr != nil {
1,160✔
4221
                        return nil, dErr
×
4222
                }
×
4223
                scope = scope.Tagged(metrics.DomainTag(domainName))
1,160✔
4224
                history, persistenceToken, err = wh.getHistory(
1,160✔
4225
                        ctx,
1,160✔
4226
                        scope,
1,160✔
4227
                        domainID,
1,160✔
4228
                        domainName,
1,160✔
4229
                        *matchingResp.WorkflowExecution,
1,160✔
4230
                        firstEventID,
1,160✔
4231
                        nextEventID,
1,160✔
4232
                        int32(wh.config.HistoryMaxPageSize(domainName)),
1,160✔
4233
                        nil,
1,160✔
4234
                        matchingResp.DecisionInfo,
1,160✔
4235
                        branchToken,
1,160✔
4236
                )
1,160✔
4237
                if err != nil {
1,160✔
4238
                        return nil, err
×
4239
                }
×
4240

4241
                if len(persistenceToken) != 0 {
1,160✔
4242
                        continuation, err = serializeHistoryToken(&getHistoryContinuationToken{
×
4243
                                RunID:             matchingResp.WorkflowExecution.GetRunID(),
×
4244
                                FirstEventID:      firstEventID,
×
4245
                                NextEventID:       nextEventID,
×
4246
                                PersistenceToken:  persistenceToken,
×
4247
                                TransientDecision: matchingResp.DecisionInfo,
×
4248
                                BranchToken:       branchToken,
×
4249
                        })
×
4250
                        if err != nil {
×
4251
                                return nil, err
×
4252
                        }
×
4253
                }
4254
        }
4255

4256
        resp := &types.PollForDecisionTaskResponse{
1,166✔
4257
                TaskToken:                 matchingResp.TaskToken,
1,166✔
4258
                WorkflowExecution:         matchingResp.WorkflowExecution,
1,166✔
4259
                WorkflowType:              matchingResp.WorkflowType,
1,166✔
4260
                PreviousStartedEventID:    matchingResp.PreviousStartedEventID,
1,166✔
4261
                StartedEventID:            matchingResp.StartedEventID, // this field is not set for query tasks as there's no decision task started event
1,166✔
4262
                Query:                     matchingResp.Query,
1,166✔
4263
                BacklogCountHint:          matchingResp.BacklogCountHint,
1,166✔
4264
                Attempt:                   matchingResp.Attempt,
1,166✔
4265
                History:                   history,
1,166✔
4266
                NextPageToken:             continuation,
1,166✔
4267
                WorkflowExecutionTaskList: matchingResp.WorkflowExecutionTaskList,
1,166✔
4268
                ScheduledTimestamp:        matchingResp.ScheduledTimestamp,
1,166✔
4269
                StartedTimestamp:          matchingResp.StartedTimestamp,
1,166✔
4270
                Queries:                   matchingResp.Queries,
1,166✔
4271
                NextEventID:               matchingResp.NextEventID,
1,166✔
4272
        }
1,166✔
4273

1,166✔
4274
        return resp, nil
1,166✔
4275
}
4276

4277
func verifyHistoryIsComplete(
4278
        events []*types.HistoryEvent,
4279
        expectedFirstEventID int64,
4280
        expectedLastEventID int64,
4281
        isFirstPage bool,
4282
        isLastPage bool,
4283
        pageSize int,
4284
) error {
1,607✔
4285

1,607✔
4286
        nEvents := len(events)
1,607✔
4287
        if nEvents == 0 {
1,619✔
4288
                if isLastPage {
24✔
4289
                        // we seem to be returning a non-nil pageToken on the lastPage which
12✔
4290
                        // in turn cases the client to call getHistory again - only to find
12✔
4291
                        // there are no more events to consume - bail out if this is the case here
12✔
4292
                        return nil
12✔
4293
                }
12✔
4294
                return fmt.Errorf("invalid history: contains zero events")
×
4295
        }
4296

4297
        firstEventID := events[0].ID
1,595✔
4298
        lastEventID := events[nEvents-1].ID
1,595✔
4299

1,595✔
4300
        if !isFirstPage { // atleast one page of history has been read previously
1,631✔
4301
                if firstEventID <= expectedFirstEventID {
36✔
4302
                        // not first page and no events have been read in the previous pages - not possible
×
4303
                        return &types.InternalServiceError{
×
4304
                                Message: fmt.Sprintf(
×
4305
                                        "invalid history: expected first eventID to be > %v but got %v", expectedFirstEventID, firstEventID),
×
4306
                        }
×
4307
                }
×
4308
                expectedFirstEventID = firstEventID
36✔
4309
        }
4310

4311
        if !isLastPage {
1,644✔
4312
                // estimate lastEventID based on pageSize. This is a lower bound
49✔
4313
                // since the persistence layer counts "batch of events" as a single page
49✔
4314
                expectedLastEventID = expectedFirstEventID + int64(pageSize) - 1
49✔
4315
        }
49✔
4316

4317
        nExpectedEvents := expectedLastEventID - expectedFirstEventID + 1
1,595✔
4318

1,595✔
4319
        if firstEventID == expectedFirstEventID &&
1,595✔
4320
                ((isLastPage && lastEventID == expectedLastEventID && int64(nEvents) == nExpectedEvents) ||
1,595✔
4321
                        (!isLastPage && lastEventID >= expectedLastEventID && int64(nEvents) >= nExpectedEvents)) {
3,178✔
4322
                return nil
1,583✔
4323
        }
1,583✔
4324

4325
        return &types.InternalServiceError{
12✔
4326
                Message: fmt.Sprintf(
12✔
4327
                        "incomplete history: "+
12✔
4328
                                "expected events [%v-%v] but got events [%v-%v] of length %v:"+
12✔
4329
                                "isFirstPage=%v,isLastPage=%v,pageSize=%v",
12✔
4330
                        expectedFirstEventID,
12✔
4331
                        expectedLastEventID,
12✔
4332
                        firstEventID,
12✔
4333
                        lastEventID,
12✔
4334
                        nEvents,
12✔
4335
                        isFirstPage,
12✔
4336
                        isLastPage,
12✔
4337
                        pageSize),
12✔
4338
        }
12✔
4339
}
4340

4341
func deserializeHistoryToken(bytes []byte) (*getHistoryContinuationToken, error) {
44✔
4342
        token := &getHistoryContinuationToken{}
44✔
4343
        err := json.Unmarshal(bytes, token)
44✔
4344
        return token, err
44✔
4345
}
44✔
4346

4347
func serializeHistoryToken(token *getHistoryContinuationToken) ([]byte, error) {
429✔
4348
        if token == nil {
816✔
4349
                return nil, nil
387✔
4350
        }
387✔
4351

4352
        bytes, err := json.Marshal(token)
42✔
4353
        return bytes, err
42✔
4354
}
4355

4356
func createServiceBusyError() *types.ServiceBusyError {
×
4357
        err := &types.ServiceBusyError{}
×
4358
        err.Message = "Too many outstanding requests to the cadence service"
×
4359
        return err
×
4360
}
×
4361

4362
func isFailoverRequest(updateRequest *types.UpdateDomainRequest) bool {
9✔
4363
        return updateRequest.ActiveClusterName != nil
9✔
4364
}
9✔
4365

4366
func isGraceFailoverRequest(updateRequest *types.UpdateDomainRequest) bool {
9✔
4367
        return updateRequest.FailoverTimeoutInSeconds != nil
9✔
4368
}
9✔
4369

4370
func (wh *WorkflowHandler) checkOngoingFailover(
4371
        ctx context.Context,
4372
        domainName *string,
4373
) error {
1✔
4374

1✔
4375
        enabledClusters := wh.GetClusterMetadata().GetEnabledClusterInfo()
1✔
4376
        respChan := make(chan *types.DescribeDomainResponse, len(enabledClusters))
1✔
4377

1✔
4378
        g := &errgroup.Group{}
1✔
4379
        for clusterName := range enabledClusters {
3✔
4380
                frontendClient := wh.GetRemoteFrontendClient(clusterName)
2✔
4381
                g.Go(func() (e error) {
4✔
4382
                        defer func() { log.CapturePanic(recover(), wh.GetLogger(), &e) }()
4✔
4383

4384
                        resp, _ := frontendClient.DescribeDomain(ctx, &types.DescribeDomainRequest{Name: domainName})
2✔
4385
                        respChan <- resp
2✔
4386
                        return nil
2✔
4387
                })
4388
        }
4389
        g.Wait()
1✔
4390
        close(respChan)
1✔
4391

1✔
4392
        var failoverVersion *int64
1✔
4393
        for resp := range respChan {
3✔
4394
                if resp == nil {
2✔
4395
                        return &types.InternalServiceError{
×
4396
                                Message: "Failed to verify failover version from all clusters",
×
4397
                        }
×
4398
                }
×
4399
                if failoverVersion == nil {
3✔
4400
                        failoverVersion = &resp.FailoverVersion
1✔
4401
                }
1✔
4402
                if *failoverVersion != resp.GetFailoverVersion() {
2✔
4403
                        return &types.BadRequestError{
×
4404
                                Message: "Concurrent failover is not allow.",
×
4405
                        }
×
4406
                }
×
4407
        }
4408
        return nil
1✔
4409
}
4410

4411
func (wh *WorkflowHandler) historyArchived(ctx context.Context, request *types.GetWorkflowExecutionHistoryRequest, domainID string) bool {
441✔
4412
        if request.GetExecution() == nil || request.GetExecution().GetRunID() == "" {
476✔
4413
                return false
35✔
4414
        }
35✔
4415
        getMutableStateRequest := &types.GetMutableStateRequest{
406✔
4416
                DomainUUID: domainID,
406✔
4417
                Execution:  request.Execution,
406✔
4418
        }
406✔
4419
        _, err := wh.GetHistoryClient().GetMutableState(ctx, getMutableStateRequest)
406✔
4420
        if err == nil {
786✔
4421
                return false
380✔
4422
        }
380✔
4423
        switch err.(type) {
26✔
4424
        case *types.EntityNotExistsError:
25✔
4425
                // the only case in which history is assumed to be archived is if getting mutable state returns entity not found error
25✔
4426
                return true
25✔
4427
        }
4428
        return false
1✔
4429
}
4430

4431
func (wh *WorkflowHandler) getArchivedHistory(
4432
        ctx context.Context,
4433
        request *types.GetWorkflowExecutionHistoryRequest,
4434
        domainID string,
4435
        scope metrics.Scope,
4436
        tags ...tag.Tag,
4437
) (*types.GetWorkflowExecutionHistoryResponse, error) {
28✔
4438
        entry, err := wh.GetDomainCache().GetDomainByID(domainID)
28✔
4439
        if err != nil {
29✔
4440
                return nil, wh.error(err, scope)
1✔
4441
        }
1✔
4442

4443
        URIString := entry.GetConfig().HistoryArchivalURI
27✔
4444
        if URIString == "" {
28✔
4445
                // if URI is empty, it means the domain has never enabled for archival.
1✔
4446
                // the error is not "workflow has passed retention period", because
1✔
4447
                // we have no way to tell if the requested workflow exists or not.
1✔
4448
                return nil, wh.error(errHistoryNotFound, scope, tags...)
1✔
4449
        }
1✔
4450

4451
        URI, err := archiver.NewURI(URIString)
26✔
4452
        if err != nil {
27✔
4453
                return nil, wh.error(err, scope, tags...)
1✔
4454
        }
1✔
4455

4456
        historyArchiver, err := wh.GetArchiverProvider().GetHistoryArchiver(URI.Scheme(), service.Frontend)
25✔
4457
        if err != nil {
25✔
4458
                return nil, wh.error(err, scope, tags...)
×
4459
        }
×
4460

4461
        resp, err := historyArchiver.Get(ctx, URI, &archiver.GetHistoryRequest{
25✔
4462
                DomainID:      domainID,
25✔
4463
                WorkflowID:    request.GetExecution().GetWorkflowID(),
25✔
4464
                RunID:         request.GetExecution().GetRunID(),
25✔
4465
                NextPageToken: request.GetNextPageToken(),
25✔
4466
                PageSize:      int(request.GetMaximumPageSize()),
25✔
4467
        })
25✔
4468
        if err != nil {
28✔
4469
                return nil, wh.error(err, scope, tags...)
3✔
4470
        }
3✔
4471

4472
        history := &types.History{}
22✔
4473
        for _, batch := range resp.HistoryBatches {
279✔
4474
                history.Events = append(history.Events, batch.Events...)
257✔
4475
        }
257✔
4476
        return &types.GetWorkflowExecutionHistoryResponse{
22✔
4477
                History:       history,
22✔
4478
                NextPageToken: resp.NextPageToken,
22✔
4479
                Archived:      true,
22✔
4480
        }, nil
22✔
4481
}
4482

4483
func (wh *WorkflowHandler) convertIndexedKeyToThrift(keys map[string]interface{}) map[string]types.IndexedValueType {
3✔
4484
        converted := make(map[string]types.IndexedValueType)
3✔
4485
        for k, v := range keys {
51✔
4486
                converted[k] = common.ConvertIndexedValueTypeToInternalType(v, wh.GetLogger())
48✔
4487
        }
48✔
4488
        return converted
2✔
4489
}
4490

4491
func (wh *WorkflowHandler) isListRequestPageSizeTooLarge(pageSize int32, domain string) bool {
303✔
4492
        return common.IsAdvancedVisibilityReadingEnabled(wh.config.EnableReadVisibilityFromES(domain), wh.config.IsAdvancedVisConfigExist) &&
303✔
4493
                pageSize > int32(wh.config.ESIndexMaxResultWindow())
303✔
4494
}
303✔
4495

4496
func (wh *WorkflowHandler) allow(requestType ratelimitType, d domainGetter) bool {
6,043✔
4497
        domain := ""
6,043✔
4498
        if d != nil {
12,086✔
4499
                domain = d.GetDomain()
6,043✔
4500
        }
6,043✔
4501
        switch requestType {
6,043✔
4502
        case ratelimitTypeUser:
1,928✔
4503
                return wh.userRateLimiter.Allow(quotas.Info{Domain: domain})
1,928✔
4504
        case ratelimitTypeWorker:
3,828✔
4505
                return wh.workerRateLimiter.Allow(quotas.Info{Domain: domain})
3,828✔
4506
        case ratelimitTypeVisibility:
287✔
4507
                return wh.visibilityRateLimiter.Allow(quotas.Info{Domain: domain})
287✔
4508
        default:
×
4509
                wh.GetLogger().Fatal("coding error, unrecognized request ratelimit type value", tag.Value(requestType))
×
4510
                panic("unreachable")
×
4511
        }
4512
}
4513

4514
// GetClusterInfo return information about cadence deployment
4515
func (wh *WorkflowHandler) GetClusterInfo(
4516
        ctx context.Context,
4517
) (resp *types.ClusterInfo, err error) {
×
4518
        defer func() { log.CapturePanic(recover(), wh.GetLogger(), &err) }()
×
4519

4520
        scope := wh.getDefaultScope(ctx, metrics.FrontendClientGetClusterInfoScope)
×
4521
        if ok := wh.allow(ratelimitTypeUser, nil); !ok {
×
4522
                return nil, wh.error(createServiceBusyError(), scope)
×
4523
        }
×
4524

4525
        return &types.ClusterInfo{
×
4526
                SupportedClientVersions: &types.SupportedClientVersions{
×
4527
                        GoSdk:   client.SupportedGoSDKVersion,
×
4528
                        JavaSdk: client.SupportedJavaSDKVersion,
×
4529
                },
×
4530
        }, nil
×
4531
}
4532

4533
func checkPermission(
4534
        config *Config,
4535
        securityToken string,
4536
) error {
61✔
4537
        if config.EnableAdminProtection() {
63✔
4538
                if securityToken == "" {
2✔
4539
                        return errNoPermission
×
4540
                }
×
4541
                requiredToken := config.AdminOperationToken()
2✔
4542
                if securityToken != requiredToken {
3✔
4543
                        return errNoPermission
1✔
4544
                }
1✔
4545
        }
4546
        return nil
60✔
4547
}
4548

4549
func checkFailOverPermission(config *Config, domainName string) error {
2✔
4550
        if config.Lockdown(domainName) {
3✔
4551
                return errDomainInLockdown
1✔
4552
        }
1✔
4553
        return nil
1✔
4554
}
4555

4556
// domainWrapper exposes a plain domain name through a GetDomain method, so
// it can be passed where a GetDomain-based getter is expected.
type domainWrapper struct {
	domain string
}

// GetDomain returns the wrapped domain name.
func (w domainWrapper) GetDomain() string {
	name := w.domain
	return name
}
3,520✔
4563

4564
func (hs HealthStatus) String() string {
×
4565
        switch hs {
×
4566
        case HealthStatusOK:
×
4567
                return "OK"
×
4568
        case HealthStatusWarmingUp:
×
4569
                return "WarmingUp"
×
4570
        case HealthStatusShuttingDown:
×
4571
                return "ShuttingDown"
×
4572
        default:
×
4573
                return "unknown"
×
4574
        }
4575
}
4576

4577
func getDomainWfIDRunIDTags(
4578
        domainName string,
4579
        wf *types.WorkflowExecution,
4580
) []tag.Tag {
5,986✔
4581
        tags := []tag.Tag{tag.WorkflowDomainName(domainName)}
5,986✔
4582
        if wf == nil {
8,297✔
4583
                return tags
2,311✔
4584
        }
2,311✔
4585
        return append(
3,675✔
4586
                tags,
3,675✔
4587
                tag.WorkflowID(wf.GetWorkflowID()),
3,675✔
4588
                tag.WorkflowRunID(wf.GetRunID()),
3,675✔
4589
        )
3,675✔
4590
}
4591

4592
// checkRequiredDomainDataKVs verifies that every key of requiredDomainDataKeys
// is present in domainData. Only presence is checked, not the value. It
// returns nil when nothing is missing. Otherwise the error names one missing
// key and lists all required keys.
func checkRequiredDomainDataKVs(requiredDomainDataKeys map[string]interface{}, domainData map[string]string) error {
	// Map iteration order is random. Report the lexicographically smallest
	// missing key so the error is the same on every call, instead of an
	// arbitrary one when several keys are missing.
	missing, found := "", false
	for k := range requiredDomainDataKeys {
		if _, ok := domainData[k]; ok {
			continue
		}
		if !found || k < missing {
			missing, found = k, true
		}
	}
	if !found {
		return nil
	}
	return fmt.Errorf("domain data error, missing required key %v . All required keys: %v", missing, requiredDomainDataKeys)
}
4602

4603
// Some error types are introduced later that some clients might not support
4604
// To make them backward compatible, we continue returning the legacy error types
4605
// for older clients
4606
func (wh *WorkflowHandler) normalizeVersionedErrors(ctx context.Context, err error) error {
66✔
4607
        switch err.(type) {
66✔
4608
        case *types.WorkflowExecutionAlreadyCompletedError:
24✔
4609
                call := yarpc.CallFromContext(ctx)
24✔
4610
                clientFeatureVersion := call.Header(common.FeatureVersionHeaderName)
24✔
4611
                clientImpl := call.Header(common.ClientImplHeaderName)
24✔
4612
                featureFlags := client.GetFeatureFlagsFromHeader(call)
24✔
4613

24✔
4614
                vErr := wh.versionChecker.SupportsWorkflowAlreadyCompletedError(clientImpl, clientFeatureVersion, featureFlags)
24✔
4615
                if vErr == nil {
27✔
4616
                        return err
3✔
4617
                }
3✔
4618
                return &types.EntityNotExistsError{Message: "Workflow execution already completed."}
21✔
4619
        default:
42✔
4620
                return err
42✔
4621
        }
4622
}
4623
func constructRestartWorkflowRequest(w *types.WorkflowExecutionStartedEventAttributes, domain string, identity string, workflowID string) *types.StartWorkflowExecutionRequest {
1✔
4624

1✔
4625
        startRequest := &types.StartWorkflowExecutionRequest{
1✔
4626
                RequestID:  uuid.New(),
1✔
4627
                Domain:     domain,
1✔
4628
                WorkflowID: workflowID,
1✔
4629
                WorkflowType: &types.WorkflowType{
1✔
4630
                        Name: w.WorkflowType.Name,
1✔
4631
                },
1✔
4632
                TaskList: &types.TaskList{
1✔
4633
                        Name: w.TaskList.Name,
1✔
4634
                },
1✔
4635
                Input:                               w.Input,
1✔
4636
                ExecutionStartToCloseTimeoutSeconds: w.ExecutionStartToCloseTimeoutSeconds,
1✔
4637
                TaskStartToCloseTimeoutSeconds:      w.TaskStartToCloseTimeoutSeconds,
1✔
4638
                Identity:                            identity,
1✔
4639
                WorkflowIDReusePolicy:               types.WorkflowIDReusePolicyTerminateIfRunning.Ptr(),
1✔
4640
        }
1✔
4641
        startRequest.CronSchedule = w.CronSchedule
1✔
4642
        startRequest.RetryPolicy = w.RetryPolicy
1✔
4643
        startRequest.DelayStartSeconds = w.FirstDecisionTaskBackoffSeconds
1✔
4644
        startRequest.Header = w.Header
1✔
4645
        startRequest.Memo = w.Memo
1✔
4646
        startRequest.SearchAttributes = w.SearchAttributes
1✔
4647

1✔
4648
        return startRequest
1✔
4649
}
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc