• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

uber / cadence / 0187fdd2-f4a4-4c9a-97b4-6604937bf7be

09 May 2023 12:23AM UTC coverage: 57.253% (-0.002%) from 57.255%
0187fdd2-f4a4-4c9a-97b4-6604937bf7be

Pull #5252

buildkite

David Porter
Merge branch 'master' into feature/zonal-partitioning
Pull Request #5252: Feature/zonal partitioning

1460 of 1460 new or added lines in 51 files covered. (100.0%)

86909 of 151799 relevant lines covered (57.25%)

2482.17 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

60.24
/service/frontend/workflowHandler.go
1
// Copyright (c) 2017-2020 Uber Technologies, Inc.
2
//
3
// Permission is hereby granted, free of charge, to any person obtaining a copy
4
// of this software and associated documentation files (the "Software"), to deal
5
// in the Software without restriction, including without limitation the rights
6
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
7
// copies of the Software, and to permit persons to whom the Software is
8
// furnished to do so, subject to the following conditions:
9
//
10
// The above copyright notice and this permission notice shall be included in
11
// all copies or substantial portions of the Software.
12
//
13
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
14
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
18
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
19
// THE SOFTWARE.
20

21
package frontend
22

23
import (
24
        "context"
25
        "encoding/json"
26
        "errors"
27
        "fmt"
28
        "sync/atomic"
29
        "time"
30

31
        "github.com/pborman/uuid"
32
        "go.uber.org/yarpc"
33
        "go.uber.org/yarpc/yarpcerrors"
34
        "golang.org/x/sync/errgroup"
35

36
        "github.com/uber/cadence/common"
37
        "github.com/uber/cadence/common/archiver"
38
        "github.com/uber/cadence/common/backoff"
39
        "github.com/uber/cadence/common/cache"
40
        "github.com/uber/cadence/common/client"
41
        "github.com/uber/cadence/common/domain"
42
        "github.com/uber/cadence/common/elasticsearch/validator"
43
        "github.com/uber/cadence/common/log"
44
        "github.com/uber/cadence/common/log/tag"
45
        "github.com/uber/cadence/common/metrics"
46
        "github.com/uber/cadence/common/partition"
47
        "github.com/uber/cadence/common/persistence"
48
        persistenceutils "github.com/uber/cadence/common/persistence/persistence-utils"
49
        "github.com/uber/cadence/common/quotas"
50
        "github.com/uber/cadence/common/resource"
51
        "github.com/uber/cadence/common/service"
52
        "github.com/uber/cadence/common/types"
53
)
54

55
// getDomainReplicationMessageBatchSize is the batch size used when fetching domain
// replication messages (NOTE(review): inferred from the name; usage is outside this view).
const getDomainReplicationMessageBatchSize = 100

// defaultLastMessageID is the typed int64 sentinel -1 used as a "no last message" ID
// (NOTE(review): meaning inferred from the name; confirm against its callers).
const defaultLastMessageID int64 = -1
59

60
const (
61
        // HealthStatusOK is used when this node is healthy and rpc requests are allowed
62
        HealthStatusOK HealthStatus = iota + 1
63
        // HealthStatusWarmingUp is used when the rpc handler is warming up
64
        HealthStatusWarmingUp
65
        // HealthStatusShuttingDown is used when the rpc handler is shutting down
66
        HealthStatusShuttingDown
67
)
68

69
var _ Handler = (*WorkflowHandler)(nil)
70

71
// ratelimitType names the category of rate limiter a request is checked against:
// user, worker or visibility traffic.
type ratelimitType int

// The categories are numbered from 1; the zero value is left unassigned.
const (
	ratelimitTypeUser       ratelimitType = 1
	ratelimitTypeWorker     ratelimitType = 2
	ratelimitTypeVisibility ratelimitType = 3
)
79

80
type (
81
        // WorkflowHandler - Thrift handler interface for workflow service
82
        WorkflowHandler struct {
83
                resource.Resource
84

85
                shuttingDown              int32
86
                healthStatus              int32
87
                tokenSerializer           common.TaskTokenSerializer
88
                userRateLimiter           quotas.Policy
89
                workerRateLimiter         quotas.Policy
90
                visibilityRateLimiter     quotas.Policy
91
                config                    *Config
92
                versionChecker            client.VersionChecker
93
                domainHandler             domain.Handler
94
                visibilityQueryValidator  *validator.VisibilityQueryValidator
95
                searchAttributesValidator *validator.SearchAttributesValidator
96
                throttleRetry             *backoff.ThrottleRetry
97
        }
98

99
        getHistoryContinuationToken struct {
100
                RunID             string
101
                FirstEventID      int64
102
                NextEventID       int64
103
                IsWorkflowRunning bool
104
                PersistenceToken  []byte
105
                TransientDecision *types.TransientDecisionInfo
106
                BranchToken       []byte
107
        }
108

109
        domainGetter interface {
110
                GetDomain() string
111
        }
112

113
        // HealthStatus is an enum that refers to the rpc handler health status
114
        HealthStatus int32
115
)
116

117
var (
118
        errDomainNotSet                               = &types.BadRequestError{Message: "Domain not set on request."}
119
        errTaskTokenNotSet                            = &types.BadRequestError{Message: "Task token not set on request."}
120
        errInvalidTaskToken                           = &types.BadRequestError{Message: "Invalid TaskToken."}
121
        errTaskListNotSet                             = &types.BadRequestError{Message: "TaskList is not set on request."}
122
        errTaskListTypeNotSet                         = &types.BadRequestError{Message: "TaskListType is not set on request."}
123
        errExecutionNotSet                            = &types.BadRequestError{Message: "Execution is not set on request."}
124
        errWorkflowIDNotSet                           = &types.BadRequestError{Message: "WorkflowId is not set on request."}
125
        errActivityIDNotSet                           = &types.BadRequestError{Message: "ActivityID is not set on request."}
126
        errSignalNameNotSet                           = &types.BadRequestError{Message: "SignalName is not set on request."}
127
        errInvalidRunID                               = &types.BadRequestError{Message: "Invalid RunId."}
128
        errInvalidNextPageToken                       = &types.BadRequestError{Message: "Invalid NextPageToken."}
129
        errNextPageTokenRunIDMismatch                 = &types.BadRequestError{Message: "RunID in the request does not match the NextPageToken."}
130
        errQueryNotSet                                = &types.BadRequestError{Message: "WorkflowQuery is not set on request."}
131
        errQueryTypeNotSet                            = &types.BadRequestError{Message: "QueryType is not set on request."}
132
        errRequestNotSet                              = &types.BadRequestError{Message: "Request is nil."}
133
        errNoPermission                               = &types.BadRequestError{Message: "No permission to do this operation."}
134
        errRequestIDNotSet                            = &types.BadRequestError{Message: "RequestId is not set on request."}
135
        errWorkflowTypeNotSet                         = &types.BadRequestError{Message: "WorkflowType is not set on request."}
136
        errInvalidRetention                           = &types.BadRequestError{Message: "RetentionDays is invalid."}
137
        errInvalidExecutionStartToCloseTimeoutSeconds = &types.BadRequestError{Message: "A valid ExecutionStartToCloseTimeoutSeconds is not set on request."}
138
        errInvalidTaskStartToCloseTimeoutSeconds      = &types.BadRequestError{Message: "A valid TaskStartToCloseTimeoutSeconds is not set on request."}
139
        errInvalidDelayStartSeconds                   = &types.BadRequestError{Message: "A valid DelayStartSeconds is not set on request."}
140
        errInvalidJitterStartSeconds                  = &types.BadRequestError{Message: "A valid JitterStartSeconds is not set on request (negative)."}
141
        errInvalidJitterStartSeconds2                 = &types.BadRequestError{Message: "A valid JitterStartSeconds is not set on request (larger than cron duration)."}
142
        errQueryDisallowedForDomain                   = &types.BadRequestError{Message: "Domain is not allowed to query, please contact cadence team to re-enable queries."}
143
        errClusterNameNotSet                          = &types.BadRequestError{Message: "Cluster name is not set."}
144
        errEmptyReplicationInfo                       = &types.BadRequestError{Message: "Replication task info is not set."}
145
        errEmptyQueueType                             = &types.BadRequestError{Message: "Queue type is not set."}
146
        errDomainInLockdown                           = &types.BadRequestError{Message: "Domain is not accepting fail overs at this time due to lockdown."}
147
        errShuttingDown                               = &types.InternalServiceError{Message: "Shutting down"}
148

149
        // err for archival
150
        errHistoryNotFound = &types.BadRequestError{Message: "Requested workflow history not found, may have passed retention period."}
151

152
        // err for string too long
153
        errDomainTooLong       = &types.BadRequestError{Message: "Domain length exceeds limit."}
154
        errWorkflowTypeTooLong = &types.BadRequestError{Message: "WorkflowType length exceeds limit."}
155
        errWorkflowIDTooLong   = &types.BadRequestError{Message: "WorkflowID length exceeds limit."}
156
        errSignalNameTooLong   = &types.BadRequestError{Message: "SignalName length exceeds limit."}
157
        errTaskListTooLong     = &types.BadRequestError{Message: "TaskList length exceeds limit."}
158
        errRequestIDTooLong    = &types.BadRequestError{Message: "RequestID length exceeds limit."}
159
        errIdentityTooLong     = &types.BadRequestError{Message: "Identity length exceeds limit."}
160

161
        frontendServiceRetryPolicy = common.CreateFrontendServiceRetryPolicy()
162
)
163

164
// NewWorkflowHandler creates a thrift handler for the cadence service
165
func NewWorkflowHandler(
166
        resource resource.Resource,
167
        config *Config,
168
        versionChecker client.VersionChecker,
169
        domainHandler domain.Handler,
170
) *WorkflowHandler {
102✔
171
        return &WorkflowHandler{
102✔
172
                Resource:        resource,
102✔
173
                config:          config,
102✔
174
                healthStatus:    int32(HealthStatusWarmingUp),
102✔
175
                tokenSerializer: common.NewJSONTaskTokenSerializer(),
102✔
176
                userRateLimiter: quotas.NewMultiStageRateLimiter(
102✔
177
                        quotas.NewDynamicRateLimiter(config.UserRPS.AsFloat64()),
102✔
178
                        quotas.NewCollection(func(domain string) quotas.Limiter {
140✔
179
                                return quotas.NewDynamicRateLimiter(quotas.PerMemberDynamic(
38✔
180
                                        service.Frontend,
38✔
181
                                        config.GlobalDomainUserRPS.AsFloat64(domain),
38✔
182
                                        config.MaxDomainUserRPSPerInstance.AsFloat64(domain),
38✔
183
                                        resource.GetMembershipResolver(),
38✔
184
                                ))
38✔
185
                        }),
38✔
186
                ),
187
                workerRateLimiter: quotas.NewMultiStageRateLimiter(
188
                        quotas.NewDynamicRateLimiter(config.WorkerRPS.AsFloat64()),
189
                        quotas.NewCollection(func(domain string) quotas.Limiter {
26✔
190
                                return quotas.NewDynamicRateLimiter(quotas.PerMemberDynamic(
26✔
191
                                        service.Frontend,
26✔
192
                                        config.GlobalDomainWorkerRPS.AsFloat64(domain),
26✔
193
                                        config.MaxDomainWorkerRPSPerInstance.AsFloat64(domain),
26✔
194
                                        resource.GetMembershipResolver(),
26✔
195
                                ))
26✔
196
                        }),
26✔
197
                ),
198
                visibilityRateLimiter: quotas.NewMultiStageRateLimiter(
199
                        quotas.NewDynamicRateLimiter(config.VisibilityRPS.AsFloat64()),
200
                        quotas.NewCollection(func(domain string) quotas.Limiter {
20✔
201
                                return quotas.NewDynamicRateLimiter(quotas.PerMemberDynamic(
20✔
202
                                        service.Frontend,
20✔
203
                                        config.GlobalDomainVisibilityRPS.AsFloat64(domain),
20✔
204
                                        config.MaxDomainVisibilityRPSPerInstance.AsFloat64(domain),
20✔
205
                                        resource.GetMembershipResolver(),
20✔
206
                                ))
20✔
207
                        }),
20✔
208
                ),
209
                versionChecker: versionChecker,
210
                domainHandler:  domainHandler,
211
                visibilityQueryValidator: validator.NewQueryValidator(
212
                        config.ValidSearchAttributes,
213
                        config.EnableQueryAttributeValidation,
214
                ),
215
                searchAttributesValidator: validator.NewSearchAttributesValidator(
216
                        resource.GetLogger(),
217
                        config.EnableQueryAttributeValidation,
218
                        config.ValidSearchAttributes,
219
                        config.SearchAttributesNumberOfKeysLimit,
220
                        config.SearchAttributesSizeOfValueLimit,
221
                        config.SearchAttributesTotalSizeLimit,
222
                ),
223
                throttleRetry: backoff.NewThrottleRetry(
224
                        backoff.WithRetryPolicy(frontendServiceRetryPolicy),
225
                        backoff.WithRetryableError(common.IsServiceTransientError),
226
                ),
227
        }
228
}
229

230
// Start starts the handler
231
func (wh *WorkflowHandler) Start() {
15✔
232
        // TODO: Get warmup duration from config. Even better, run proactive checks such as probing downstream connections.
15✔
233
        const warmUpDuration = 30 * time.Second
15✔
234

15✔
235
        warmupTimer := time.NewTimer(warmUpDuration)
15✔
236
        go func() {
30✔
237
                <-warmupTimer.C
15✔
238
                wh.GetLogger().Warn("Service warmup duration has elapsed.")
15✔
239
                if atomic.CompareAndSwapInt32(&wh.healthStatus, int32(HealthStatusWarmingUp), int32(HealthStatusOK)) {
27✔
240
                        wh.GetLogger().Warn("Warmup time has elapsed. Service is healthy.")
12✔
241
                } else {
12✔
242
                        status := HealthStatus(atomic.LoadInt32(&wh.healthStatus))
×
243
                        wh.GetLogger().Warn(fmt.Sprintf("Warmup time has elapsed. Service status is: %v", status.String()))
×
244
                }
×
245
        }()
246
}
247

248
// Stop stops the handler
249
func (wh *WorkflowHandler) Stop() {
15✔
250
        atomic.StoreInt32(&wh.shuttingDown, 1)
15✔
251
}
15✔
252

253
// UpdateHealthStatus sets the health status for this rpc handler.
254
// This health status will be used within the rpc health check handler
255
func (wh *WorkflowHandler) UpdateHealthStatus(status HealthStatus) {
15✔
256
        atomic.StoreInt32(&wh.healthStatus, int32(status))
15✔
257
}
15✔
258

259
func (wh *WorkflowHandler) isShuttingDown() bool {
6,532✔
260
        return atomic.LoadInt32(&wh.shuttingDown) != 0
6,532✔
261
}
6,532✔
262

263
// Health is for health check
264
func (wh *WorkflowHandler) Health(ctx context.Context) (*types.HealthStatus, error) {
×
265
        status := HealthStatus(atomic.LoadInt32(&wh.healthStatus))
×
266
        msg := status.String()
×
267

×
268
        if status != HealthStatusOK {
×
269
                wh.GetLogger().Warn(fmt.Sprintf("Service status is: %v", msg))
×
270
        }
×
271

272
        return &types.HealthStatus{
×
273
                Ok:  status == HealthStatusOK,
×
274
                Msg: msg,
×
275
        }, nil
×
276
}
277

278
// RegisterDomain creates a new domain which can be used as a container for all resources.  Domain is a top level
279
// entity within Cadence, used as a container for all resources like workflow executions, tasklists, etc.  Domain
280
// acts as a sandbox and provides isolation for all resources within the domain.  All resources belongs to exactly one
281
// domain.
282
func (wh *WorkflowHandler) RegisterDomain(ctx context.Context, registerRequest *types.RegisterDomainRequest) (retError error) {
45✔
283
        defer func() { log.CapturePanic(recover(), wh.GetLogger(), &retError) }()
90✔
284

285
        scope, sw := wh.startRequestProfile(ctx, metrics.FrontendRegisterDomainScope)
45✔
286
        defer sw.Stop()
45✔
287

45✔
288
        if wh.isShuttingDown() {
45✔
289
                return errShuttingDown
×
290
        }
×
291

292
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
45✔
293
                return wh.error(err, scope)
×
294
        }
×
295

296
        if registerRequest == nil {
45✔
297
                return errRequestNotSet
×
298
        }
×
299

300
        if registerRequest.GetWorkflowExecutionRetentionPeriodInDays() > int32(wh.config.domainConfig.MaxRetentionDays()) {
45✔
301
                return errInvalidRetention
×
302
        }
×
303

304
        if err := checkPermission(wh.config, registerRequest.SecurityToken); err != nil {
45✔
305
                return err
×
306
        }
×
307

308
        if err := checkRequiredDomainDataKVs(wh.config.domainConfig.RequiredDomainDataKeys(), registerRequest.GetData()); err != nil {
46✔
309
                return err
1✔
310
        }
1✔
311

312
        if registerRequest.GetName() == "" {
44✔
313
                return errDomainNotSet
×
314
        }
×
315

316
        err := wh.domainHandler.RegisterDomain(ctx, registerRequest)
44✔
317
        if err != nil {
45✔
318
                return wh.error(err, scope)
1✔
319
        }
1✔
320
        return nil
43✔
321
}
322

323
// ListDomains returns the information and configuration for a registered domain.
324
func (wh *WorkflowHandler) ListDomains(
325
        ctx context.Context,
326
        listRequest *types.ListDomainsRequest,
327
) (response *types.ListDomainsResponse, retError error) {
×
328
        defer func() { log.CapturePanic(recover(), wh.GetLogger(), &retError) }()
×
329

330
        scope, sw := wh.startRequestProfile(ctx, metrics.FrontendListDomainsScope)
×
331
        defer sw.Stop()
×
332

×
333
        if wh.isShuttingDown() {
×
334
                return nil, errShuttingDown
×
335
        }
×
336

337
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
×
338
                return nil, wh.error(err, scope)
×
339
        }
×
340

341
        if listRequest == nil {
×
342
                return nil, errRequestNotSet
×
343
        }
×
344

345
        resp, err := wh.domainHandler.ListDomains(ctx, listRequest)
×
346
        if err != nil {
×
347
                return resp, wh.error(err, scope)
×
348
        }
×
349
        return resp, err
×
350
}
351

352
// DescribeDomain returns the information and configuration for a registered domain.
353
func (wh *WorkflowHandler) DescribeDomain(
354
        ctx context.Context,
355
        describeRequest *types.DescribeDomainRequest,
356
) (response *types.DescribeDomainResponse, retError error) {
134✔
357
        defer func() { log.CapturePanic(recover(), wh.GetLogger(), &retError) }()
268✔
358

359
        scope, sw := wh.startRequestProfile(ctx, metrics.FrontendDescribeDomainScope)
134✔
360
        defer sw.Stop()
134✔
361

134✔
362
        if wh.isShuttingDown() {
134✔
363
                return nil, errShuttingDown
×
364
        }
×
365

366
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
134✔
367
                return nil, wh.error(err, scope)
×
368
        }
×
369

370
        if describeRequest == nil {
134✔
371
                return nil, errRequestNotSet
×
372
        }
×
373

374
        if describeRequest.GetName() == "" && describeRequest.GetUUID() == "" {
134✔
375
                return nil, errDomainNotSet
×
376
        }
×
377

378
        resp, err := wh.domainHandler.DescribeDomain(ctx, describeRequest)
134✔
379
        if err != nil {
134✔
380
                return resp, wh.error(err, scope)
×
381
        }
×
382

383
        if resp.GetFailoverInfo() != nil && resp.GetFailoverInfo().GetFailoverExpireTimestamp() > 0 {
134✔
384
                // fetch ongoing failover info from history service
×
385
                failoverResp, err := wh.GetHistoryClient().GetFailoverInfo(ctx, &types.GetFailoverInfoRequest{
×
386
                        DomainID: resp.GetDomainInfo().UUID,
×
387
                })
×
388
                if err != nil {
×
389
                        // despite the error from history, return describe domain response
×
390
                        wh.GetLogger().Error(
×
391
                                fmt.Sprintf("Failed to get failover info for domain %s", resp.DomainInfo.GetName()),
×
392
                                tag.Error(err),
×
393
                        )
×
394
                        return resp, nil
×
395
                }
×
396
                resp.FailoverInfo.CompletedShardCount = failoverResp.GetCompletedShardCount()
×
397
                resp.FailoverInfo.PendingShards = failoverResp.GetPendingShards()
×
398
        }
399
        return resp, err
134✔
400
}
401

402
// UpdateDomain is used to update the information and configuration for a registered domain.
403
func (wh *WorkflowHandler) UpdateDomain(
404
        ctx context.Context,
405
        updateRequest *types.UpdateDomainRequest,
406
) (resp *types.UpdateDomainResponse, retError error) {
9✔
407
        defer func() { log.CapturePanic(recover(), wh.GetLogger(), &retError) }()
18✔
408

409
        scope, sw := wh.startRequestProfile(ctx, metrics.FrontendUpdateDomainScope)
9✔
410
        defer sw.Stop()
9✔
411

9✔
412
        domainName := ""
9✔
413
        if updateRequest != nil {
18✔
414
                domainName = updateRequest.GetName()
9✔
415
        }
9✔
416

417
        logger := wh.GetLogger().WithTags(
9✔
418
                tag.WorkflowDomainName(domainName),
9✔
419
                tag.OperationName("DomainUpdate"))
9✔
420

9✔
421
        if updateRequest == nil {
9✔
422
                logger.Error("Nil domain update request.",
×
423
                        tag.Error(errRequestNotSet))
×
424
                return nil, errRequestNotSet
×
425
        }
×
426

427
        isFailover := isFailoverRequest(updateRequest)
9✔
428
        isGraceFailover := isGraceFailoverRequest(updateRequest)
9✔
429
        logger.Info(fmt.Sprintf(
9✔
430
                "Domain Update requested. isFailover: %v, isGraceFailover: %v, Request: %#v.",
9✔
431
                isFailover,
9✔
432
                isGraceFailover,
9✔
433
                updateRequest))
9✔
434

9✔
435
        if wh.isShuttingDown() {
9✔
436
                logger.Error("Won't apply the domain update since workflowHandler is shutting down.",
×
437
                        tag.Error(errShuttingDown))
×
438
                return nil, errShuttingDown
×
439
        }
×
440

441
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
9✔
442
                logger.Error("Won't apply the domain update since client version is not supported.",
×
443
                        tag.Error(err))
×
444
                return nil, wh.error(err, scope)
×
445
        }
×
446

447
        // don't require permission for failover request
448
        if isFailover {
11✔
449
                // reject the failover if the cluster is in lockdown
2✔
450
                if err := checkFailOverPermission(wh.config, updateRequest.GetName()); err != nil {
3✔
451
                        logger.Error("Domain failover request rejected since domain is in lockdown.",
1✔
452
                                tag.Error(err))
1✔
453
                        return nil, err
1✔
454
                }
1✔
455
        } else {
7✔
456
                if err := checkPermission(wh.config, updateRequest.SecurityToken); err != nil {
7✔
457
                        logger.Error("Domain update request rejected due to failing permissions.",
×
458
                                tag.Error(err))
×
459
                        return nil, err
×
460
                }
×
461
        }
462

463
        if isGraceFailover {
9✔
464
                if err := wh.checkOngoingFailover(
1✔
465
                        ctx,
1✔
466
                        &updateRequest.Name,
1✔
467
                ); err != nil {
1✔
468
                        logger.Error("Graceful domain failover request failed. Not able to check ongoing failovers.",
×
469
                                tag.Error(err))
×
470
                        return nil, err
×
471
                }
×
472
        }
473

474
        if updateRequest.GetName() == "" {
8✔
475
                logger.Error("Domain not set on request.",
×
476
                        tag.Error(errDomainNotSet))
×
477
                return nil, errDomainNotSet
×
478
        }
×
479
        // TODO: call remote clusters to verify domain data
480
        resp, err := wh.domainHandler.UpdateDomain(ctx, updateRequest)
8✔
481
        if err != nil {
10✔
482
                logger.Error("Domain update operation failed.",
2✔
483
                        tag.Error(err))
2✔
484
                return resp, wh.error(err, scope)
2✔
485
        }
2✔
486
        logger.Info("Domain update operation succeeded.")
6✔
487
        return resp, err
6✔
488
}
489

490
// DeprecateDomain us used to update status of a registered domain to DEPRECATED. Once the domain is deprecated
491
// it cannot be used to start new workflow executions.  Existing workflow executions will continue to run on
492
// deprecated domains.
493
func (wh *WorkflowHandler) DeprecateDomain(ctx context.Context, deprecateRequest *types.DeprecateDomainRequest) (retError error) {
×
494
        defer func() { log.CapturePanic(recover(), wh.GetLogger(), &retError) }()
×
495

496
        scope, sw := wh.startRequestProfile(ctx, metrics.FrontendDeprecateDomainScope)
×
497
        defer sw.Stop()
×
498

×
499
        if wh.isShuttingDown() {
×
500
                return errShuttingDown
×
501
        }
×
502

503
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
×
504
                return wh.error(err, scope)
×
505
        }
×
506

507
        if deprecateRequest == nil {
×
508
                return errRequestNotSet
×
509
        }
×
510

511
        if err := checkPermission(wh.config, deprecateRequest.SecurityToken); err != nil {
×
512
                return err
×
513
        }
×
514

515
        if deprecateRequest.GetName() == "" {
×
516
                return errDomainNotSet
×
517
        }
×
518

519
        err := wh.domainHandler.DeprecateDomain(ctx, deprecateRequest)
×
520
        if err != nil {
×
521
                return wh.error(err, scope)
×
522
        }
×
523
        return err
×
524
}
525

526
// PollForActivityTask - Poll for an activity task.
527
func (wh *WorkflowHandler) PollForActivityTask(
528
        ctx context.Context,
529
        pollRequest *types.PollForActivityTaskRequest,
530
) (resp *types.PollForActivityTaskResponse, retError error) {
753✔
531
        defer func() { log.CapturePanic(recover(), wh.GetLogger(), &retError) }()
1,506✔
532

533
        callTime := time.Now()
753✔
534

753✔
535
        scope, sw := wh.startRequestProfileWithDomain(ctx, metrics.FrontendPollForActivityTaskScope, pollRequest)
753✔
536
        defer sw.Stop()
753✔
537

753✔
538
        if wh.isShuttingDown() {
753✔
539
                return nil, errShuttingDown
×
540
        }
×
541

542
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
753✔
543
                return nil, wh.error(err, scope)
×
544
        }
×
545

546
        if pollRequest == nil {
753✔
547
                return nil, wh.error(errRequestNotSet, scope)
×
548
        }
×
549

550
        domainName := pollRequest.GetDomain()
753✔
551
        tags := getDomainWfIDRunIDTags(domainName, nil)
753✔
552

753✔
553
        if domainName == "" {
753✔
554
                return nil, wh.error(errDomainNotSet, scope, tags...)
×
555
        }
×
556

557
        wh.GetLogger().Debug("Received PollForActivityTask")
753✔
558
        if err := common.ValidateLongPollContextTimeout(
753✔
559
                ctx,
753✔
560
                "PollForActivityTask",
753✔
561
                wh.GetThrottledLogger(),
753✔
562
        ); err != nil {
755✔
563
                return nil, wh.error(err, scope, tags...)
2✔
564
        }
2✔
565

566
        idLengthWarnLimit := wh.config.MaxIDLengthWarnLimit()
751✔
567
        if !common.ValidIDLength(
751✔
568
                domainName,
751✔
569
                scope,
751✔
570
                idLengthWarnLimit,
751✔
571
                wh.config.DomainNameMaxLength(domainName),
751✔
572
                metrics.CadenceErrDomainNameExceededWarnLimit,
751✔
573
                domainName,
751✔
574
                wh.GetLogger(),
751✔
575
                tag.IDTypeDomainName) {
751✔
576
                return nil, wh.error(errDomainTooLong, scope, tags...)
×
577
        }
×
578

579
        if err := wh.validateTaskList(pollRequest.TaskList, scope, domainName); err != nil {
751✔
580
                return nil, wh.error(err, scope, tags...)
×
581
        }
×
582

583
        if !common.ValidIDLength(
751✔
584
                pollRequest.GetIdentity(),
751✔
585
                scope,
751✔
586
                idLengthWarnLimit,
751✔
587
                wh.config.IdentityMaxLength(domainName),
751✔
588
                metrics.CadenceErrIdentityExceededWarnLimit,
751✔
589
                domainName,
751✔
590
                wh.GetLogger(),
751✔
591
                tag.IDTypeIdentity) {
751✔
592
                return nil, wh.error(errIdentityTooLong, scope, tags...)
×
593
        }
×
594

595
        if ok := wh.allow(ratelimitTypeWorker, pollRequest); !ok {
751✔
596
                // pollers exponentially back off up to 10s
×
597
                return nil, wh.error(createServiceBusyError(), scope, tags...)
×
598
        }
×
599

600
        domainID, err := wh.GetDomainCache().GetDomainID(domainName)
751✔
601
        if err != nil {
1,053✔
602
                return nil, wh.error(err, scope, tags...)
302✔
603
        }
302✔
604

605
        isolationGroup := wh.getIsolationGroup(ctx, domainName)
449✔
606
        if !wh.waitUntilIsolationGroupHealthy(ctx, domainName, isolationGroup) {
450✔
607
                return &types.PollForActivityTaskResponse{}, nil
1✔
608
        }
1✔
609
        // it is possible that we wait for a very long time and the remaining time is not long enough for a long poll
610
        // in this case, return an empty response
611
        if err := common.ValidateLongPollContextTimeout(
448✔
612
                ctx,
448✔
613
                "PollForActivityTask",
448✔
614
                wh.GetThrottledLogger(),
448✔
615
        ); err != nil {
448✔
616
                return &types.PollForActivityTaskResponse{}, nil
×
617
        }
×
618
        pollerID := uuid.New()
448✔
619
        op := func() error {
896✔
620
                resp, err = wh.GetMatchingClient().PollForActivityTask(ctx, &types.MatchingPollForActivityTaskRequest{
448✔
621
                        DomainUUID:     domainID,
448✔
622
                        PollerID:       pollerID,
448✔
623
                        PollRequest:    pollRequest,
448✔
624
                        IsolationGroup: isolationGroup,
448✔
625
                })
448✔
626
                return err
448✔
627
        }
448✔
628

629
        err = wh.throttleRetry.Do(ctx, op)
448✔
630
        if err != nil {
514✔
631
                err = wh.cancelOutstandingPoll(ctx, err, domainID, persistence.TaskListTypeActivity, pollRequest.TaskList, pollerID)
66✔
632
                if err != nil {
66✔
633
                        // For all other errors log an error and return it back to client.
×
634
                        ctxTimeout := "not-set"
×
635
                        ctxDeadline, ok := ctx.Deadline()
×
636
                        if ok {
×
637
                                ctxTimeout = ctxDeadline.Sub(callTime).String()
×
638
                        }
×
639
                        wh.GetLogger().Error("PollForActivityTask failed.",
×
640
                                tag.WorkflowTaskListName(pollRequest.GetTaskList().GetName()),
×
641
                                tag.Value(ctxTimeout),
×
642
                                tag.Error(err))
×
643
                        return nil, wh.error(err, scope, tags...)
×
644
                }
645
        }
646
        return resp, nil
448✔
647
}
648

649
// PollForDecisionTask - Poll for a decision task.
650
func (wh *WorkflowHandler) PollForDecisionTask(
651
        ctx context.Context,
652
        pollRequest *types.PollForDecisionTaskRequest,
653
) (resp *types.PollForDecisionTaskResponse, retError error) {
1,536✔
654
        defer func() { log.CapturePanic(recover(), wh.GetLogger(), &retError) }()
3,072✔
655

656
        callTime := time.Now()
1,536✔
657

1,536✔
658
        scope, sw := wh.startRequestProfileWithDomain(ctx, metrics.FrontendPollForDecisionTaskScope, pollRequest)
1,536✔
659
        defer sw.Stop()
1,536✔
660

1,536✔
661
        if wh.isShuttingDown() {
1,536✔
662
                return nil, errShuttingDown
×
663
        }
×
664

665
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
1,536✔
666
                return nil, wh.error(err, scope)
×
667
        }
×
668

669
        if pollRequest == nil {
1,536✔
670
                return nil, wh.error(errRequestNotSet, scope)
×
671
        }
×
672

673
        domainName := pollRequest.GetDomain()
1,536✔
674
        tags := getDomainWfIDRunIDTags(domainName, nil)
1,536✔
675

1,536✔
676
        if domainName == "" {
1,536✔
677
                return nil, wh.error(errDomainNotSet, scope, tags...)
×
678
        }
×
679

680
        wh.GetLogger().Debug("Received PollForDecisionTask")
1,536✔
681
        if err := common.ValidateLongPollContextTimeout(
1,536✔
682
                ctx,
1,536✔
683
                "PollForDecisionTask",
1,536✔
684
                wh.GetThrottledLogger(),
1,536✔
685
        ); err != nil {
1,538✔
686
                return nil, wh.error(err, scope, tags...)
2✔
687
        }
2✔
688

689
        idLengthWarnLimit := wh.config.MaxIDLengthWarnLimit()
1,534✔
690
        if !common.ValidIDLength(
1,534✔
691
                domainName,
1,534✔
692
                scope,
1,534✔
693
                idLengthWarnLimit,
1,534✔
694
                wh.config.DomainNameMaxLength(domainName),
1,534✔
695
                metrics.CadenceErrDomainNameExceededWarnLimit,
1,534✔
696
                domainName,
1,534✔
697
                wh.GetLogger(),
1,534✔
698
                tag.IDTypeDomainName) {
1,534✔
699
                return nil, wh.error(errDomainTooLong, scope, tags...)
×
700
        }
×
701

702
        if !common.ValidIDLength(
1,534✔
703
                pollRequest.GetIdentity(),
1,534✔
704
                scope,
1,534✔
705
                idLengthWarnLimit,
1,534✔
706
                wh.config.IdentityMaxLength(domainName),
1,534✔
707
                metrics.CadenceErrIdentityExceededWarnLimit,
1,534✔
708
                domainName,
1,534✔
709
                wh.GetLogger(),
1,534✔
710
                tag.IDTypeIdentity) {
1,534✔
711
                return nil, wh.error(errIdentityTooLong, scope, tags...)
×
712
        }
×
713

714
        if err := wh.validateTaskList(pollRequest.TaskList, scope, domainName); err != nil {
1,534✔
715
                return nil, wh.error(err, scope, tags...)
×
716
        }
×
717

718
        domainEntry, err := wh.GetDomainCache().GetDomain(domainName)
1,534✔
719
        if err != nil {
1,837✔
720
                return nil, wh.error(err, scope, tags...)
303✔
721
        }
303✔
722
        domainID := domainEntry.GetInfo().ID
1,231✔
723

1,231✔
724
        wh.GetLogger().Debug("Poll for decision.", tag.WorkflowDomainName(domainName), tag.WorkflowDomainID(domainID))
1,231✔
725
        if err := wh.checkBadBinary(domainEntry, pollRequest.GetBinaryChecksum()); err != nil {
1,231✔
726
                return nil, wh.error(err, scope, tags...)
×
727
        }
×
728

729
        if ok := wh.allow(ratelimitTypeWorker, pollRequest); !ok {
1,231✔
730
                // pollers exponentially back off up to 10s
×
731
                return nil, wh.error(createServiceBusyError(), scope, tags...)
×
732
        }
×
733

734
        isolationGroup := wh.getIsolationGroup(ctx, domainName)
1,231✔
735
        if !wh.waitUntilIsolationGroupHealthy(ctx, domainName, isolationGroup) {
1,232✔
736
                return &types.PollForDecisionTaskResponse{}, nil
1✔
737
        }
1✔
738
        // it is possible that we wait for a very long time and the remaining time is not long enough for a long poll
739
        // in this case, return an empty response
740
        if err := common.ValidateLongPollContextTimeout(
1,230✔
741
                ctx,
1,230✔
742
                "PollForDecisionTask",
1,230✔
743
                wh.GetThrottledLogger(),
1,230✔
744
        ); err != nil {
1,230✔
745
                return &types.PollForDecisionTaskResponse{}, nil
×
746
        }
×
747

748
        pollerID := uuid.New()
1,230✔
749
        var matchingResp *types.MatchingPollForDecisionTaskResponse
1,230✔
750
        op := func() error {
2,460✔
751
                matchingResp, err = wh.GetMatchingClient().PollForDecisionTask(ctx, &types.MatchingPollForDecisionTaskRequest{
1,230✔
752
                        DomainUUID:     domainID,
1,230✔
753
                        PollerID:       pollerID,
1,230✔
754
                        PollRequest:    pollRequest,
1,230✔
755
                        IsolationGroup: isolationGroup,
1,230✔
756
                })
1,230✔
757
                return err
1,230✔
758
        }
1,230✔
759

760
        err = wh.throttleRetry.Do(ctx, op)
1,230✔
761
        if err != nil {
1,296✔
762
                err = wh.cancelOutstandingPoll(ctx, err, domainID, persistence.TaskListTypeDecision, pollRequest.TaskList, pollerID)
66✔
763
                if err != nil {
66✔
764
                        // For all other errors log an error and return it back to client.
×
765
                        ctxTimeout := "not-set"
×
766
                        ctxDeadline, ok := ctx.Deadline()
×
767
                        if ok {
×
768
                                ctxTimeout = ctxDeadline.Sub(callTime).String()
×
769
                        }
×
770
                        wh.GetLogger().Error("PollForDecisionTask failed.",
×
771
                                tag.WorkflowTaskListName(pollRequest.GetTaskList().GetName()),
×
772
                                tag.Value(ctxTimeout),
×
773
                                tag.Error(err))
×
774
                        return nil, wh.error(err, scope)
×
775
                }
776

777
                // Must be cancellation error.  Does'nt matter what we return here.  Client already went away.
778
                return nil, nil
66✔
779
        }
780

781
        tags = append(tags, []tag.Tag{tag.WorkflowID(
1,164✔
782
                matchingResp.GetWorkflowExecution().GetWorkflowID()),
1,164✔
783
                tag.WorkflowRunID(matchingResp.GetWorkflowExecution().GetRunID())}...)
1,164✔
784
        resp, err = wh.createPollForDecisionTaskResponse(ctx, scope, domainID, matchingResp, matchingResp.GetBranchToken())
1,164✔
785
        if err != nil {
1,164✔
786
                return nil, wh.error(err, scope, tags...)
×
787
        }
×
788
        return resp, nil
1,164✔
789
}
790

791
func (wh *WorkflowHandler) getIsolationGroup(ctx context.Context, domainName string) string {
2,883✔
792
        if wh.config.EnableTasklistIsolation(domainName) {
2,887✔
793
                return partition.IsolationGroupFromContext(ctx)
4✔
794
        }
4✔
795
        return ""
2,879✔
796
}
797

798
func (wh *WorkflowHandler) getPartitionConfig(ctx context.Context, domainName string) map[string]string {
478✔
799
        if wh.config.EnableTasklistIsolation(domainName) {
478✔
800
                return partition.ConfigFromContext(ctx)
×
801
        }
×
802
        return nil
478✔
803
}
804

805
func (wh *WorkflowHandler) isIsolationGroupHealthy(ctx context.Context, domainName, isolationGroup string) bool {
1,203✔
806
        if wh.GetIsolationGroupState() != nil && wh.config.EnableTasklistIsolation(domainName) {
1,205✔
807
                isDrained, err := wh.GetIsolationGroupState().IsDrained(ctx, domainName, isolationGroup)
2✔
808
                if err != nil {
2✔
809
                        wh.GetLogger().Error("Failed to check if an isolation group is drained, assume it's healthy", tag.Error(err))
×
810
                        return true
×
811
                }
×
812
                return !isDrained
2✔
813
        }
814
        return true
1,201✔
815
}
816

817
func (wh *WorkflowHandler) waitUntilIsolationGroupHealthy(ctx context.Context, domainName, isolationGroup string) bool {
1,680✔
818
        if wh.GetIsolationGroupState() != nil && wh.config.EnableTasklistIsolation(domainName) {
1,682✔
819
                ticker := time.NewTicker(time.Second * 30)
2✔
820
                defer ticker.Stop()
2✔
821
                childCtx, cancel := common.CreateChildContext(ctx, 0.05)
2✔
822
                defer cancel()
2✔
823
                for {
4✔
824
                        isDrained, err := wh.GetIsolationGroupState().IsDrained(childCtx, domainName, isolationGroup)
2✔
825
                        if err != nil {
2✔
826
                                wh.GetLogger().Error("Failed to check if an isolation group is drained, assume it's healthy", tag.Error(err))
×
827
                                return true
×
828
                        }
×
829
                        if !isDrained {
2✔
830
                                break
×
831
                        }
832
                        select {
2✔
833
                        case <-childCtx.Done():
2✔
834
                                return false
2✔
835
                        case <-ticker.C:
×
836
                        }
837
                }
838
        }
839
        return true
1,678✔
840
}
841

842
func (wh *WorkflowHandler) checkBadBinary(domainEntry *cache.DomainCacheEntry, binaryChecksum string) error {
1,231✔
843
        if domainEntry.GetConfig().BadBinaries.Binaries != nil {
2,461✔
844
                badBinaries := domainEntry.GetConfig().BadBinaries.Binaries
1,230✔
845
                _, ok := badBinaries[binaryChecksum]
1,230✔
846
                if ok {
1,230✔
847
                        wh.GetMetricsClient().IncCounter(metrics.FrontendPollForDecisionTaskScope, metrics.CadenceErrBadBinaryCounter)
×
848
                        return &types.BadRequestError{
×
849
                                Message: fmt.Sprintf("binary %v already marked as bad deployment", binaryChecksum),
×
850
                        }
×
851
                }
×
852
        }
853
        return nil
1,231✔
854
}
855

856
func (wh *WorkflowHandler) cancelOutstandingPoll(ctx context.Context, err error, domainID string, taskListType int32,
857
        taskList *types.TaskList, pollerID string) error {
132✔
858
        // First check if this err is due to context cancellation.  This means client connection to frontend is closed.
132✔
859
        if ctx.Err() == context.Canceled {
264✔
860
                // Our rpc stack does not propagates context cancellation to the other service.  Lets make an explicit
132✔
861
                // call to matching to notify this poller is gone to prevent any tasks being dispatched to zombie pollers.
132✔
862
                err = wh.GetMatchingClient().CancelOutstandingPoll(context.Background(), &types.CancelOutstandingPollRequest{
132✔
863
                        DomainUUID:   domainID,
132✔
864
                        TaskListType: common.Int32Ptr(taskListType),
132✔
865
                        TaskList:     taskList,
132✔
866
                        PollerID:     pollerID,
132✔
867
                })
132✔
868
                // We can not do much if this call fails.  Just log the error and move on
132✔
869
                if err != nil {
132✔
870
                        wh.GetLogger().Warn("Failed to cancel outstanding poller.",
×
871
                                tag.WorkflowTaskListName(taskList.GetName()), tag.Error(err))
×
872
                }
×
873

874
                // Clear error as we don't want to report context cancellation error to count against our SLA
875
                return nil
132✔
876
        }
877

878
        return err
×
879
}
880

881
// RecordActivityTaskHeartbeat - Record Activity Task Heart beat.
882
func (wh *WorkflowHandler) RecordActivityTaskHeartbeat(
883
        ctx context.Context,
884
        heartbeatRequest *types.RecordActivityTaskHeartbeatRequest,
885
) (resp *types.RecordActivityTaskHeartbeatResponse, retError error) {
381✔
886
        defer func() { log.CapturePanic(recover(), wh.GetLogger(), &retError) }()
762✔
887

888
        scope := wh.getDefaultScope(ctx, metrics.FrontendRecordActivityTaskHeartbeatScope)
381✔
889

381✔
890
        if wh.isShuttingDown() {
381✔
891
                return nil, errShuttingDown
×
892
        }
×
893

894
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
381✔
895
                return nil, wh.error(err, scope)
×
896
        }
×
897

898
        if heartbeatRequest == nil {
381✔
899
                return nil, wh.error(errRequestNotSet, scope)
×
900
        }
×
901

902
        wh.GetLogger().Debug("Received RecordActivityTaskHeartbeat")
381✔
903
        if heartbeatRequest.TaskToken == nil {
381✔
904
                return nil, wh.error(errTaskTokenNotSet, scope)
×
905
        }
×
906
        taskToken, err := wh.tokenSerializer.Deserialize(heartbeatRequest.TaskToken)
381✔
907
        if err != nil {
381✔
908
                return nil, wh.error(err, scope)
×
909
        }
×
910
        if taskToken.DomainID == "" {
381✔
911
                return nil, wh.error(errDomainNotSet, scope)
×
912
        }
×
913

914
        domainName, err := wh.GetDomainCache().GetDomainName(taskToken.DomainID)
381✔
915
        if err != nil {
381✔
916
                return nil, wh.error(err, scope)
×
917
        }
×
918

919
        dw := domainWrapper{
381✔
920
                domain: domainName,
381✔
921
        }
381✔
922
        scope, sw := wh.startRequestProfileWithDomain(
381✔
923
                ctx,
381✔
924
                metrics.FrontendRecordActivityTaskHeartbeatScope,
381✔
925
                dw,
381✔
926
        )
381✔
927
        defer sw.Stop()
381✔
928

381✔
929
        // Count the request in the host RPS,
381✔
930
        // but we still accept it even if RPS is exceeded
381✔
931
        wh.allow(ratelimitTypeWorker, dw)
381✔
932

381✔
933
        tags := getDomainWfIDRunIDTags(domainName, &types.WorkflowExecution{
381✔
934
                WorkflowID: taskToken.WorkflowID,
381✔
935
                RunID:      taskToken.RunID,
381✔
936
        })
381✔
937

381✔
938
        sizeLimitError := wh.config.BlobSizeLimitError(domainName)
381✔
939
        sizeLimitWarn := wh.config.BlobSizeLimitWarn(domainName)
381✔
940

381✔
941
        if err := common.CheckEventBlobSizeLimit(
381✔
942
                len(heartbeatRequest.Details),
381✔
943
                sizeLimitWarn,
381✔
944
                sizeLimitError,
381✔
945
                taskToken.DomainID,
381✔
946
                taskToken.WorkflowID,
381✔
947
                taskToken.RunID,
381✔
948
                scope,
381✔
949
                wh.GetThrottledLogger(),
381✔
950
                tag.BlobSizeViolationOperation("RecordActivityTaskHeartbeat"),
381✔
951
        ); err != nil {
381✔
952
                // heartbeat details exceed size limit, we would fail the activity immediately with explicit error reason
×
953
                failRequest := &types.RespondActivityTaskFailedRequest{
×
954
                        TaskToken: heartbeatRequest.TaskToken,
×
955
                        Reason:    common.StringPtr(common.FailureReasonHeartbeatExceedsLimit),
×
956
                        Details:   heartbeatRequest.Details[0:sizeLimitError],
×
957
                        Identity:  heartbeatRequest.Identity,
×
958
                }
×
959
                err = wh.GetHistoryClient().RespondActivityTaskFailed(ctx, &types.HistoryRespondActivityTaskFailedRequest{
×
960
                        DomainUUID:    taskToken.DomainID,
×
961
                        FailedRequest: failRequest,
×
962
                })
×
963
                if err != nil {
×
964
                        return nil, wh.normalizeVersionedErrors(ctx, wh.error(err, scope, tags...))
×
965
                }
×
966
                resp = &types.RecordActivityTaskHeartbeatResponse{CancelRequested: true}
×
967
        } else {
381✔
968
                resp, err = wh.GetHistoryClient().RecordActivityTaskHeartbeat(ctx, &types.HistoryRecordActivityTaskHeartbeatRequest{
381✔
969
                        DomainUUID:       taskToken.DomainID,
381✔
970
                        HeartbeatRequest: heartbeatRequest,
381✔
971
                })
381✔
972
                if err != nil {
381✔
973
                        return nil, wh.normalizeVersionedErrors(ctx, wh.error(err, scope, tags...))
×
974
                }
×
975
        }
976

977
        return resp, nil
381✔
978
}
979

980
// RecordActivityTaskHeartbeatByID - Record Activity Task Heart beat.
981
func (wh *WorkflowHandler) RecordActivityTaskHeartbeatByID(
982
        ctx context.Context,
983
        heartbeatRequest *types.RecordActivityTaskHeartbeatByIDRequest,
984
) (resp *types.RecordActivityTaskHeartbeatResponse, retError error) {
×
985
        defer func() { log.CapturePanic(recover(), wh.GetLogger(), &retError) }()
×
986

987
        scope, sw := wh.startRequestProfileWithDomain(ctx, metrics.FrontendRecordActivityTaskHeartbeatByIDScope, heartbeatRequest)
×
988
        defer sw.Stop()
×
989

×
990
        if wh.isShuttingDown() {
×
991
                return nil, errShuttingDown
×
992
        }
×
993

994
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
×
995
                return nil, wh.error(err, scope)
×
996
        }
×
997

998
        if heartbeatRequest == nil {
×
999
                return nil, wh.error(errRequestNotSet, scope)
×
1000
        }
×
1001

1002
        domainName := heartbeatRequest.GetDomain()
×
1003
        tags := getDomainWfIDRunIDTags(domainName, &types.WorkflowExecution{
×
1004
                WorkflowID: heartbeatRequest.GetWorkflowID(),
×
1005
                RunID:      heartbeatRequest.GetRunID(),
×
1006
        })
×
1007

×
1008
        if domainName == "" {
×
1009
                return nil, wh.error(errDomainNotSet, scope, tags...)
×
1010
        }
×
1011

1012
        // Count the request in the host RPS,
1013
        // but we still accept it even if RPS is exceeded
1014
        wh.allow(ratelimitTypeWorker, heartbeatRequest)
×
1015

×
1016
        wh.GetLogger().Debug("Received RecordActivityTaskHeartbeatByID")
×
1017
        domainID, err := wh.GetDomainCache().GetDomainID(domainName)
×
1018
        if err != nil {
×
1019
                return nil, wh.error(err, scope, tags...)
×
1020
        }
×
1021
        workflowID := heartbeatRequest.GetWorkflowID()
×
1022
        runID := heartbeatRequest.GetRunID() // runID is optional so can be empty
×
1023
        activityID := heartbeatRequest.GetActivityID()
×
1024

×
1025
        if domainID == "" {
×
1026
                return nil, wh.error(errDomainNotSet, scope, tags...)
×
1027
        }
×
1028
        if workflowID == "" {
×
1029
                return nil, wh.error(errWorkflowIDNotSet, scope, tags...)
×
1030
        }
×
1031
        if activityID == "" {
×
1032
                return nil, wh.error(errActivityIDNotSet, scope, tags...)
×
1033
        }
×
1034

1035
        taskToken := &common.TaskToken{
×
1036
                DomainID:   domainID,
×
1037
                RunID:      runID,
×
1038
                WorkflowID: workflowID,
×
1039
                ScheduleID: common.EmptyEventID,
×
1040
                ActivityID: activityID,
×
1041
        }
×
1042
        token, err := wh.tokenSerializer.Serialize(taskToken)
×
1043
        if err != nil {
×
1044
                return nil, wh.error(err, scope, tags...)
×
1045
        }
×
1046

1047
        sizeLimitError := wh.config.BlobSizeLimitError(domainName)
×
1048
        sizeLimitWarn := wh.config.BlobSizeLimitWarn(domainName)
×
1049

×
1050
        if err := common.CheckEventBlobSizeLimit(
×
1051
                len(heartbeatRequest.Details),
×
1052
                sizeLimitWarn,
×
1053
                sizeLimitError,
×
1054
                taskToken.DomainID,
×
1055
                taskToken.WorkflowID,
×
1056
                taskToken.RunID,
×
1057
                scope,
×
1058
                wh.GetThrottledLogger(),
×
1059
                tag.BlobSizeViolationOperation("RecordActivityTaskHeartbeatByID"),
×
1060
        ); err != nil {
×
1061
                // heartbeat details exceed size limit, we would fail the activity immediately with explicit error reason
×
1062
                failRequest := &types.RespondActivityTaskFailedRequest{
×
1063
                        TaskToken: token,
×
1064
                        Reason:    common.StringPtr(common.FailureReasonHeartbeatExceedsLimit),
×
1065
                        Details:   heartbeatRequest.Details[0:sizeLimitError],
×
1066
                        Identity:  heartbeatRequest.Identity,
×
1067
                }
×
1068
                err = wh.GetHistoryClient().RespondActivityTaskFailed(ctx, &types.HistoryRespondActivityTaskFailedRequest{
×
1069
                        DomainUUID:    taskToken.DomainID,
×
1070
                        FailedRequest: failRequest,
×
1071
                })
×
1072
                if err != nil {
×
1073
                        return nil, wh.normalizeVersionedErrors(ctx, wh.error(err, scope, tags...))
×
1074
                }
×
1075
                resp = &types.RecordActivityTaskHeartbeatResponse{CancelRequested: true}
×
1076
        } else {
×
1077
                req := &types.RecordActivityTaskHeartbeatRequest{
×
1078
                        TaskToken: token,
×
1079
                        Details:   heartbeatRequest.Details,
×
1080
                        Identity:  heartbeatRequest.Identity,
×
1081
                }
×
1082

×
1083
                resp, err = wh.GetHistoryClient().RecordActivityTaskHeartbeat(ctx, &types.HistoryRecordActivityTaskHeartbeatRequest{
×
1084
                        DomainUUID:       taskToken.DomainID,
×
1085
                        HeartbeatRequest: req,
×
1086
                })
×
1087
                if err != nil {
×
1088
                        return nil, wh.normalizeVersionedErrors(ctx, wh.error(err, scope, tags...))
×
1089
                }
×
1090
        }
1091

1092
        return resp, nil
×
1093
}
1094

1095
// RespondActivityTaskCompleted - response to an activity task
1096
func (wh *WorkflowHandler) RespondActivityTaskCompleted(
1097
        ctx context.Context,
1098
        completeRequest *types.RespondActivityTaskCompletedRequest,
1099
) (retError error) {
246✔
1100
        defer func() { log.CapturePanic(recover(), wh.GetLogger(), &retError) }()
492✔
1101

1102
        scope := wh.getDefaultScope(ctx, metrics.FrontendRespondActivityTaskCompletedScope)
246✔
1103

246✔
1104
        if wh.isShuttingDown() {
246✔
1105
                return errShuttingDown
×
1106
        }
×
1107

1108
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
246✔
1109
                return wh.error(err, scope)
×
1110
        }
×
1111

1112
        if completeRequest == nil {
246✔
1113
                return wh.error(errRequestNotSet, scope)
×
1114
        }
×
1115

1116
        if completeRequest.TaskToken == nil {
246✔
1117
                return wh.error(errTaskTokenNotSet, scope)
×
1118
        }
×
1119
        taskToken, err := wh.tokenSerializer.Deserialize(completeRequest.TaskToken)
246✔
1120
        if err != nil {
246✔
1121
                return wh.error(err, scope)
×
1122
        }
×
1123
        if taskToken.DomainID == "" {
246✔
1124
                return wh.error(errDomainNotSet, scope)
×
1125
        }
×
1126

1127
        domainName, err := wh.GetDomainCache().GetDomainName(taskToken.DomainID)
246✔
1128
        if err != nil {
246✔
1129
                return wh.error(err, scope)
×
1130
        }
×
1131

1132
        dw := domainWrapper{
246✔
1133
                domain: domainName,
246✔
1134
        }
246✔
1135
        scope, sw := wh.startRequestProfileWithDomain(
246✔
1136
                ctx,
246✔
1137
                metrics.FrontendRespondActivityTaskCompletedScope,
246✔
1138
                dw,
246✔
1139
        )
246✔
1140
        defer sw.Stop()
246✔
1141

246✔
1142
        // Count the request in the host RPS,
246✔
1143
        // but we still accept it even if RPS is exceeded
246✔
1144
        wh.allow(ratelimitTypeWorker, dw)
246✔
1145

246✔
1146
        tags := getDomainWfIDRunIDTags(domainName, &types.WorkflowExecution{
246✔
1147
                WorkflowID: taskToken.WorkflowID,
246✔
1148
                RunID:      taskToken.RunID,
246✔
1149
        })
246✔
1150

246✔
1151
        if !common.ValidIDLength(
246✔
1152
                completeRequest.GetIdentity(),
246✔
1153
                scope,
246✔
1154
                wh.config.MaxIDLengthWarnLimit(),
246✔
1155
                wh.config.IdentityMaxLength(domainName),
246✔
1156
                metrics.CadenceErrIdentityExceededWarnLimit,
246✔
1157
                domainName,
246✔
1158
                wh.GetLogger(),
246✔
1159
                tag.IDTypeIdentity) {
246✔
1160
                return wh.error(errIdentityTooLong, scope, tags...)
×
1161
        }
×
1162

1163
        sizeLimitError := wh.config.BlobSizeLimitError(domainName)
246✔
1164
        sizeLimitWarn := wh.config.BlobSizeLimitWarn(domainName)
246✔
1165

246✔
1166
        if err := common.CheckEventBlobSizeLimit(
246✔
1167
                len(completeRequest.Result),
246✔
1168
                sizeLimitWarn,
246✔
1169
                sizeLimitError,
246✔
1170
                taskToken.DomainID,
246✔
1171
                taskToken.WorkflowID,
246✔
1172
                taskToken.RunID,
246✔
1173
                scope,
246✔
1174
                wh.GetThrottledLogger(),
246✔
1175
                tag.BlobSizeViolationOperation("RespondActivityTaskCompleted"),
246✔
1176
        ); err != nil {
246✔
1177
                // result exceeds blob size limit, we would record it as failure
×
1178
                failRequest := &types.RespondActivityTaskFailedRequest{
×
1179
                        TaskToken: completeRequest.TaskToken,
×
1180
                        Reason:    common.StringPtr(common.FailureReasonCompleteResultExceedsLimit),
×
1181
                        Details:   completeRequest.Result[0:sizeLimitError],
×
1182
                        Identity:  completeRequest.Identity,
×
1183
                }
×
1184
                err = wh.GetHistoryClient().RespondActivityTaskFailed(ctx, &types.HistoryRespondActivityTaskFailedRequest{
×
1185
                        DomainUUID:    taskToken.DomainID,
×
1186
                        FailedRequest: failRequest,
×
1187
                })
×
1188
                if err != nil {
×
1189
                        return wh.normalizeVersionedErrors(ctx, wh.error(err, scope, tags...))
×
1190
                }
×
1191
        } else {
246✔
1192
                err = wh.GetHistoryClient().RespondActivityTaskCompleted(ctx, &types.HistoryRespondActivityTaskCompletedRequest{
246✔
1193
                        DomainUUID:      taskToken.DomainID,
246✔
1194
                        CompleteRequest: completeRequest,
246✔
1195
                })
246✔
1196
                if err != nil {
291✔
1197
                        return wh.normalizeVersionedErrors(ctx, wh.error(err, scope, tags...))
45✔
1198
                }
45✔
1199
        }
1200

1201
        return nil
201✔
1202
}
1203

1204
// RespondActivityTaskCompletedByID - response to an activity task
1205
func (wh *WorkflowHandler) RespondActivityTaskCompletedByID(
1206
        ctx context.Context,
1207
        completeRequest *types.RespondActivityTaskCompletedByIDRequest,
1208
) (retError error) {
75✔
1209
        defer func() { log.CapturePanic(recover(), wh.GetLogger(), &retError) }()
150✔
1210

1211
        scope, sw := wh.startRequestProfileWithDomain(ctx, metrics.FrontendRespondActivityTaskCompletedByIDScope, completeRequest)
75✔
1212
        defer sw.Stop()
75✔
1213

75✔
1214
        if wh.isShuttingDown() {
75✔
1215
                return errShuttingDown
×
1216
        }
×
1217

1218
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
75✔
1219
                return wh.error(err, scope)
×
1220
        }
×
1221

1222
        if completeRequest == nil {
75✔
1223
                return wh.error(errRequestNotSet, scope)
×
1224
        }
×
1225

1226
        domainName := completeRequest.GetDomain()
75✔
1227
        tags := getDomainWfIDRunIDTags(domainName, &types.WorkflowExecution{
75✔
1228
                WorkflowID: completeRequest.GetWorkflowID(),
75✔
1229
                RunID:      completeRequest.GetRunID(),
75✔
1230
        })
75✔
1231

75✔
1232
        if domainName == "" {
75✔
1233
                return wh.error(errDomainNotSet, scope, tags...)
×
1234
        }
×
1235

1236
        // Count the request in the host RPS,
1237
        // but we still accept it even if RPS is exceeded
1238
        wh.allow(ratelimitTypeWorker, completeRequest)
75✔
1239

75✔
1240
        domainID, err := wh.GetDomainCache().GetDomainID(domainName)
75✔
1241
        if err != nil {
75✔
1242
                return wh.error(err, scope, tags...)
×
1243
        }
×
1244
        workflowID := completeRequest.GetWorkflowID()
75✔
1245
        runID := completeRequest.GetRunID() // runID is optional so can be empty
75✔
1246
        activityID := completeRequest.GetActivityID()
75✔
1247

75✔
1248
        if domainID == "" {
75✔
1249
                return wh.error(errDomainNotSet, scope, tags...)
×
1250
        }
×
1251
        if workflowID == "" {
75✔
1252
                return wh.error(errWorkflowIDNotSet, scope, tags...)
×
1253
        }
×
1254
        if activityID == "" {
75✔
1255
                return wh.error(errActivityIDNotSet, scope, tags...)
×
1256
        }
×
1257

1258
        if !common.ValidIDLength(
75✔
1259
                completeRequest.GetIdentity(),
75✔
1260
                scope,
75✔
1261
                wh.config.MaxIDLengthWarnLimit(),
75✔
1262
                wh.config.IdentityMaxLength(domainName),
75✔
1263
                metrics.CadenceErrIdentityExceededWarnLimit,
75✔
1264
                domainName,
75✔
1265
                wh.GetLogger(),
75✔
1266
                tag.IDTypeIdentity) {
75✔
1267
                return wh.error(errIdentityTooLong, scope)
×
1268
        }
×
1269

1270
        taskToken := &common.TaskToken{
75✔
1271
                DomainID:   domainID,
75✔
1272
                RunID:      runID,
75✔
1273
                WorkflowID: workflowID,
75✔
1274
                ScheduleID: common.EmptyEventID,
75✔
1275
                ActivityID: activityID,
75✔
1276
        }
75✔
1277
        token, err := wh.tokenSerializer.Serialize(taskToken)
75✔
1278
        if err != nil {
75✔
1279
                return wh.error(err, scope)
×
1280
        }
×
1281

1282
        sizeLimitError := wh.config.BlobSizeLimitError(domainName)
75✔
1283
        sizeLimitWarn := wh.config.BlobSizeLimitWarn(domainName)
75✔
1284

75✔
1285
        if err := common.CheckEventBlobSizeLimit(
75✔
1286
                len(completeRequest.Result),
75✔
1287
                sizeLimitWarn,
75✔
1288
                sizeLimitError,
75✔
1289
                taskToken.DomainID,
75✔
1290
                taskToken.WorkflowID,
75✔
1291
                taskToken.RunID,
75✔
1292
                scope,
75✔
1293
                wh.GetThrottledLogger(),
75✔
1294
                tag.BlobSizeViolationOperation("RespondActivityTaskCompletedByID"),
75✔
1295
        ); err != nil {
75✔
1296
                // result exceeds blob size limit, we would record it as failure
×
1297
                failRequest := &types.RespondActivityTaskFailedRequest{
×
1298
                        TaskToken: token,
×
1299
                        Reason:    common.StringPtr(common.FailureReasonCompleteResultExceedsLimit),
×
1300
                        Details:   completeRequest.Result[0:sizeLimitError],
×
1301
                        Identity:  completeRequest.Identity,
×
1302
                }
×
1303
                err = wh.GetHistoryClient().RespondActivityTaskFailed(ctx, &types.HistoryRespondActivityTaskFailedRequest{
×
1304
                        DomainUUID:    taskToken.DomainID,
×
1305
                        FailedRequest: failRequest,
×
1306
                })
×
1307
                if err != nil {
×
1308
                        return wh.normalizeVersionedErrors(ctx, wh.error(err, scope, tags...))
×
1309
                }
×
1310
        } else {
75✔
1311
                req := &types.RespondActivityTaskCompletedRequest{
75✔
1312
                        TaskToken: token,
75✔
1313
                        Result:    completeRequest.Result,
75✔
1314
                        Identity:  completeRequest.Identity,
75✔
1315
                }
75✔
1316

75✔
1317
                err = wh.GetHistoryClient().RespondActivityTaskCompleted(ctx, &types.HistoryRespondActivityTaskCompletedRequest{
75✔
1318
                        DomainUUID:      taskToken.DomainID,
75✔
1319
                        CompleteRequest: req,
75✔
1320
                })
75✔
1321
                if err != nil {
75✔
1322
                        return wh.normalizeVersionedErrors(ctx, wh.error(err, scope, tags...))
×
1323
                }
×
1324
        }
1325

1326
        return nil
75✔
1327
}
1328

1329
// RespondActivityTaskFailed - response to an activity task failure
1330
func (wh *WorkflowHandler) RespondActivityTaskFailed(
1331
        ctx context.Context,
1332
        failedRequest *types.RespondActivityTaskFailedRequest,
1333
) (retError error) {
12✔
1334
        defer func() { log.CapturePanic(recover(), wh.GetLogger(), &retError) }()
24✔
1335

1336
        scope := wh.getDefaultScope(ctx, metrics.FrontendRespondActivityTaskFailedScope)
12✔
1337

12✔
1338
        if wh.isShuttingDown() {
12✔
1339
                return errShuttingDown
×
1340
        }
×
1341

1342
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
12✔
1343
                return wh.error(err, scope)
×
1344
        }
×
1345

1346
        if failedRequest == nil {
12✔
1347
                return wh.error(errRequestNotSet, scope)
×
1348
        }
×
1349

1350
        if failedRequest.TaskToken == nil {
12✔
1351
                return wh.error(errTaskTokenNotSet, scope)
×
1352
        }
×
1353
        taskToken, err := wh.tokenSerializer.Deserialize(failedRequest.TaskToken)
12✔
1354
        if err != nil {
12✔
1355
                return wh.error(err, scope)
×
1356
        }
×
1357
        if taskToken.DomainID == "" {
12✔
1358
                return wh.error(errDomainNotSet, scope)
×
1359
        }
×
1360

1361
        domainName, err := wh.GetDomainCache().GetDomainName(taskToken.DomainID)
12✔
1362
        if err != nil {
12✔
1363
                return wh.error(err, scope)
×
1364
        }
×
1365

1366
        dw := domainWrapper{
12✔
1367
                domain: domainName,
12✔
1368
        }
12✔
1369
        scope, sw := wh.startRequestProfileWithDomain(
12✔
1370
                ctx,
12✔
1371
                metrics.FrontendRespondActivityTaskFailedScope,
12✔
1372
                dw,
12✔
1373
        )
12✔
1374
        defer sw.Stop()
12✔
1375

12✔
1376
        // Count the request in the host RPS,
12✔
1377
        // but we still accept it even if RPS is exceeded
12✔
1378
        wh.allow(ratelimitTypeWorker, dw)
12✔
1379

12✔
1380
        tags := getDomainWfIDRunIDTags(domainName, &types.WorkflowExecution{
12✔
1381
                WorkflowID: taskToken.WorkflowID,
12✔
1382
                RunID:      taskToken.RunID,
12✔
1383
        })
12✔
1384

12✔
1385
        if !common.ValidIDLength(
12✔
1386
                failedRequest.GetIdentity(),
12✔
1387
                scope,
12✔
1388
                wh.config.MaxIDLengthWarnLimit(),
12✔
1389
                wh.config.IdentityMaxLength(domainName),
12✔
1390
                metrics.CadenceErrIdentityExceededWarnLimit,
12✔
1391
                domainName,
12✔
1392
                wh.GetLogger(),
12✔
1393
                tag.IDTypeIdentity) {
12✔
1394
                return wh.error(errIdentityTooLong, scope, tags...)
×
1395
        }
×
1396

1397
        sizeLimitError := wh.config.BlobSizeLimitError(domainName)
12✔
1398
        sizeLimitWarn := wh.config.BlobSizeLimitWarn(domainName)
12✔
1399

12✔
1400
        if err := common.CheckEventBlobSizeLimit(
12✔
1401
                len(failedRequest.Details),
12✔
1402
                sizeLimitWarn,
12✔
1403
                sizeLimitError,
12✔
1404
                taskToken.DomainID,
12✔
1405
                taskToken.WorkflowID,
12✔
1406
                taskToken.RunID,
12✔
1407
                scope,
12✔
1408
                wh.GetThrottledLogger(),
12✔
1409
                tag.BlobSizeViolationOperation("RespondActivityTaskFailed"),
12✔
1410
        ); err != nil {
12✔
1411
                // details exceeds blob size limit, we would truncate the details and put a specific error reason
×
1412
                failedRequest.Reason = common.StringPtr(common.FailureReasonFailureDetailsExceedsLimit)
×
1413
                failedRequest.Details = failedRequest.Details[0:sizeLimitError]
×
1414
        }
×
1415

1416
        err = wh.GetHistoryClient().RespondActivityTaskFailed(ctx, &types.HistoryRespondActivityTaskFailedRequest{
12✔
1417
                DomainUUID:    taskToken.DomainID,
12✔
1418
                FailedRequest: failedRequest,
12✔
1419
        })
12✔
1420
        if err != nil {
12✔
1421
                return wh.normalizeVersionedErrors(ctx, wh.error(err, scope, tags...))
×
1422
        }
×
1423
        return nil
12✔
1424
}
1425

1426
// RespondActivityTaskFailedByID - response to an activity task failure
1427
func (wh *WorkflowHandler) RespondActivityTaskFailedByID(
1428
        ctx context.Context,
1429
        failedRequest *types.RespondActivityTaskFailedByIDRequest,
1430
) (retError error) {
×
1431
        defer func() { log.CapturePanic(recover(), wh.GetLogger(), &retError) }()
×
1432

1433
        scope, sw := wh.startRequestProfileWithDomain(ctx, metrics.FrontendRespondActivityTaskFailedByIDScope, failedRequest)
×
1434
        defer sw.Stop()
×
1435

×
1436
        if wh.isShuttingDown() {
×
1437
                return errShuttingDown
×
1438
        }
×
1439

1440
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
×
1441
                return wh.error(err, scope)
×
1442
        }
×
1443

1444
        if failedRequest == nil {
×
1445
                return wh.error(errRequestNotSet, scope)
×
1446
        }
×
1447

1448
        domainName := failedRequest.GetDomain()
×
1449
        tags := getDomainWfIDRunIDTags(domainName, &types.WorkflowExecution{
×
1450
                WorkflowID: failedRequest.GetWorkflowID(),
×
1451
                RunID:      failedRequest.GetRunID(),
×
1452
        })
×
1453

×
1454
        if domainName == "" {
×
1455
                return wh.error(errDomainNotSet, scope, tags...)
×
1456
        }
×
1457

1458
        // Count the request in the host RPS,
1459
        // but we still accept it even if RPS is exceeded
1460
        wh.allow(ratelimitTypeWorker, failedRequest)
×
1461

×
1462
        domainID, err := wh.GetDomainCache().GetDomainID(domainName)
×
1463
        if err != nil {
×
1464
                return wh.error(err, scope, tags...)
×
1465
        }
×
1466
        workflowID := failedRequest.GetWorkflowID()
×
1467
        runID := failedRequest.GetRunID() // runID is optional so can be empty
×
1468
        activityID := failedRequest.GetActivityID()
×
1469

×
1470
        if domainID == "" {
×
1471
                return wh.error(errDomainNotSet, scope, tags...)
×
1472
        }
×
1473
        if workflowID == "" {
×
1474
                return wh.error(errWorkflowIDNotSet, scope, tags...)
×
1475
        }
×
1476
        if activityID == "" {
×
1477
                return wh.error(errActivityIDNotSet, scope, tags...)
×
1478
        }
×
1479

1480
        if !common.ValidIDLength(
×
1481
                failedRequest.GetIdentity(),
×
1482
                scope,
×
1483
                wh.config.MaxIDLengthWarnLimit(),
×
1484
                wh.config.IdentityMaxLength(failedRequest.GetDomain()),
×
1485
                metrics.CadenceErrIdentityExceededWarnLimit,
×
1486
                domainName,
×
1487
                wh.GetLogger(),
×
1488
                tag.IDTypeIdentity) {
×
1489
                return wh.error(errIdentityTooLong, scope, tags...)
×
1490
        }
×
1491

1492
        taskToken := &common.TaskToken{
×
1493
                DomainID:   domainID,
×
1494
                RunID:      runID,
×
1495
                WorkflowID: workflowID,
×
1496
                ScheduleID: common.EmptyEventID,
×
1497
                ActivityID: activityID,
×
1498
        }
×
1499
        token, err := wh.tokenSerializer.Serialize(taskToken)
×
1500
        if err != nil {
×
1501
                return wh.error(err, scope, tags...)
×
1502
        }
×
1503

1504
        sizeLimitError := wh.config.BlobSizeLimitError(domainName)
×
1505
        sizeLimitWarn := wh.config.BlobSizeLimitWarn(domainName)
×
1506

×
1507
        if err := common.CheckEventBlobSizeLimit(
×
1508
                len(failedRequest.Details),
×
1509
                sizeLimitWarn,
×
1510
                sizeLimitError,
×
1511
                taskToken.DomainID,
×
1512
                taskToken.WorkflowID,
×
1513
                taskToken.RunID,
×
1514
                scope,
×
1515
                wh.GetThrottledLogger(),
×
1516
                tag.BlobSizeViolationOperation("RespondActivityTaskFailedByID"),
×
1517
        ); err != nil {
×
1518
                // details exceeds blob size limit, we would truncate the details and put a specific error reason
×
1519
                failedRequest.Reason = common.StringPtr(common.FailureReasonFailureDetailsExceedsLimit)
×
1520
                failedRequest.Details = failedRequest.Details[0:sizeLimitError]
×
1521
        }
×
1522

1523
        req := &types.RespondActivityTaskFailedRequest{
×
1524
                TaskToken: token,
×
1525
                Reason:    failedRequest.Reason,
×
1526
                Details:   failedRequest.Details,
×
1527
                Identity:  failedRequest.Identity,
×
1528
        }
×
1529

×
1530
        err = wh.GetHistoryClient().RespondActivityTaskFailed(ctx, &types.HistoryRespondActivityTaskFailedRequest{
×
1531
                DomainUUID:    taskToken.DomainID,
×
1532
                FailedRequest: req,
×
1533
        })
×
1534
        if err != nil {
×
1535
                return wh.normalizeVersionedErrors(ctx, wh.error(err, scope, tags...))
×
1536
        }
×
1537
        return nil
×
1538
}
1539

1540
// RespondActivityTaskCanceled - called to cancel an activity task
1541
func (wh *WorkflowHandler) RespondActivityTaskCanceled(
1542
        ctx context.Context,
1543
        cancelRequest *types.RespondActivityTaskCanceledRequest,
1544
) (retError error) {
×
1545
        defer func() { log.CapturePanic(recover(), wh.GetLogger(), &retError) }()
×
1546

1547
        scope := wh.getDefaultScope(ctx, metrics.FrontendRespondActivityTaskCanceledScope)
×
1548

×
1549
        if wh.isShuttingDown() {
×
1550
                return errShuttingDown
×
1551
        }
×
1552

1553
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
×
1554
                return wh.error(err, scope)
×
1555
        }
×
1556

1557
        if cancelRequest == nil {
×
1558
                return wh.error(errRequestNotSet, scope)
×
1559
        }
×
1560

1561
        if cancelRequest.TaskToken == nil {
×
1562
                return wh.error(errTaskTokenNotSet, scope)
×
1563
        }
×
1564

1565
        taskToken, err := wh.tokenSerializer.Deserialize(cancelRequest.TaskToken)
×
1566
        if err != nil {
×
1567
                return wh.error(err, scope)
×
1568
        }
×
1569

1570
        if taskToken.DomainID == "" {
×
1571
                return wh.error(errDomainNotSet, scope)
×
1572
        }
×
1573

1574
        domainName, err := wh.GetDomainCache().GetDomainName(taskToken.DomainID)
×
1575
        if err != nil {
×
1576
                return wh.error(err, scope)
×
1577
        }
×
1578

1579
        dw := domainWrapper{
×
1580
                domain: domainName,
×
1581
        }
×
1582
        scope, sw := wh.startRequestProfileWithDomain(
×
1583
                ctx,
×
1584
                metrics.FrontendRespondActivityTaskCanceledScope,
×
1585
                dw,
×
1586
        )
×
1587
        defer sw.Stop()
×
1588

×
1589
        // Count the request in the host RPS,
×
1590
        // but we still accept it even if RPS is exceeded
×
1591
        wh.allow(ratelimitTypeWorker, dw)
×
1592

×
1593
        tags := getDomainWfIDRunIDTags(domainName, &types.WorkflowExecution{
×
1594
                WorkflowID: taskToken.WorkflowID,
×
1595
                RunID:      taskToken.RunID,
×
1596
        })
×
1597

×
1598
        if !common.ValidIDLength(
×
1599
                cancelRequest.GetIdentity(),
×
1600
                scope,
×
1601
                wh.config.MaxIDLengthWarnLimit(),
×
1602
                wh.config.IdentityMaxLength(domainName),
×
1603
                metrics.CadenceErrIdentityExceededWarnLimit,
×
1604
                domainName,
×
1605
                wh.GetLogger(),
×
1606
                tag.IDTypeIdentity) {
×
1607
                return wh.error(errIdentityTooLong, scope, tags...)
×
1608
        }
×
1609

1610
        sizeLimitError := wh.config.BlobSizeLimitError(domainName)
×
1611
        sizeLimitWarn := wh.config.BlobSizeLimitWarn(domainName)
×
1612

×
1613
        if err := common.CheckEventBlobSizeLimit(
×
1614
                len(cancelRequest.Details),
×
1615
                sizeLimitWarn,
×
1616
                sizeLimitError,
×
1617
                taskToken.DomainID,
×
1618
                taskToken.WorkflowID,
×
1619
                taskToken.RunID,
×
1620
                scope,
×
1621
                wh.GetThrottledLogger(),
×
1622
                tag.BlobSizeViolationOperation("RespondActivityTaskCanceled"),
×
1623
        ); err != nil {
×
1624
                // details exceeds blob size limit, we would record it as failure
×
1625
                failRequest := &types.RespondActivityTaskFailedRequest{
×
1626
                        TaskToken: cancelRequest.TaskToken,
×
1627
                        Reason:    common.StringPtr(common.FailureReasonCancelDetailsExceedsLimit),
×
1628
                        Details:   cancelRequest.Details[0:sizeLimitError],
×
1629
                        Identity:  cancelRequest.Identity,
×
1630
                }
×
1631
                err = wh.GetHistoryClient().RespondActivityTaskFailed(ctx, &types.HistoryRespondActivityTaskFailedRequest{
×
1632
                        DomainUUID:    taskToken.DomainID,
×
1633
                        FailedRequest: failRequest,
×
1634
                })
×
1635
                if err != nil {
×
1636
                        return wh.normalizeVersionedErrors(ctx, wh.error(err, scope, tags...))
×
1637
                }
×
1638
        } else {
×
1639
                err = wh.GetHistoryClient().RespondActivityTaskCanceled(ctx, &types.HistoryRespondActivityTaskCanceledRequest{
×
1640
                        DomainUUID:    taskToken.DomainID,
×
1641
                        CancelRequest: cancelRequest,
×
1642
                })
×
1643
                if err != nil {
×
1644
                        return wh.normalizeVersionedErrors(ctx, wh.error(err, scope, tags...))
×
1645
                }
×
1646
        }
1647

1648
        return nil
×
1649
}
1650

1651
// RespondActivityTaskCanceledByID - called to cancel an activity task
1652
func (wh *WorkflowHandler) RespondActivityTaskCanceledByID(
1653
        ctx context.Context,
1654
        cancelRequest *types.RespondActivityTaskCanceledByIDRequest,
1655
) (retError error) {
×
1656
        defer func() { log.CapturePanic(recover(), wh.GetLogger(), &retError) }()
×
1657

1658
        scope, sw := wh.startRequestProfileWithDomain(ctx, metrics.FrontendRespondActivityTaskCanceledScope, cancelRequest)
×
1659
        defer sw.Stop()
×
1660

×
1661
        if wh.isShuttingDown() {
×
1662
                return errShuttingDown
×
1663
        }
×
1664

1665
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
×
1666
                return wh.error(err, scope)
×
1667
        }
×
1668

1669
        if cancelRequest == nil {
×
1670
                return wh.error(errRequestNotSet, scope)
×
1671
        }
×
1672

1673
        domainName := cancelRequest.GetDomain()
×
1674
        tags := getDomainWfIDRunIDTags(domainName, &types.WorkflowExecution{
×
1675
                WorkflowID: cancelRequest.GetWorkflowID(),
×
1676
                RunID:      cancelRequest.GetRunID(),
×
1677
        })
×
1678

×
1679
        if domainName == "" {
×
1680
                return wh.error(errDomainNotSet, scope, tags...)
×
1681
        }
×
1682

1683
        // Count the request in the host RPS,
1684
        // but we still accept it even if RPS is exceeded
1685
        wh.allow(ratelimitTypeWorker, cancelRequest)
×
1686

×
1687
        domainID, err := wh.GetDomainCache().GetDomainID(domainName)
×
1688
        if err != nil {
×
1689
                return wh.error(err, scope, tags...)
×
1690
        }
×
1691
        workflowID := cancelRequest.GetWorkflowID()
×
1692
        runID := cancelRequest.GetRunID() // runID is optional so can be empty
×
1693
        activityID := cancelRequest.GetActivityID()
×
1694

×
1695
        if domainID == "" {
×
1696
                return wh.error(errDomainNotSet, scope, tags...)
×
1697
        }
×
1698
        if workflowID == "" {
×
1699
                return wh.error(errWorkflowIDNotSet, scope, tags...)
×
1700
        }
×
1701
        if activityID == "" {
×
1702
                return wh.error(errActivityIDNotSet, scope, tags...)
×
1703
        }
×
1704

1705
        if !common.ValidIDLength(
×
1706
                cancelRequest.GetIdentity(),
×
1707
                scope,
×
1708
                wh.config.MaxIDLengthWarnLimit(),
×
1709
                wh.config.IdentityMaxLength(cancelRequest.GetDomain()),
×
1710
                metrics.CadenceErrIdentityExceededWarnLimit,
×
1711
                domainName,
×
1712
                wh.GetLogger(),
×
1713
                tag.IDTypeIdentity) {
×
1714
                return wh.error(errIdentityTooLong, scope, tags...)
×
1715
        }
×
1716

1717
        taskToken := &common.TaskToken{
×
1718
                DomainID:   domainID,
×
1719
                RunID:      runID,
×
1720
                WorkflowID: workflowID,
×
1721
                ScheduleID: common.EmptyEventID,
×
1722
                ActivityID: activityID,
×
1723
        }
×
1724
        token, err := wh.tokenSerializer.Serialize(taskToken)
×
1725
        if err != nil {
×
1726
                return wh.error(err, scope, tags...)
×
1727
        }
×
1728

1729
        sizeLimitError := wh.config.BlobSizeLimitError(domainName)
×
1730
        sizeLimitWarn := wh.config.BlobSizeLimitWarn(domainName)
×
1731

×
1732
        if err := common.CheckEventBlobSizeLimit(
×
1733
                len(cancelRequest.Details),
×
1734
                sizeLimitWarn,
×
1735
                sizeLimitError,
×
1736
                taskToken.DomainID,
×
1737
                taskToken.WorkflowID,
×
1738
                taskToken.RunID,
×
1739
                scope,
×
1740
                wh.GetThrottledLogger(),
×
1741
                tag.BlobSizeViolationOperation("RespondActivityTaskCanceledByID"),
×
1742
        ); err != nil {
×
1743
                // details exceeds blob size limit, we would record it as failure
×
1744
                failRequest := &types.RespondActivityTaskFailedRequest{
×
1745
                        TaskToken: token,
×
1746
                        Reason:    common.StringPtr(common.FailureReasonCancelDetailsExceedsLimit),
×
1747
                        Details:   cancelRequest.Details[0:sizeLimitError],
×
1748
                        Identity:  cancelRequest.Identity,
×
1749
                }
×
1750
                err = wh.GetHistoryClient().RespondActivityTaskFailed(ctx, &types.HistoryRespondActivityTaskFailedRequest{
×
1751
                        DomainUUID:    taskToken.DomainID,
×
1752
                        FailedRequest: failRequest,
×
1753
                })
×
1754
                if err != nil {
×
1755
                        return wh.normalizeVersionedErrors(ctx, wh.error(err, scope, tags...))
×
1756
                }
×
1757
        } else {
×
1758
                req := &types.RespondActivityTaskCanceledRequest{
×
1759
                        TaskToken: token,
×
1760
                        Details:   cancelRequest.Details,
×
1761
                        Identity:  cancelRequest.Identity,
×
1762
                }
×
1763

×
1764
                err = wh.GetHistoryClient().RespondActivityTaskCanceled(ctx, &types.HistoryRespondActivityTaskCanceledRequest{
×
1765
                        DomainUUID:    taskToken.DomainID,
×
1766
                        CancelRequest: req,
×
1767
                })
×
1768
                if err != nil {
×
1769
                        return wh.normalizeVersionedErrors(ctx, wh.error(err, scope, tags...))
×
1770
                }
×
1771
        }
1772

1773
        return nil
×
1774
}
1775

1776
// RespondDecisionTaskCompleted - response to a decision task
1777
func (wh *WorkflowHandler) RespondDecisionTaskCompleted(
1778
        ctx context.Context,
1779
        completeRequest *types.RespondDecisionTaskCompletedRequest,
1780
) (resp *types.RespondDecisionTaskCompletedResponse, retError error) {
931✔
1781
        defer func() { log.CapturePanic(recover(), wh.GetLogger(), &retError) }()
1,862✔
1782

1783
        scope := wh.getDefaultScope(ctx, metrics.FrontendRespondDecisionTaskCompletedScope)
931✔
1784

931✔
1785
        if wh.isShuttingDown() {
931✔
1786
                return nil, errShuttingDown
×
1787
        }
×
1788

1789
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
931✔
1790
                return nil, wh.error(err, scope)
×
1791
        }
×
1792

1793
        if completeRequest == nil {
931✔
1794
                return nil, wh.error(errRequestNotSet, scope)
×
1795
        }
×
1796

1797
        if completeRequest.TaskToken == nil {
931✔
1798
                return nil, wh.error(errTaskTokenNotSet, scope)
×
1799
        }
×
1800
        taskToken, err := wh.tokenSerializer.Deserialize(completeRequest.TaskToken)
931✔
1801
        if err != nil {
931✔
1802
                return nil, wh.error(err, scope)
×
1803
        }
×
1804
        if taskToken.DomainID == "" {
931✔
1805
                return nil, wh.error(errDomainNotSet, scope)
×
1806
        }
×
1807

1808
        domainName, err := wh.GetDomainCache().GetDomainName(taskToken.DomainID)
931✔
1809
        if err != nil {
931✔
1810
                return nil, wh.error(err, scope)
×
1811
        }
×
1812

1813
        dw := domainWrapper{
931✔
1814
                domain: domainName,
931✔
1815
        }
931✔
1816
        scope, sw := wh.startRequestProfileWithDomain(
931✔
1817
                ctx,
931✔
1818
                metrics.FrontendRespondDecisionTaskCompletedScope,
931✔
1819
                dw,
931✔
1820
        )
931✔
1821
        defer sw.Stop()
931✔
1822

931✔
1823
        // Count the request in the host RPS,
931✔
1824
        // but we still accept it even if RPS is exceeded
931✔
1825
        wh.allow(ratelimitTypeWorker, dw)
931✔
1826

931✔
1827
        tags := getDomainWfIDRunIDTags(domainName, &types.WorkflowExecution{
931✔
1828
                WorkflowID: taskToken.WorkflowID,
931✔
1829
                RunID:      taskToken.RunID,
931✔
1830
        })
931✔
1831

931✔
1832
        if !common.ValidIDLength(
931✔
1833
                completeRequest.GetIdentity(),
931✔
1834
                scope,
931✔
1835
                wh.config.MaxIDLengthWarnLimit(),
931✔
1836
                wh.config.IdentityMaxLength(domainName),
931✔
1837
                metrics.CadenceErrIdentityExceededWarnLimit,
931✔
1838
                domainName,
931✔
1839
                wh.GetLogger(),
931✔
1840
                tag.IDTypeIdentity) {
931✔
1841
                return nil, wh.error(errIdentityTooLong, scope, tags...)
×
1842
        }
×
1843

1844
        if err := common.CheckDecisionResultLimit(
931✔
1845
                len(completeRequest.Decisions),
931✔
1846
                wh.config.DecisionResultCountLimit(domainName),
931✔
1847
                scope); err != nil {
931✔
1848
                return nil, wh.error(err, scope)
×
1849
        }
×
1850

1851
        histResp, err := wh.GetHistoryClient().RespondDecisionTaskCompleted(ctx, &types.HistoryRespondDecisionTaskCompletedRequest{
931✔
1852
                DomainUUID:      taskToken.DomainID,
931✔
1853
                CompleteRequest: completeRequest},
931✔
1854
        )
931✔
1855
        if err != nil {
940✔
1856
                return nil, wh.normalizeVersionedErrors(ctx, wh.error(err, scope, tags...))
9✔
1857
        }
9✔
1858

1859
        completedResp := &types.RespondDecisionTaskCompletedResponse{}
922✔
1860
        completedResp.ActivitiesToDispatchLocally = histResp.ActivitiesToDispatchLocally
922✔
1861
        if completeRequest.GetReturnNewDecisionTask() && histResp != nil && histResp.StartedResponse != nil {
982✔
1862
                taskToken := &common.TaskToken{
60✔
1863
                        DomainID:        taskToken.DomainID,
60✔
1864
                        WorkflowID:      taskToken.WorkflowID,
60✔
1865
                        RunID:           taskToken.RunID,
60✔
1866
                        ScheduleID:      histResp.StartedResponse.GetScheduledEventID(),
60✔
1867
                        ScheduleAttempt: histResp.StartedResponse.GetAttempt(),
60✔
1868
                }
60✔
1869
                token, _ := wh.tokenSerializer.Serialize(taskToken)
60✔
1870
                workflowExecution := &types.WorkflowExecution{
60✔
1871
                        WorkflowID: taskToken.WorkflowID,
60✔
1872
                        RunID:      taskToken.RunID,
60✔
1873
                }
60✔
1874
                matchingResp := common.CreateMatchingPollForDecisionTaskResponse(histResp.StartedResponse, workflowExecution, token)
60✔
1875

60✔
1876
                newDecisionTask, err := wh.createPollForDecisionTaskResponse(ctx, scope, taskToken.DomainID, matchingResp, matchingResp.GetBranchToken())
60✔
1877
                if err != nil {
60✔
1878
                        return nil, wh.error(err, scope, tags...)
×
1879
                }
×
1880
                completedResp.DecisionTask = newDecisionTask
60✔
1881
        }
1882

1883
        return completedResp, nil
922✔
1884
}
1885

1886
// RespondDecisionTaskFailed - failed response to a decision task
1887
func (wh *WorkflowHandler) RespondDecisionTaskFailed(
1888
        ctx context.Context,
1889
        failedRequest *types.RespondDecisionTaskFailedRequest,
1890
) (retError error) {
159✔
1891
        defer func() { log.CapturePanic(recover(), wh.GetLogger(), &retError) }()
318✔
1892

1893
        scope := wh.getDefaultScope(ctx, metrics.FrontendRespondDecisionTaskFailedScope)
159✔
1894

159✔
1895
        if wh.isShuttingDown() {
159✔
1896
                return errShuttingDown
×
1897
        }
×
1898

1899
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
159✔
1900
                return wh.error(err, scope)
×
1901
        }
×
1902

1903
        if failedRequest == nil {
159✔
1904
                return wh.error(errRequestNotSet, scope)
×
1905
        }
×
1906

1907
        if failedRequest.TaskToken == nil {
159✔
1908
                return wh.error(errTaskTokenNotSet, scope)
×
1909
        }
×
1910
        taskToken, err := wh.tokenSerializer.Deserialize(failedRequest.TaskToken)
159✔
1911
        if err != nil {
159✔
1912
                return wh.error(err, scope)
×
1913
        }
×
1914
        if taskToken.DomainID == "" {
159✔
1915
                return wh.error(errDomainNotSet, scope)
×
1916
        }
×
1917

1918
        domainName, err := wh.GetDomainCache().GetDomainName(taskToken.DomainID)
159✔
1919
        if err != nil {
159✔
1920
                return wh.error(err, scope)
×
1921
        }
×
1922

1923
        dw := domainWrapper{
159✔
1924
                domain: domainName,
159✔
1925
        }
159✔
1926
        scope, sw := wh.startRequestProfileWithDomain(
159✔
1927
                ctx,
159✔
1928
                metrics.FrontendRespondDecisionTaskFailedScope,
159✔
1929
                dw,
159✔
1930
        )
159✔
1931
        defer sw.Stop()
159✔
1932

159✔
1933
        // Count the request in the host RPS,
159✔
1934
        // but we still accept it even if RPS is exceeded
159✔
1935
        wh.allow(ratelimitTypeWorker, dw)
159✔
1936

159✔
1937
        tags := getDomainWfIDRunIDTags(domainName, &types.WorkflowExecution{
159✔
1938
                WorkflowID: taskToken.WorkflowID,
159✔
1939
                RunID:      taskToken.RunID,
159✔
1940
        })
159✔
1941

159✔
1942
        if !common.ValidIDLength(
159✔
1943
                failedRequest.GetIdentity(),
159✔
1944
                scope,
159✔
1945
                wh.config.MaxIDLengthWarnLimit(),
159✔
1946
                wh.config.IdentityMaxLength(domainName),
159✔
1947
                metrics.CadenceErrIdentityExceededWarnLimit,
159✔
1948
                domainName,
159✔
1949
                wh.GetLogger(),
159✔
1950
                tag.IDTypeIdentity) {
159✔
1951
                return wh.error(errIdentityTooLong, scope, tags...)
×
1952
        }
×
1953

1954
        sizeLimitError := wh.config.BlobSizeLimitError(domainName)
159✔
1955
        sizeLimitWarn := wh.config.BlobSizeLimitWarn(domainName)
159✔
1956

159✔
1957
        if err := common.CheckEventBlobSizeLimit(
159✔
1958
                len(failedRequest.Details),
159✔
1959
                sizeLimitWarn,
159✔
1960
                sizeLimitError,
159✔
1961
                taskToken.DomainID,
159✔
1962
                taskToken.WorkflowID,
159✔
1963
                taskToken.RunID,
159✔
1964
                scope,
159✔
1965
                wh.GetThrottledLogger(),
159✔
1966
                tag.BlobSizeViolationOperation("RespondDecisionTaskFailed"),
159✔
1967
        ); err != nil {
159✔
1968
                // details exceed, we would just truncate the size for decision task failed as the details is not used anywhere by client code
×
1969
                failedRequest.Details = failedRequest.Details[0:sizeLimitError]
×
1970
        }
×
1971

1972
        err = wh.GetHistoryClient().RespondDecisionTaskFailed(ctx, &types.HistoryRespondDecisionTaskFailedRequest{
159✔
1973
                DomainUUID:    taskToken.DomainID,
159✔
1974
                FailedRequest: failedRequest,
159✔
1975
        })
159✔
1976
        if err != nil {
159✔
1977
                return wh.normalizeVersionedErrors(ctx, wh.error(err, scope, tags...))
×
1978
        }
×
1979
        return nil
159✔
1980
}
1981

1982
// RespondQueryTaskCompleted - response to a query task
1983
func (wh *WorkflowHandler) RespondQueryTaskCompleted(
1984
        ctx context.Context,
1985
        completeRequest *types.RespondQueryTaskCompletedRequest,
1986
) (retError error) {
30✔
1987
        defer func() { log.CapturePanic(recover(), wh.GetLogger(), &retError) }()
60✔
1988

1989
        scope := wh.getDefaultScope(ctx, metrics.FrontendRespondQueryTaskCompletedScope)
30✔
1990

30✔
1991
        if wh.isShuttingDown() {
30✔
1992
                return errShuttingDown
×
1993
        }
×
1994

1995
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
30✔
1996
                return wh.error(err, scope)
×
1997
        }
×
1998

1999
        if completeRequest == nil {
30✔
2000
                return wh.error(errRequestNotSet, scope)
×
2001
        }
×
2002

2003
        if completeRequest.TaskToken == nil {
30✔
2004
                return wh.error(errTaskTokenNotSet, scope)
×
2005
        }
×
2006
        queryTaskToken, err := wh.tokenSerializer.DeserializeQueryTaskToken(completeRequest.TaskToken)
30✔
2007
        if err != nil {
30✔
2008
                return wh.error(err, scope)
×
2009
        }
×
2010
        if queryTaskToken.DomainID == "" || queryTaskToken.TaskList == "" || queryTaskToken.TaskID == "" {
30✔
2011
                return wh.error(errInvalidTaskToken, scope)
×
2012
        }
×
2013

2014
        domainName, err := wh.GetDomainCache().GetDomainName(queryTaskToken.DomainID)
30✔
2015
        if err != nil {
30✔
2016
                return wh.error(err, scope)
×
2017
        }
×
2018

2019
        dw := domainWrapper{
30✔
2020
                domain: domainName,
30✔
2021
        }
30✔
2022
        scope, sw := wh.startRequestProfileWithDomain(
30✔
2023
                ctx,
30✔
2024
                metrics.FrontendRespondQueryTaskCompletedScope,
30✔
2025
                dw,
30✔
2026
        )
30✔
2027
        defer sw.Stop()
30✔
2028

30✔
2029
        // Count the request in the host RPS,
30✔
2030
        // but we still accept it even if RPS is exceeded
30✔
2031
        wh.allow(ratelimitTypeWorker, dw)
30✔
2032

30✔
2033
        sizeLimitError := wh.config.BlobSizeLimitError(domainName)
30✔
2034
        sizeLimitWarn := wh.config.BlobSizeLimitWarn(domainName)
30✔
2035

30✔
2036
        if err := common.CheckEventBlobSizeLimit(
30✔
2037
                len(completeRequest.GetQueryResult()),
30✔
2038
                sizeLimitWarn,
30✔
2039
                sizeLimitError,
30✔
2040
                queryTaskToken.DomainID,
30✔
2041
                "",
30✔
2042
                "",
30✔
2043
                scope,
30✔
2044
                wh.GetThrottledLogger(),
30✔
2045
                tag.BlobSizeViolationOperation("RespondQueryTaskCompleted"),
30✔
2046
        ); err != nil {
30✔
2047
                completeRequest = &types.RespondQueryTaskCompletedRequest{
×
2048
                        TaskToken:     completeRequest.TaskToken,
×
2049
                        CompletedType: types.QueryTaskCompletedTypeFailed.Ptr(),
×
2050
                        QueryResult:   nil,
×
2051
                        ErrorMessage:  err.Error(),
×
2052
                }
×
2053
        }
×
2054

2055
        call := yarpc.CallFromContext(ctx)
30✔
2056

30✔
2057
        completeRequest.WorkerVersionInfo = &types.WorkerVersionInfo{
30✔
2058
                Impl:           call.Header(common.ClientImplHeaderName),
30✔
2059
                FeatureVersion: call.Header(common.FeatureVersionHeaderName),
30✔
2060
        }
30✔
2061
        matchingRequest := &types.MatchingRespondQueryTaskCompletedRequest{
30✔
2062
                DomainUUID:       queryTaskToken.DomainID,
30✔
2063
                TaskList:         &types.TaskList{Name: queryTaskToken.TaskList},
30✔
2064
                TaskID:           queryTaskToken.TaskID,
30✔
2065
                CompletedRequest: completeRequest,
30✔
2066
        }
30✔
2067

30✔
2068
        err = wh.GetMatchingClient().RespondQueryTaskCompleted(ctx, matchingRequest)
30✔
2069
        if err != nil {
30✔
2070
                return wh.error(err, scope)
×
2071
        }
×
2072
        return nil
30✔
2073
}
2074

2075
// StartWorkflowExecution - Creates a new workflow execution
2076
func (wh *WorkflowHandler) StartWorkflowExecution(
2077
        ctx context.Context,
2078
        startRequest *types.StartWorkflowExecutionRequest,
2079
) (resp *types.StartWorkflowExecutionResponse, retError error) {
454✔
2080
        defer func() { log.CapturePanic(recover(), wh.GetLogger(), &retError) }()
908✔
2081

2082
        scope, sw := wh.startRequestProfileWithDomain(ctx, metrics.FrontendStartWorkflowExecutionScope, startRequest)
454✔
2083
        defer sw.Stop()
454✔
2084

454✔
2085
        if wh.isShuttingDown() {
454✔
2086
                return nil, errShuttingDown
×
2087
        }
×
2088

2089
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
454✔
2090
                return nil, wh.error(err, scope)
×
2091
        }
×
2092

2093
        if startRequest == nil {
455✔
2094
                return nil, wh.error(errRequestNotSet, scope)
1✔
2095
        }
1✔
2096

2097
        domainName := startRequest.GetDomain()
453✔
2098
        wfExecution := &types.WorkflowExecution{
453✔
2099
                WorkflowID: startRequest.GetWorkflowID(),
453✔
2100
        }
453✔
2101
        tags := getDomainWfIDRunIDTags(domainName, wfExecution)
453✔
2102

453✔
2103
        if domainName == "" {
454✔
2104
                return nil, wh.error(errDomainNotSet, scope, tags...)
1✔
2105
        }
1✔
2106

2107
        if ok := wh.allow(ratelimitTypeUser, startRequest); !ok {
452✔
2108
                return nil, wh.error(createServiceBusyError(), scope, tags...)
×
2109
        }
×
2110

2111
        idLengthWarnLimit := wh.config.MaxIDLengthWarnLimit()
452✔
2112
        if !common.ValidIDLength(
452✔
2113
                domainName,
452✔
2114
                scope,
452✔
2115
                idLengthWarnLimit,
452✔
2116
                wh.config.DomainNameMaxLength(domainName),
452✔
2117
                metrics.CadenceErrDomainNameExceededWarnLimit,
452✔
2118
                domainName,
452✔
2119
                wh.GetLogger(),
452✔
2120
                tag.IDTypeDomainName) {
452✔
2121
                return nil, wh.error(errDomainTooLong, scope, tags...)
×
2122
        }
×
2123

2124
        if startRequest.GetWorkflowID() == "" {
453✔
2125
                return nil, wh.error(errWorkflowIDNotSet, scope, tags...)
1✔
2126
        }
1✔
2127

2128
        if !common.ValidIDLength(
451✔
2129
                startRequest.GetWorkflowID(),
451✔
2130
                scope,
451✔
2131
                idLengthWarnLimit,
451✔
2132
                wh.config.WorkflowIDMaxLength(domainName),
451✔
2133
                metrics.CadenceErrWorkflowIDExceededWarnLimit,
451✔
2134
                domainName,
451✔
2135
                wh.GetLogger(),
451✔
2136
                tag.IDTypeWorkflowID) {
451✔
2137
                return nil, wh.error(errWorkflowIDTooLong, scope, tags...)
×
2138
        }
×
2139

2140
        if err := common.ValidateRetryPolicy(startRequest.RetryPolicy); err != nil {
451✔
2141
                return nil, wh.error(err, scope, tags...)
×
2142
        }
×
2143

2144
        if startRequest.GetCronSchedule() != "" {
469✔
2145
                if _, err := backoff.ValidateSchedule(startRequest.GetCronSchedule()); err != nil {
18✔
2146
                        return nil, wh.error(err, scope, tags...)
×
2147
                }
×
2148
        }
2149

2150
        wh.GetLogger().Debug(
451✔
2151
                "Received StartWorkflowExecution. WorkflowID",
451✔
2152
                tag.WorkflowID(startRequest.GetWorkflowID()))
451✔
2153

451✔
2154
        if startRequest.WorkflowType == nil || startRequest.WorkflowType.GetName() == "" {
452✔
2155
                return nil, wh.error(errWorkflowTypeNotSet, scope, tags...)
1✔
2156
        }
1✔
2157

2158
        if !common.ValidIDLength(
450✔
2159
                startRequest.WorkflowType.GetName(),
450✔
2160
                scope,
450✔
2161
                idLengthWarnLimit,
450✔
2162
                wh.config.WorkflowTypeMaxLength(domainName),
450✔
2163
                metrics.CadenceErrWorkflowTypeExceededWarnLimit,
450✔
2164
                domainName,
450✔
2165
                wh.GetLogger(),
450✔
2166
                tag.IDTypeWorkflowType) {
450✔
2167
                return nil, wh.error(errWorkflowTypeTooLong, scope, tags...)
×
2168
        }
×
2169

2170
        if err := wh.validateTaskList(startRequest.TaskList, scope, domainName); err != nil {
451✔
2171
                return nil, wh.error(err, scope, tags...)
1✔
2172
        }
1✔
2173

2174
        if startRequest.GetExecutionStartToCloseTimeoutSeconds() <= 0 {
450✔
2175
                return nil, wh.error(errInvalidExecutionStartToCloseTimeoutSeconds, scope, tags...)
1✔
2176
        }
1✔
2177

2178
        if startRequest.GetTaskStartToCloseTimeoutSeconds() <= 0 {
449✔
2179
                return nil, wh.error(errInvalidTaskStartToCloseTimeoutSeconds, scope, tags...)
1✔
2180
        }
1✔
2181

2182
        if startRequest.GetDelayStartSeconds() < 0 {
448✔
2183
                return nil, wh.error(errInvalidDelayStartSeconds, scope, tags...)
1✔
2184
        }
1✔
2185

2186
        if startRequest.GetJitterStartSeconds() < 0 {
446✔
2187
                return nil, wh.error(errInvalidJitterStartSeconds, scope, tags...)
×
2188
        }
×
2189

2190
        jitter := startRequest.GetJitterStartSeconds()
446✔
2191
        cron := startRequest.GetCronSchedule()
446✔
2192
        if jitter > 0 && cron != "" {
446✔
2193
                // Calculate the cron duration and ensure that jitter is not greater than the cron duration,
×
2194
                // because that would be confusing to users.
×
2195

×
2196
                // Request using start/end time zero value, which will get us an exact answer (i.e. its not in the
×
2197
                // middle of a minute)
×
2198
                backoffSeconds, err := backoff.GetBackoffForNextScheduleInSeconds(cron, time.Time{}, time.Time{}, jitter)
×
2199
                if err != nil {
×
2200
                        tags = append(tags, tag.WorkflowCronSchedule(cron))
×
2201
                        return nil, wh.error(err, scope, tags...)
×
2202
                }
×
2203
                if jitter > backoffSeconds {
×
2204
                        return nil, wh.error(errInvalidJitterStartSeconds2, scope, tags...)
×
2205
                }
×
2206
        }
2207

2208
        if startRequest.GetRequestID() == "" {
447✔
2209
                return nil, wh.error(errRequestIDNotSet, scope, tags...)
1✔
2210
        }
1✔
2211

2212
        if !common.ValidIDLength(
445✔
2213
                startRequest.GetRequestID(),
445✔
2214
                scope,
445✔
2215
                idLengthWarnLimit,
445✔
2216
                wh.config.RequestIDMaxLength(domainName),
445✔
2217
                metrics.CadenceErrRequestIDExceededWarnLimit,
445✔
2218
                domainName,
445✔
2219
                wh.GetLogger(),
445✔
2220
                tag.IDTypeRequestID) {
445✔
2221
                return nil, wh.error(errRequestIDTooLong, scope, tags...)
×
2222
        }
×
2223

2224
        if err := wh.searchAttributesValidator.ValidateSearchAttributes(startRequest.SearchAttributes, domainName); err != nil {
445✔
2225
                return nil, wh.error(err, scope, tags...)
×
2226
        }
×
2227

2228
        wh.GetLogger().Debug("Start workflow execution request domain", tag.WorkflowDomainName(domainName))
445✔
2229
        domainID, err := wh.GetDomainCache().GetDomainID(domainName)
445✔
2230
        if err != nil {
445✔
2231
                return nil, wh.error(err, scope, tags...)
×
2232
        }
×
2233

2234
        sizeLimitError := wh.config.BlobSizeLimitError(domainName)
445✔
2235
        sizeLimitWarn := wh.config.BlobSizeLimitWarn(domainName)
445✔
2236
        actualSize := len(startRequest.Input)
445✔
2237
        if startRequest.Memo != nil {
457✔
2238
                actualSize += common.GetSizeOfMapStringToByteArray(startRequest.Memo.GetFields())
12✔
2239
        }
12✔
2240
        if err := common.CheckEventBlobSizeLimit(
445✔
2241
                actualSize,
445✔
2242
                sizeLimitWarn,
445✔
2243
                sizeLimitError,
445✔
2244
                domainID,
445✔
2245
                startRequest.GetWorkflowID(),
445✔
2246
                "",
445✔
2247
                scope,
445✔
2248
                wh.GetThrottledLogger(),
445✔
2249
                tag.BlobSizeViolationOperation("StartWorkflowExecution"),
445✔
2250
        ); err != nil {
445✔
2251
                return nil, wh.error(err, scope, tags...)
×
2252
        }
×
2253

2254
        isolationGroup := wh.getIsolationGroup(ctx, domainName)
445✔
2255
        if !wh.isIsolationGroupHealthy(ctx, domainName, isolationGroup) {
446✔
2256
                return nil, wh.error(&types.BadRequestError{fmt.Sprintf("Domain %s is drained from isolation group %s.", domainName, isolationGroup)}, scope, tags...)
1✔
2257
        }
1✔
2258

2259
        wh.GetLogger().Debug("Start workflow execution request domainID", tag.WorkflowDomainID(domainID))
444✔
2260
        historyRequest, err := common.CreateHistoryStartWorkflowRequest(
444✔
2261
                domainID, startRequest, time.Now(), wh.getPartitionConfig(ctx, domainName))
444✔
2262
        if err != nil {
444✔
2263
                return nil, err
×
2264
        }
×
2265

2266
        resp, err = wh.GetHistoryClient().StartWorkflowExecution(ctx, historyRequest)
444✔
2267
        if err != nil {
462✔
2268
                return nil, wh.error(err, scope, tags...)
18✔
2269
        }
18✔
2270
        return resp, nil
426✔
2271
}
2272

2273
// GetWorkflowExecutionHistory - retrieves the history of workflow execution
//
// Flow:
//   - Validate the request.
//   - Default and cap MaximumPageSize.
//   - Optionally serve the history from the archival store.
//   - Otherwise read one page of events (or raw data blobs) for the run.
//
// Paging state lives in a getHistoryContinuationToken, which is serialized into
// the response NextPageToken. The token is set to nil before serialization once
// there is nothing more to return.
//
// WaitForNewEvent (long poll): when a continuation token has no persistence
// page left and the workflow was running, mutable state is re-queried via
// PollMutableState to pick up new events instead of ending the stream.
//
// HistoryEventFilterTypeCloseEvent: once the workflow is not running, only the
// last event (or, for raw history, the last data blob) is returned.
//
// Raw history is returned only when SendRawWorkflowHistory is enabled for the
// domain and the client version supports raw history queries.
2274
func (wh *WorkflowHandler) GetWorkflowExecutionHistory(
2275
        ctx context.Context,
2276
        getRequest *types.GetWorkflowExecutionHistoryRequest,
2277
) (resp *types.GetWorkflowExecutionHistoryResponse, retError error) {
453✔
2278
        defer func() { log.CapturePanic(recover(), wh.GetLogger(), &retError) }()
906✔
2279

2280
        scope, sw := wh.startRequestProfileWithDomain(ctx, metrics.FrontendGetWorkflowExecutionHistoryScope, getRequest)
453✔
2281
        defer sw.Stop()
453✔
2282

453✔
2283
        if wh.isShuttingDown() {
453✔
2284
                return nil, errShuttingDown
×
2285
        }
×
2286

2287
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
453✔
2288
                return nil, wh.error(err, scope)
×
2289
        }
×
2290

2291
        if getRequest == nil {
453✔
2292
                return nil, wh.error(errRequestNotSet, scope)
×
2293
        }
×
2294

2295
        domainName := getRequest.GetDomain()
453✔
2296
        wfExecution := getRequest.GetExecution()
453✔
2297
        tags := getDomainWfIDRunIDTags(domainName, wfExecution)
453✔
2298

453✔
2299
        if domainName == "" {
453✔
2300
                return nil, wh.error(errDomainNotSet, scope, tags...)
×
2301
        }
×
2302

2303
        if ok := wh.allow(ratelimitTypeUser, getRequest); !ok {
453✔
2304
                return nil, wh.error(createServiceBusyError(), scope, tags...)
×
2305
        }
×
2306

2307
        if err := validateExecution(wfExecution); err != nil {
453✔
2308
                return nil, wh.error(err, scope, tags...)
×
2309
        }
×
2310

2311
        domainID, err := wh.GetDomainCache().GetDomainID(domainName)
453✔
2312
        if err != nil {
453✔
2313
                return nil, wh.error(err, scope, tags...)
×
2314
        }
×
2315

2316
        // A non-positive page size falls back to the domain's HistoryMaxPageSize config.
        if getRequest.GetMaximumPageSize() <= 0 {
774✔
2317
                getRequest.MaximumPageSize = int32(wh.config.HistoryMaxPageSize(getRequest.GetDomain()))
321✔
2318
        }
321✔
2319
        // force limit page size if exceed
2320
        if getRequest.GetMaximumPageSize() > common.GetHistoryMaxPageSize {
453✔
2321
                wh.GetThrottledLogger().Warn("GetHistory page size is larger than threshold",
×
2322
                        tag.WorkflowID(getRequest.Execution.GetWorkflowID()),
×
2323
                        tag.WorkflowRunID(getRequest.Execution.GetRunID()),
×
2324
                        tag.WorkflowDomainID(domainID),
×
2325
                        tag.WorkflowSize(int64(getRequest.GetMaximumPageSize())))
×
2326
                getRequest.MaximumPageSize = common.GetHistoryMaxPageSize
×
2327
        }
×
2328

2329
        // Unless the caller set SkipArchival, serve the archived copy when archival
        // reads are enabled and wh.historyArchived reports the run as archived.
        if !getRequest.GetSkipArchival() {
888✔
2330
                enableArchivalRead := wh.GetArchivalMetadata().GetHistoryConfig().ReadEnabled()
435✔
2331
                historyArchived := wh.historyArchived(ctx, getRequest, domainID)
435✔
2332
                if enableArchivalRead && historyArchived {
459✔
2333
                        return wh.getArchivedHistory(ctx, getRequest, domainID, scope, tags...)
24✔
2334
                }
24✔
2335
        }
2336

2337
        // queryHistory calls PollMutableState and returns the following 6 things:
2338
        // 1. branch token
2339
        // 2. the workflow run ID
2340
        // 3. the last first event ID (the event ID of the last batch of events in the history)
2341
        // 4. the next event ID
2342
        // 5. whether the workflow is still running (close state is WorkflowCloseStatusNone)
2343
        // 6. error if any
2344
        queryHistory := func(
429✔
2345
                domainUUID string,
429✔
2346
                execution *types.WorkflowExecution,
429✔
2347
                expectedNextEventID int64,
429✔
2348
                currentBranchToken []byte,
429✔
2349
        ) ([]byte, string, int64, int64, bool, error) {
814✔
2350
                response, err := wh.GetHistoryClient().PollMutableState(ctx, &types.PollMutableStateRequest{
385✔
2351
                        DomainUUID:          domainUUID,
385✔
2352
                        Execution:           execution,
385✔
2353
                        ExpectedNextEventID: expectedNextEventID,
385✔
2354
                        CurrentBranchToken:  currentBranchToken,
385✔
2355
                })
385✔
2356

385✔
2357
                if err != nil {
385✔
2358
                        return nil, "", 0, 0, false, err
×
2359
                }
×
2360
                isWorkflowRunning := response.GetWorkflowCloseState() == persistence.WorkflowCloseStatusNone
385✔
2361

385✔
2362
                return response.CurrentBranchToken,
385✔
2363
                        response.Execution.GetRunID(),
385✔
2364
                        response.GetLastFirstEventID(),
385✔
2365
                        response.GetNextEventID(),
385✔
2366
                        isWorkflowRunning,
385✔
2367
                        nil
385✔
2368
        }
2369

2370
        isLongPoll := getRequest.GetWaitForNewEvent()
429✔
2371
        isCloseEventOnly := getRequest.GetHistoryEventFilterType() == types.HistoryEventFilterTypeCloseEvent
429✔
2372
        // execution aliases the request's execution; the RunID is filled in below.
        execution := getRequest.Execution
429✔
2373
        token := &getHistoryContinuationToken{}
429✔
2374

429✔
2375
        // runID, lastFirstEventID, nextEventID and isWorkflowRunning are only
        // refreshed when queryHistory runs below: on the first page, or on a
        // long-poll refresh of a continuation token. Otherwise they keep these
        // initial values.
        var runID string
429✔
2376
        lastFirstEventID := common.FirstEventID
429✔
2377
        var nextEventID int64
429✔
2378
        var isWorkflowRunning bool
429✔
2379

429✔
2380
        // process the token for paging
429✔
2381
        // Close-event-only requests keep common.EndEventID as the expected next
        // event ID; the branches below override it otherwise.
        queryNextEventID := common.EndEventID
429✔
2382
        // Continuation call: restore paging state from the token. An explicit
        // RunID on the request must match the token's RunID.
        if getRequest.NextPageToken != nil {
473✔
2383
                token, err = deserializeHistoryToken(getRequest.NextPageToken)
44✔
2384
                if err != nil {
44✔
2385
                        return nil, wh.error(errInvalidNextPageToken, scope, tags...)
×
2386
                }
×
2387
                if execution.RunID != "" && execution.GetRunID() != token.RunID {
44✔
2388
                        return nil, wh.error(errNextPageTokenRunIDMismatch, scope, tags...)
×
2389
                }
×
2390

2391
                execution.RunID = token.RunID
44✔
2392

44✔
2393
                // we need to update the current next event ID and whether workflow is running
44✔
2394
                if len(token.PersistenceToken) == 0 && isLongPoll && token.IsWorkflowRunning {
44✔
2395
                        logger := wh.GetLogger().WithTags(
×
2396
                                tag.WorkflowDomainName(getRequest.GetDomain()),
×
2397
                                tag.WorkflowID(getRequest.Execution.GetWorkflowID()),
×
2398
                                tag.WorkflowRunID(getRequest.Execution.GetRunID()),
×
2399
                        )
×
2400
                        // TODO: for now we only log the invalid timeout (this is done inside the helper function) in case
×
2401
                        // this change breaks existing customers. Once we are sure no one is calling this API with very short timeout
×
2402
                        // we can return the error.
×
2403
                        _ = common.ValidateLongPollContextTimeout(ctx, "GetWorkflowExecutionHistory", logger)
×
2404

×
2405
                        if !isCloseEventOnly {
×
2406
                                queryNextEventID = token.NextEventID
×
2407
                        }
×
2408
                        token.BranchToken, _, lastFirstEventID, nextEventID, isWorkflowRunning, err =
×
2409
                                queryHistory(domainID, execution, queryNextEventID, token.BranchToken)
×
2410
                        if err != nil {
×
2411
                                return nil, wh.error(err, scope, tags...)
×
2412
                        }
×
2413
                        // The new window starts where the previous token ended.
                        token.FirstEventID = token.NextEventID
×
2414
                        token.NextEventID = nextEventID
×
2415
                        token.IsWorkflowRunning = isWorkflowRunning
×
2416
                }
2417
        } else {
385✔
2418
                // First page: read the branch token, run ID and event window from mutable state.
                if !isCloseEventOnly {
752✔
2419
                        queryNextEventID = common.FirstEventID
367✔
2420
                }
367✔
2421
                token.BranchToken, runID, lastFirstEventID, nextEventID, isWorkflowRunning, err =
385✔
2422
                        queryHistory(domainID, execution, queryNextEventID, nil)
385✔
2423
                if err != nil {
385✔
2424
                        return nil, wh.error(err, scope, tags...)
×
2425
                }
×
2426

2427
                execution.RunID = runID
385✔
2428

385✔
2429
                token.RunID = runID
385✔
2430
                token.FirstEventID = common.FirstEventID
385✔
2431
                token.NextEventID = nextEventID
385✔
2432
                token.IsWorkflowRunning = isWorkflowRunning
385✔
2433
                token.PersistenceToken = nil
385✔
2434
        }
2435

2436
        // Raw history is used only when enabled for the domain and supported by the
        // client implementation / feature version sent in the call headers.
        call := yarpc.CallFromContext(ctx)
429✔
2437
        clientFeatureVersion := call.Header(common.FeatureVersionHeaderName)
429✔
2438
        clientImpl := call.Header(common.ClientImplHeaderName)
429✔
2439
        supportsRawHistoryQuery := wh.versionChecker.SupportsRawHistoryQuery(clientImpl, clientFeatureVersion) == nil
429✔
2440
        isRawHistoryEnabled := wh.config.SendRawWorkflowHistory(domainName) && supportsRawHistoryQuery
429✔
2441

429✔
2442
        history := &types.History{}
429✔
2443
        history.Events = []*types.HistoryEvent{}
429✔
2444
        var historyBlob []*types.DataBlob
429✔
2445

429✔
2446
        // getHistory reads one page between firstEventID and nextEventID, as raw
        // blobs or decoded events. It stores the result in the enclosing
        // historyBlob/history, token.PersistenceToken and err variables.
429✔
2447
        getHistory := func(firstEventID, nextEventID int64, nextPageToken []byte) error {
858✔
2448
                if isRawHistoryEnabled {
431✔
2449
                        historyBlob, token.PersistenceToken, err = wh.getRawHistory(
2✔
2450
                                ctx,
2✔
2451
                                scope,
2✔
2452
                                domainID,
2✔
2453
                                domainName,
2✔
2454
                                *execution,
2✔
2455
                                firstEventID,
2✔
2456
                                nextEventID,
2✔
2457
                                getRequest.GetMaximumPageSize(),
2✔
2458
                                nextPageToken,
2✔
2459
                                token.TransientDecision,
2✔
2460
                                token.BranchToken,
2✔
2461
                        )
2✔
2462
                } else {
429✔
2463
                        history, token.PersistenceToken, err = wh.getHistory(
427✔
2464
                                ctx,
427✔
2465
                                scope,
427✔
2466
                                domainID,
427✔
2467
                                domainName,
427✔
2468
                                *execution,
427✔
2469
                                firstEventID,
427✔
2470
                                nextEventID,
427✔
2471
                                getRequest.GetMaximumPageSize(),
427✔
2472
                                nextPageToken,
427✔
2473
                                token.TransientDecision,
427✔
2474
                                token.BranchToken,
427✔
2475
                        )
427✔
2476
                }
427✔
2477
                if err != nil {
429✔
2478
                        return err
×
2479
                }
×
2480
                return nil
429✔
2481
        }
2482

2483
        // Close-event-only:
        //   - Workflow not running: fetch the last batch (lastFirstEventID..nextEventID)
        //     and keep only its final element.
        //   - Still running, long poll: keep the token, minus its persistence page,
        //     for the next call.
        //   - Still running, no long poll: the stream ends.
        if isCloseEventOnly {
447✔
2484
                if !isWorkflowRunning {
36✔
2485
                        if err := getHistory(lastFirstEventID, nextEventID, nil); err != nil {
18✔
2486
                                return nil, wh.error(err, scope, tags...)
×
2487
                        }
×
2488
                        if isRawHistoryEnabled {
18✔
2489
                                // since getHistory func will not return empty history, so the below is safe
×
2490
                                historyBlob = historyBlob[len(historyBlob)-1:]
×
2491
                        } else {
18✔
2492
                                // since getHistory func will not return empty history, so the below is safe
18✔
2493
                                history.Events = history.Events[len(history.Events)-1:]
18✔
2494
                        }
18✔
2495
                        token = nil
18✔
2496
                } else if isLongPoll {
×
2497
                        // set the persistence token to be nil so next time we will query history for updates
×
2498
                        token.PersistenceToken = nil
×
2499
                } else {
×
2500
                        token = nil
×
2501
                }
×
2502
        } else {
411✔
2503
                // return all events
411✔
2504
                if token.FirstEventID >= token.NextEventID {
411✔
2505
                        // currently there is no new event
×
2506
                        history.Events = []*types.HistoryEvent{}
×
2507
                        // NOTE(review): isWorkflowRunning is only refreshed when queryHistory
                        // ran in this call. For a continuation token without the long-poll
                        // refresh it is still false here; confirm whether
                        // token.IsWorkflowRunning was intended.
                        if !isWorkflowRunning {
×
2508
                                token = nil
×
2509
                        }
×
2510
                } else {
411✔
2511
                        if err := getHistory(token.FirstEventID, token.NextEventID, token.PersistenceToken); err != nil {
411✔
2512
                                return nil, wh.error(err, scope, tags...)
×
2513
                        }
×
2514
                        // For long polls on a running workflow, keep the token even when persistence
                        // has no further page, so the next call re-queries mutable state for new events.
2515
                        // Otherwise an exhausted persistence page ends the stream.
2516
                        if len(token.PersistenceToken) == 0 && (!token.IsWorkflowRunning || !isLongPoll) {
780✔
2517
                                // meaning, there is no more history to be returned
369✔
2518
                                token = nil
369✔
2519
                        }
369✔
2520
                }
2521
        }
2522

2523
        nextToken, err := serializeHistoryToken(token)
429✔
2524
        if err != nil {
429✔
2525
                return nil, wh.error(err, scope, tags...)
×
2526
        }
×
2527
        return &types.GetWorkflowExecutionHistoryResponse{
429✔
2528
                History:       history,
429✔
2529
                RawHistory:    historyBlob,
429✔
2530
                NextPageToken: nextToken,
429✔
2531
                Archived:      false,
429✔
2532
        }, nil
429✔
2533
}
2534

2535
func (wh *WorkflowHandler) withSignalName(
2536
        ctx context.Context,
2537
        domainName string,
2538
        signalName string,
2539
) context.Context {
724✔
2540
        if wh.config.EmitSignalNameMetricsTag(domainName) {
725✔
2541
                return metrics.TagContext(ctx, metrics.SignalNameTag(signalName))
1✔
2542
        }
1✔
2543
        return ctx
723✔
2544
}
2545

2546
// SignalWorkflowExecution is used to send a signal event to running workflow execution.  This results in
2547
// WorkflowExecutionSignaled event recorded in the history and a decision task being created for the execution.
2548
func (wh *WorkflowHandler) SignalWorkflowExecution(
2549
        ctx context.Context,
2550
        signalRequest *types.SignalWorkflowExecutionRequest,
2551
) (retError error) {
724✔
2552
        defer func() { log.CapturePanic(recover(), wh.GetLogger(), &retError) }()
1,448✔
2553

2554
        ctx = wh.withSignalName(ctx, signalRequest.GetDomain(), signalRequest.GetSignalName())
724✔
2555
        scope, sw := wh.startRequestProfileWithDomain(ctx, metrics.FrontendSignalWorkflowExecutionScope, signalRequest)
724✔
2556
        defer sw.Stop()
724✔
2557

724✔
2558
        if wh.isShuttingDown() {
724✔
2559
                return errShuttingDown
×
2560
        }
×
2561

2562
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
724✔
2563
                return wh.error(err, scope)
×
2564
        }
×
2565

2566
        if signalRequest == nil {
724✔
2567
                return wh.error(errRequestNotSet, scope)
×
2568
        }
×
2569

2570
        domainName := signalRequest.GetDomain()
724✔
2571
        wfExecution := signalRequest.GetWorkflowExecution()
724✔
2572
        tags := getDomainWfIDRunIDTags(domainName, wfExecution)
724✔
2573

724✔
2574
        if domainName == "" {
725✔
2575
                return wh.error(errDomainNotSet, scope, tags...)
1✔
2576
        }
1✔
2577

2578
        if ok := wh.allow(ratelimitTypeUser, signalRequest); !ok {
723✔
2579
                return wh.error(createServiceBusyError(), scope, tags...)
×
2580
        }
×
2581

2582
        if err := validateExecution(wfExecution); err != nil {
723✔
2583
                return wh.error(err, scope, tags...)
×
2584
        }
×
2585

2586
        idLengthWarnLimit := wh.config.MaxIDLengthWarnLimit()
723✔
2587
        if !common.ValidIDLength(
723✔
2588
                domainName,
723✔
2589
                scope,
723✔
2590
                idLengthWarnLimit,
723✔
2591
                wh.config.DomainNameMaxLength(domainName),
723✔
2592
                metrics.CadenceErrDomainNameExceededWarnLimit,
723✔
2593
                domainName,
723✔
2594
                wh.GetLogger(),
723✔
2595
                tag.IDTypeDomainName) {
723✔
2596
                return wh.error(errDomainTooLong, scope, tags...)
×
2597
        }
×
2598

2599
        if signalRequest.GetSignalName() == "" {
723✔
2600
                return wh.error(errSignalNameNotSet, scope, tags...)
×
2601
        }
×
2602

2603
        if !common.ValidIDLength(
723✔
2604
                signalRequest.GetSignalName(),
723✔
2605
                scope,
723✔
2606
                idLengthWarnLimit,
723✔
2607
                wh.config.SignalNameMaxLength(domainName),
723✔
2608
                metrics.CadenceErrSignalNameExceededWarnLimit,
723✔
2609
                domainName,
723✔
2610
                wh.GetLogger(),
723✔
2611
                tag.IDTypeSignalName) {
723✔
2612
                return wh.error(errSignalNameTooLong, scope, tags...)
×
2613
        }
×
2614

2615
        if !common.ValidIDLength(
723✔
2616
                signalRequest.GetRequestID(),
723✔
2617
                scope,
723✔
2618
                idLengthWarnLimit,
723✔
2619
                wh.config.RequestIDMaxLength(domainName),
723✔
2620
                metrics.CadenceErrRequestIDExceededWarnLimit,
723✔
2621
                domainName,
723✔
2622
                wh.GetLogger(),
723✔
2623
                tag.IDTypeRequestID) {
723✔
2624
                return wh.error(errRequestIDTooLong, scope, tags...)
×
2625
        }
×
2626

2627
        domainID, err := wh.GetDomainCache().GetDomainID(domainName)
723✔
2628
        if err != nil {
723✔
2629
                return wh.error(err, scope, tags...)
×
2630
        }
×
2631

2632
        sizeLimitError := wh.config.BlobSizeLimitError(domainName)
723✔
2633
        sizeLimitWarn := wh.config.BlobSizeLimitWarn(domainName)
723✔
2634
        if err := common.CheckEventBlobSizeLimit(
723✔
2635
                len(signalRequest.Input),
723✔
2636
                sizeLimitWarn,
723✔
2637
                sizeLimitError,
723✔
2638
                domainID,
723✔
2639
                signalRequest.GetWorkflowExecution().GetWorkflowID(),
723✔
2640
                signalRequest.GetWorkflowExecution().GetRunID(),
723✔
2641
                scope,
723✔
2642
                wh.GetThrottledLogger(),
723✔
2643
                tag.BlobSizeViolationOperation("SignalWorkflowExecution"),
723✔
2644
        ); err != nil {
723✔
2645
                return wh.error(err, scope, tags...)
×
2646
        }
×
2647

2648
        isolationGroup := wh.getIsolationGroup(ctx, domainName)
723✔
2649
        if !wh.isIsolationGroupHealthy(ctx, domainName, isolationGroup) {
723✔
2650
                return wh.error(&types.BadRequestError{fmt.Sprintf("Domain %s is drained from isolation group %s.", domainName, isolationGroup)}, scope, tags...)
×
2651
        }
×
2652

2653
        err = wh.GetHistoryClient().SignalWorkflowExecution(ctx, &types.HistorySignalWorkflowExecutionRequest{
723✔
2654
                DomainUUID:    domainID,
723✔
2655
                SignalRequest: signalRequest,
723✔
2656
        })
723✔
2657
        if err != nil {
732✔
2658
                return wh.normalizeVersionedErrors(ctx, wh.error(err, scope, tags...))
9✔
2659
        }
9✔
2660

2661
        return nil
714✔
2662
}
2663

2664
// SignalWithStartWorkflowExecution is used to ensure sending a signal event to a workflow execution.
2665
// If workflow is running, this results in WorkflowExecutionSignaled event recorded in the history
2666
// and a decision task being created for the execution.
2667
// If workflow is not running or not found, this results in WorkflowExecutionStarted and WorkflowExecutionSignaled
2668
// event recorded in history, and a decision task being created for the execution
2669
func (wh *WorkflowHandler) SignalWithStartWorkflowExecution(
2670
        ctx context.Context,
2671
        signalWithStartRequest *types.SignalWithStartWorkflowExecutionRequest,
2672
) (resp *types.StartWorkflowExecutionResponse, retError error) {
33✔
2673
        defer func() { log.CapturePanic(recover(), wh.GetLogger(), &retError) }()
66✔
2674

2675
        scope, sw := wh.startRequestProfileWithDomain(ctx, metrics.FrontendSignalWithStartWorkflowExecutionScope, signalWithStartRequest)
33✔
2676
        defer sw.Stop()
33✔
2677

33✔
2678
        if wh.isShuttingDown() {
33✔
2679
                return nil, errShuttingDown
×
2680
        }
×
2681

2682
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
33✔
2683
                return nil, wh.error(err, scope)
×
2684
        }
×
2685

2686
        if signalWithStartRequest == nil {
33✔
2687
                return nil, wh.error(errRequestNotSet, scope)
×
2688
        }
×
2689

2690
        domainName := signalWithStartRequest.GetDomain()
33✔
2691
        wfExecution := &types.WorkflowExecution{
33✔
2692
                WorkflowID: signalWithStartRequest.GetWorkflowID(),
33✔
2693
        }
33✔
2694
        tags := getDomainWfIDRunIDTags(domainName, wfExecution)
33✔
2695

33✔
2696
        if domainName == "" {
33✔
2697
                return nil, wh.error(errDomainNotSet, scope, tags...)
×
2698
        }
×
2699

2700
        if ok := wh.allow(ratelimitTypeUser, signalWithStartRequest); !ok {
33✔
2701
                return nil, wh.error(createServiceBusyError(), scope, tags...)
×
2702
        }
×
2703

2704
        if signalWithStartRequest.GetWorkflowID() == "" {
33✔
2705
                return nil, wh.error(errWorkflowIDNotSet, scope, tags...)
×
2706
        }
×
2707

2708
        idLengthWarnLimit := wh.config.MaxIDLengthWarnLimit()
33✔
2709
        if !common.ValidIDLength(
33✔
2710
                domainName,
33✔
2711
                scope,
33✔
2712
                idLengthWarnLimit,
33✔
2713
                wh.config.DomainNameMaxLength(domainName),
33✔
2714
                metrics.CadenceErrDomainNameExceededWarnLimit,
33✔
2715
                domainName,
33✔
2716
                wh.GetLogger(),
33✔
2717
                tag.IDTypeDomainName) {
33✔
2718
                return nil, wh.error(errDomainTooLong, scope, tags...)
×
2719
        }
×
2720

2721
        if !common.ValidIDLength(
33✔
2722
                signalWithStartRequest.GetWorkflowID(),
33✔
2723
                scope,
33✔
2724
                idLengthWarnLimit,
33✔
2725
                wh.config.WorkflowIDMaxLength(domainName),
33✔
2726
                metrics.CadenceErrWorkflowIDExceededWarnLimit,
33✔
2727
                domainName,
33✔
2728
                wh.GetLogger(),
33✔
2729
                tag.IDTypeWorkflowID) {
33✔
2730
                return nil, wh.error(errWorkflowIDTooLong, scope, tags...)
×
2731
        }
×
2732

2733
        if signalWithStartRequest.GetSignalName() == "" {
33✔
2734
                return nil, wh.error(errSignalNameNotSet, scope, tags...)
×
2735
        }
×
2736

2737
        if !common.ValidIDLength(
33✔
2738
                signalWithStartRequest.GetSignalName(),
33✔
2739
                scope,
33✔
2740
                idLengthWarnLimit,
33✔
2741
                wh.config.SignalNameMaxLength(domainName),
33✔
2742
                metrics.CadenceErrSignalNameExceededWarnLimit,
33✔
2743
                domainName,
33✔
2744
                wh.GetLogger(),
33✔
2745
                tag.IDTypeSignalName) {
33✔
2746
                return nil, wh.error(errSignalNameTooLong, scope, tags...)
×
2747
        }
×
2748

2749
        if signalWithStartRequest.WorkflowType == nil || signalWithStartRequest.WorkflowType.GetName() == "" {
33✔
2750
                return nil, wh.error(errWorkflowTypeNotSet, scope, tags...)
×
2751
        }
×
2752

2753
        if !common.ValidIDLength(
33✔
2754
                signalWithStartRequest.WorkflowType.GetName(),
33✔
2755
                scope,
33✔
2756
                idLengthWarnLimit,
33✔
2757
                wh.config.WorkflowTypeMaxLength(domainName),
33✔
2758
                metrics.CadenceErrWorkflowTypeExceededWarnLimit,
33✔
2759
                domainName,
33✔
2760
                wh.GetLogger(),
33✔
2761
                tag.IDTypeWorkflowType) {
33✔
2762
                return nil, wh.error(errWorkflowTypeTooLong, scope, tags...)
×
2763
        }
×
2764

2765
        if err := wh.validateTaskList(signalWithStartRequest.TaskList, scope, domainName); err != nil {
33✔
2766
                return nil, wh.error(err, scope, tags...)
×
2767
        }
×
2768

2769
        if !common.ValidIDLength(
33✔
2770
                signalWithStartRequest.GetRequestID(),
33✔
2771
                scope,
33✔
2772
                idLengthWarnLimit,
33✔
2773
                wh.config.RequestIDMaxLength(domainName),
33✔
2774
                metrics.CadenceErrRequestIDExceededWarnLimit,
33✔
2775
                domainName,
33✔
2776
                wh.GetLogger(),
33✔
2777
                tag.IDTypeRequestID) {
33✔
2778
                return nil, wh.error(errRequestIDTooLong, scope, tags...)
×
2779
        }
×
2780

2781
        if signalWithStartRequest.GetExecutionStartToCloseTimeoutSeconds() <= 0 {
33✔
2782
                return nil, wh.error(errInvalidExecutionStartToCloseTimeoutSeconds, scope, tags...)
×
2783
        }
×
2784

2785
        if signalWithStartRequest.GetTaskStartToCloseTimeoutSeconds() <= 0 {
33✔
2786
                return nil, wh.error(errInvalidTaskStartToCloseTimeoutSeconds, scope, tags...)
×
2787
        }
×
2788

2789
        if err := common.ValidateRetryPolicy(signalWithStartRequest.RetryPolicy); err != nil {
33✔
2790
                return nil, wh.error(err, scope, tags...)
×
2791
        }
×
2792

2793
        if signalWithStartRequest.GetCronSchedule() != "" {
33✔
2794
                if _, err := backoff.ValidateSchedule(signalWithStartRequest.GetCronSchedule()); err != nil {
×
2795
                        return nil, wh.error(err, scope, tags...)
×
2796
                }
×
2797
        }
2798

2799
        if err := wh.searchAttributesValidator.ValidateSearchAttributes(signalWithStartRequest.SearchAttributes, domainName); err != nil {
33✔
2800
                return nil, wh.error(err, scope, tags...)
×
2801
        }
×
2802

2803
        domainID, err := wh.GetDomainCache().GetDomainID(domainName)
33✔
2804
        if err != nil {
33✔
2805
                return nil, wh.error(err, scope, tags...)
×
2806
        }
×
2807

2808
        sizeLimitError := wh.config.BlobSizeLimitError(domainName)
33✔
2809
        sizeLimitWarn := wh.config.BlobSizeLimitWarn(domainName)
33✔
2810
        if err := common.CheckEventBlobSizeLimit(
33✔
2811
                len(signalWithStartRequest.SignalInput),
33✔
2812
                sizeLimitWarn,
33✔
2813
                sizeLimitError,
33✔
2814
                domainID,
33✔
2815
                signalWithStartRequest.GetWorkflowID(),
33✔
2816
                "",
33✔
2817
                scope,
33✔
2818
                wh.GetThrottledLogger(),
33✔
2819
                tag.BlobSizeViolationOperation("SignalWithStartWorkflowExecution"),
33✔
2820
        ); err != nil {
33✔
2821
                return nil, wh.error(err, scope, tags...)
×
2822
        }
×
2823
        actualSize := len(signalWithStartRequest.Input) + common.GetSizeOfMapStringToByteArray(signalWithStartRequest.Memo.GetFields())
33✔
2824
        if err := common.CheckEventBlobSizeLimit(
33✔
2825
                actualSize,
33✔
2826
                sizeLimitWarn,
33✔
2827
                sizeLimitError,
33✔
2828
                domainID,
33✔
2829
                signalWithStartRequest.GetWorkflowID(),
33✔
2830
                "",
33✔
2831
                scope,
33✔
2832
                wh.GetThrottledLogger(),
33✔
2833
                tag.BlobSizeViolationOperation("SignalWithStartWorkflowExecution"),
33✔
2834
        ); err != nil {
33✔
2835
                return nil, wh.error(err, scope, tags...)
×
2836
        }
×
2837

2838
        isolationGroup := wh.getIsolationGroup(ctx, domainName)
33✔
2839
        if !wh.isIsolationGroupHealthy(ctx, domainName, isolationGroup) {
33✔
2840
                return nil, wh.error(&types.BadRequestError{fmt.Sprintf("Domain %s is drained from isolation group %s.", domainName, isolationGroup)}, scope, tags...)
×
2841
        }
×
2842

2843
        resp, err = wh.GetHistoryClient().SignalWithStartWorkflowExecution(ctx, &types.HistorySignalWithStartWorkflowExecutionRequest{
33✔
2844
                DomainUUID:             domainID,
33✔
2845
                SignalWithStartRequest: signalWithStartRequest,
33✔
2846
                PartitionConfig:        wh.getPartitionConfig(ctx, domainName),
33✔
2847
        })
33✔
2848
        if err != nil {
39✔
2849
                return nil, wh.error(err, scope, tags...)
6✔
2850
        }
6✔
2851

2852
        return resp, nil
27✔
2853
}
2854

2855
// TerminateWorkflowExecution terminates an existing workflow execution by recording WorkflowExecutionTerminated event
2856
// in the history and immediately terminating the execution instance.
2857
func (wh *WorkflowHandler) TerminateWorkflowExecution(
2858
        ctx context.Context,
2859
        terminateRequest *types.TerminateWorkflowExecutionRequest,
2860
) (retError error) {
48✔
2861
        defer func() { log.CapturePanic(recover(), wh.GetLogger(), &retError) }()
96✔
2862

2863
        scope, sw := wh.startRequestProfileWithDomain(ctx, metrics.FrontendTerminateWorkflowExecutionScope, terminateRequest)
48✔
2864
        defer sw.Stop()
48✔
2865

48✔
2866
        if wh.isShuttingDown() {
48✔
2867
                return errShuttingDown
×
2868
        }
×
2869

2870
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
48✔
2871
                return wh.error(err, scope)
×
2872
        }
×
2873

2874
        if terminateRequest == nil {
48✔
2875
                return wh.error(errRequestNotSet, scope)
×
2876
        }
×
2877

2878
        domainName := terminateRequest.GetDomain()
48✔
2879
        wfExecution := terminateRequest.GetWorkflowExecution()
48✔
2880
        tags := getDomainWfIDRunIDTags(domainName, wfExecution)
48✔
2881

48✔
2882
        if terminateRequest.GetDomain() == "" {
48✔
2883
                return wh.error(errDomainNotSet, scope, tags...)
×
2884
        }
×
2885

2886
        if ok := wh.allow(ratelimitTypeUser, terminateRequest); !ok {
48✔
2887
                return wh.error(createServiceBusyError(), scope, tags...)
×
2888
        }
×
2889

2890
        if err := validateExecution(wfExecution); err != nil {
48✔
2891
                return wh.error(err, scope, tags...)
×
2892
        }
×
2893

2894
        domainID, err := wh.GetDomainCache().GetDomainID(domainName)
48✔
2895
        if err != nil {
48✔
2896
                return wh.error(err, scope, tags...)
×
2897
        }
×
2898

2899
        err = wh.GetHistoryClient().TerminateWorkflowExecution(ctx, &types.HistoryTerminateWorkflowExecutionRequest{
48✔
2900
                DomainUUID:       domainID,
48✔
2901
                TerminateRequest: terminateRequest,
48✔
2902
        })
48✔
2903
        if err != nil {
48✔
2904
                return wh.normalizeVersionedErrors(ctx, wh.error(err, scope, tags...))
×
2905
        }
×
2906

2907
        return nil
48✔
2908
}
2909

2910
// ResetWorkflowExecution reset an existing workflow execution to the nextFirstEventID
2911
// in the history and immediately terminating the current execution instance.
2912
func (wh *WorkflowHandler) ResetWorkflowExecution(
2913
        ctx context.Context,
2914
        resetRequest *types.ResetWorkflowExecutionRequest,
2915
) (resp *types.ResetWorkflowExecutionResponse, retError error) {
15✔
2916
        defer func() { log.CapturePanic(recover(), wh.GetLogger(), &retError) }()
30✔
2917

2918
        scope, sw := wh.startRequestProfileWithDomain(ctx, metrics.FrontendResetWorkflowExecutionScope, resetRequest)
15✔
2919
        defer sw.Stop()
15✔
2920

15✔
2921
        if wh.isShuttingDown() {
15✔
2922
                return nil, errShuttingDown
×
2923
        }
×
2924

2925
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
15✔
2926
                return nil, wh.error(err, scope)
×
2927
        }
×
2928

2929
        if resetRequest == nil {
15✔
2930
                return nil, wh.error(errRequestNotSet, scope)
×
2931
        }
×
2932

2933
        domainName := resetRequest.GetDomain()
15✔
2934
        wfExecution := resetRequest.GetWorkflowExecution()
15✔
2935
        tags := getDomainWfIDRunIDTags(domainName, wfExecution)
15✔
2936

15✔
2937
        if domainName == "" {
15✔
2938
                return nil, wh.error(errDomainNotSet, scope, tags...)
×
2939
        }
×
2940

2941
        if ok := wh.allow(ratelimitTypeUser, resetRequest); !ok {
15✔
2942
                return nil, wh.error(createServiceBusyError(), scope, tags...)
×
2943
        }
×
2944

2945
        if err := validateExecution(wfExecution); err != nil {
15✔
2946
                return nil, wh.error(err, scope, tags...)
×
2947
        }
×
2948

2949
        domainID, err := wh.GetDomainCache().GetDomainID(resetRequest.GetDomain())
15✔
2950
        if err != nil {
15✔
2951
                return nil, wh.error(err, scope, tags...)
×
2952
        }
×
2953

2954
        resp, err = wh.GetHistoryClient().ResetWorkflowExecution(ctx, &types.HistoryResetWorkflowExecutionRequest{
15✔
2955
                DomainUUID:   domainID,
15✔
2956
                ResetRequest: resetRequest,
15✔
2957
        })
15✔
2958
        if err != nil {
15✔
2959
                return nil, wh.error(err, scope, tags...)
×
2960
        }
×
2961

2962
        return resp, nil
15✔
2963
}
2964

2965
// RequestCancelWorkflowExecution - requests to cancel a workflow execution
2966
func (wh *WorkflowHandler) RequestCancelWorkflowExecution(
2967
        ctx context.Context,
2968
        cancelRequest *types.RequestCancelWorkflowExecutionRequest,
2969
) (retError error) {
6✔
2970
        defer func() { log.CapturePanic(recover(), wh.GetLogger(), &retError) }()
12✔
2971

2972
        scope, sw := wh.startRequestProfileWithDomain(ctx, metrics.FrontendRequestCancelWorkflowExecutionScope, cancelRequest)
6✔
2973
        defer sw.Stop()
6✔
2974

6✔
2975
        if wh.isShuttingDown() {
6✔
2976
                return errShuttingDown
×
2977
        }
×
2978

2979
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
6✔
2980
                return wh.error(err, scope)
×
2981
        }
×
2982

2983
        if cancelRequest == nil {
6✔
2984
                return wh.error(errRequestNotSet, scope)
×
2985
        }
×
2986

2987
        domainName := cancelRequest.GetDomain()
6✔
2988
        wfExecution := cancelRequest.GetWorkflowExecution()
6✔
2989
        tags := getDomainWfIDRunIDTags(domainName, wfExecution)
6✔
2990

6✔
2991
        if domainName == "" {
6✔
2992
                return wh.error(errDomainNotSet, scope, tags...)
×
2993
        }
×
2994

2995
        if ok := wh.allow(ratelimitTypeUser, cancelRequest); !ok {
6✔
2996
                return wh.error(createServiceBusyError(), scope, tags...)
×
2997
        }
×
2998

2999
        if err := validateExecution(wfExecution); err != nil {
6✔
3000
                return wh.error(err, scope, tags...)
×
3001
        }
×
3002

3003
        domainID, err := wh.GetDomainCache().GetDomainID(cancelRequest.GetDomain())
6✔
3004
        if err != nil {
6✔
3005
                return wh.error(err, scope, tags...)
×
3006
        }
×
3007

3008
        err = wh.GetHistoryClient().RequestCancelWorkflowExecution(ctx, &types.HistoryRequestCancelWorkflowExecutionRequest{
6✔
3009
                DomainUUID:    domainID,
6✔
3010
                CancelRequest: cancelRequest,
6✔
3011
        })
6✔
3012
        if err != nil {
9✔
3013
                return wh.normalizeVersionedErrors(ctx, wh.error(err, scope, tags...))
3✔
3014
        }
3✔
3015

3016
        return nil
3✔
3017
}
3018

3019
// ListOpenWorkflowExecutions - retrieves info for open workflow executions in a domain
3020
func (wh *WorkflowHandler) ListOpenWorkflowExecutions(
3021
        ctx context.Context,
3022
        listRequest *types.ListOpenWorkflowExecutionsRequest,
3023
) (resp *types.ListOpenWorkflowExecutionsResponse, retError error) {
103✔
3024
        defer func() { log.CapturePanic(recover(), wh.GetLogger(), &retError) }()
206✔
3025

3026
        scope, sw := wh.startRequestProfileWithDomain(ctx, metrics.FrontendListOpenWorkflowExecutionsScope, listRequest)
103✔
3027
        defer sw.Stop()
103✔
3028

103✔
3029
        if wh.isShuttingDown() {
103✔
3030
                return nil, errShuttingDown
×
3031
        }
×
3032

3033
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
103✔
3034
                return nil, wh.error(err, scope)
×
3035
        }
×
3036

3037
        if listRequest == nil {
103✔
3038
                return nil, wh.error(errRequestNotSet, scope)
×
3039
        }
×
3040

3041
        if listRequest.GetDomain() == "" {
103✔
3042
                return nil, wh.error(errDomainNotSet, scope)
×
3043
        }
×
3044

3045
        if ok := wh.allow(ratelimitTypeVisibility, listRequest); !ok {
103✔
3046
                return nil, wh.error(createServiceBusyError(), scope)
×
3047
        }
×
3048

3049
        if listRequest.StartTimeFilter == nil {
103✔
3050
                return nil, wh.error(&types.BadRequestError{Message: "StartTimeFilter is required"}, scope)
×
3051
        }
×
3052

3053
        if listRequest.StartTimeFilter.EarliestTime == nil {
103✔
3054
                return nil, wh.error(&types.BadRequestError{Message: "EarliestTime in StartTimeFilter is required"}, scope)
×
3055
        }
×
3056

3057
        if listRequest.StartTimeFilter.LatestTime == nil {
103✔
3058
                return nil, wh.error(&types.BadRequestError{Message: "LatestTime in StartTimeFilter is required"}, scope)
×
3059
        }
×
3060

3061
        if listRequest.StartTimeFilter.GetEarliestTime() > listRequest.StartTimeFilter.GetLatestTime() {
103✔
3062
                return nil, wh.error(&types.BadRequestError{Message: "EarliestTime in StartTimeFilter should not be larger than LatestTime"}, scope)
×
3063
        }
×
3064

3065
        if listRequest.ExecutionFilter != nil && listRequest.TypeFilter != nil {
103✔
3066
                return nil, wh.error(&types.BadRequestError{
×
3067
                        Message: "Only one of ExecutionFilter or TypeFilter is allowed"}, scope)
×
3068
        }
×
3069

3070
        if listRequest.GetMaximumPageSize() <= 0 {
164✔
3071
                listRequest.MaximumPageSize = int32(wh.config.VisibilityMaxPageSize(listRequest.GetDomain()))
61✔
3072
        }
61✔
3073

3074
        if wh.isListRequestPageSizeTooLarge(listRequest.GetMaximumPageSize(), listRequest.GetDomain()) {
103✔
3075
                return nil, wh.error(&types.BadRequestError{
×
3076
                        Message: fmt.Sprintf("Pagesize is larger than allow %d", wh.config.ESIndexMaxResultWindow())}, scope)
×
3077
        }
×
3078

3079
        domain := listRequest.GetDomain()
103✔
3080
        domainID, err := wh.GetDomainCache().GetDomainID(domain)
103✔
3081
        if err != nil {
103✔
3082
                return nil, wh.error(err, scope)
×
3083
        }
×
3084

3085
        baseReq := persistence.ListWorkflowExecutionsRequest{
103✔
3086
                DomainUUID:    domainID,
103✔
3087
                Domain:        domain,
103✔
3088
                PageSize:      int(listRequest.GetMaximumPageSize()),
103✔
3089
                NextPageToken: listRequest.NextPageToken,
103✔
3090
                EarliestTime:  listRequest.StartTimeFilter.GetEarliestTime(),
103✔
3091
                LatestTime:    listRequest.StartTimeFilter.GetLatestTime(),
103✔
3092
        }
103✔
3093

103✔
3094
        var persistenceResp *persistence.ListWorkflowExecutionsResponse
103✔
3095
        if listRequest.ExecutionFilter != nil {
200✔
3096
                if wh.config.DisableListVisibilityByFilter(domain) {
98✔
3097
                        err = errNoPermission
1✔
3098
                } else {
97✔
3099
                        persistenceResp, err = wh.GetVisibilityManager().ListOpenWorkflowExecutionsByWorkflowID(
96✔
3100
                                ctx,
96✔
3101
                                &persistence.ListWorkflowExecutionsByWorkflowIDRequest{
96✔
3102
                                        ListWorkflowExecutionsRequest: baseReq,
96✔
3103
                                        WorkflowID:                    listRequest.ExecutionFilter.GetWorkflowID(),
96✔
3104
                                })
96✔
3105
                }
96✔
3106
                wh.GetLogger().Debug("List open workflow with filter",
97✔
3107
                        tag.WorkflowDomainName(listRequest.GetDomain()), tag.WorkflowListWorkflowFilterByID)
97✔
3108
        } else if listRequest.TypeFilter != nil {
7✔
3109
                if wh.config.DisableListVisibilityByFilter(domain) {
2✔
3110
                        err = errNoPermission
1✔
3111
                } else {
1✔
3112
                        persistenceResp, err = wh.GetVisibilityManager().ListOpenWorkflowExecutionsByType(
×
3113
                                ctx,
×
3114
                                &persistence.ListWorkflowExecutionsByTypeRequest{
×
3115
                                        ListWorkflowExecutionsRequest: baseReq,
×
3116
                                        WorkflowTypeName:              listRequest.TypeFilter.GetName(),
×
3117
                                },
×
3118
                        )
×
3119
                }
×
3120
                wh.GetLogger().Debug("List open workflow with filter",
1✔
3121
                        tag.WorkflowDomainName(listRequest.GetDomain()), tag.WorkflowListWorkflowFilterByType)
1✔
3122
        } else {
5✔
3123
                persistenceResp, err = wh.GetVisibilityManager().ListOpenWorkflowExecutions(ctx, &baseReq)
5✔
3124
        }
5✔
3125

3126
        if err != nil {
105✔
3127
                return nil, wh.error(err, scope)
2✔
3128
        }
2✔
3129

3130
        resp = &types.ListOpenWorkflowExecutionsResponse{}
101✔
3131
        resp.Executions = persistenceResp.Executions
101✔
3132
        resp.NextPageToken = persistenceResp.NextPageToken
101✔
3133
        return resp, nil
101✔
3134
}
3135

3136
// ListArchivedWorkflowExecutions - retrieves archived info for closed workflow executions in a domain
3137
func (wh *WorkflowHandler) ListArchivedWorkflowExecutions(
3138
        ctx context.Context,
3139
        listRequest *types.ListArchivedWorkflowExecutionsRequest,
3140
) (resp *types.ListArchivedWorkflowExecutionsResponse, retError error) {
15✔
3141
        defer func() { log.CapturePanic(recover(), wh.GetLogger(), &retError) }()
30✔
3142

3143
        scope, sw := wh.startRequestProfileWithDomain(ctx, metrics.FrontendListArchivedWorkflowExecutionsScope, listRequest)
15✔
3144
        defer sw.Stop()
15✔
3145

15✔
3146
        if wh.isShuttingDown() {
15✔
3147
                return nil, errShuttingDown
×
3148
        }
×
3149

3150
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
15✔
3151
                return nil, wh.error(err, scope)
×
3152
        }
×
3153

3154
        if listRequest == nil {
15✔
3155
                return nil, wh.error(errRequestNotSet, scope)
×
3156
        }
×
3157

3158
        if listRequest.GetDomain() == "" {
16✔
3159
                return nil, wh.error(errDomainNotSet, scope)
1✔
3160
        }
1✔
3161

3162
        if ok := wh.allow(ratelimitTypeVisibility, listRequest); !ok {
14✔
3163
                return nil, wh.error(createServiceBusyError(), scope)
×
3164
        }
×
3165

3166
        if listRequest.GetPageSize() <= 0 {
14✔
3167
                listRequest.PageSize = int32(wh.config.VisibilityMaxPageSize(listRequest.GetDomain()))
×
3168
        }
×
3169

3170
        maxPageSize := wh.config.VisibilityArchivalQueryMaxPageSize()
14✔
3171
        if int(listRequest.GetPageSize()) > maxPageSize {
14✔
3172
                return nil, wh.error(&types.BadRequestError{
×
3173
                        Message: fmt.Sprintf("Pagesize is larger than allowed %d", maxPageSize)}, scope)
×
3174
        }
×
3175

3176
        if !wh.GetArchivalMetadata().GetVisibilityConfig().ClusterConfiguredForArchival() {
15✔
3177
                return nil, wh.error(&types.BadRequestError{Message: "Cluster is not configured for visibility archival"}, scope)
1✔
3178
        }
1✔
3179

3180
        if !wh.GetArchivalMetadata().GetVisibilityConfig().ReadEnabled() {
13✔
3181
                return nil, wh.error(&types.BadRequestError{Message: "Cluster is not configured for reading archived visibility records"}, scope)
×
3182
        }
×
3183

3184
        entry, err := wh.GetDomainCache().GetDomain(listRequest.GetDomain())
13✔
3185
        if err != nil {
14✔
3186
                return nil, wh.error(err, scope)
1✔
3187
        }
1✔
3188

3189
        if entry.GetConfig().VisibilityArchivalStatus != types.ArchivalStatusEnabled {
14✔
3190
                return nil, wh.error(&types.BadRequestError{Message: "Domain is not configured for visibility archival"}, scope)
2✔
3191
        }
2✔
3192

3193
        URI, err := archiver.NewURI(entry.GetConfig().VisibilityArchivalURI)
10✔
3194
        if err != nil {
10✔
3195
                return nil, wh.error(err, scope)
×
3196
        }
×
3197

3198
        visibilityArchiver, err := wh.GetArchiverProvider().GetVisibilityArchiver(URI.Scheme(), service.Frontend)
10✔
3199
        if err != nil {
10✔
3200
                return nil, wh.error(err, scope)
×
3201
        }
×
3202

3203
        archiverRequest := &archiver.QueryVisibilityRequest{
10✔
3204
                DomainID:      entry.GetInfo().ID,
10✔
3205
                PageSize:      int(listRequest.GetPageSize()),
10✔
3206
                NextPageToken: listRequest.NextPageToken,
10✔
3207
                Query:         listRequest.GetQuery(),
10✔
3208
        }
10✔
3209

10✔
3210
        archiverResponse, err := visibilityArchiver.Query(ctx, URI, archiverRequest)
10✔
3211
        if err != nil {
10✔
3212
                return nil, wh.error(err, scope)
×
3213
        }
×
3214

3215
        // special handling of ExecutionTime for cron or retry
3216
        for _, execution := range archiverResponse.Executions {
25✔
3217
                if execution.GetExecutionTime() == 0 {
30✔
3218
                        execution.ExecutionTime = common.Int64Ptr(execution.GetStartTime())
15✔
3219
                }
15✔
3220
        }
3221

3222
        return &types.ListArchivedWorkflowExecutionsResponse{
10✔
3223
                Executions:    archiverResponse.Executions,
10✔
3224
                NextPageToken: archiverResponse.NextPageToken,
10✔
3225
        }, nil
10✔
3226
}
3227

3228
// ListClosedWorkflowExecutions - retrieves info for closed workflow executions in a domain
3229
func (wh *WorkflowHandler) ListClosedWorkflowExecutions(
3230
        ctx context.Context,
3231
        listRequest *types.ListClosedWorkflowExecutionsRequest,
3232
) (resp *types.ListClosedWorkflowExecutionsResponse, retError error) {
27✔
3233
        defer func() { log.CapturePanic(recover(), wh.GetLogger(), &retError) }()
54✔
3234

3235
        scope, sw := wh.startRequestProfileWithDomain(ctx, metrics.FrontendListClosedWorkflowExecutionsScope, listRequest)
27✔
3236
        defer sw.Stop()
27✔
3237

27✔
3238
        if wh.isShuttingDown() {
27✔
3239
                return nil, errShuttingDown
×
3240
        }
×
3241

3242
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
27✔
3243
                return nil, wh.error(err, scope)
×
3244
        }
×
3245

3246
        if listRequest == nil {
27✔
3247
                return nil, wh.error(errRequestNotSet, scope)
×
3248
        }
×
3249

3250
        if listRequest.GetDomain() == "" {
27✔
3251
                return nil, wh.error(errDomainNotSet, scope)
×
3252
        }
×
3253

3254
        if ok := wh.allow(ratelimitTypeVisibility, listRequest); !ok {
27✔
3255
                return nil, wh.error(createServiceBusyError(), scope)
×
3256
        }
×
3257

3258
        if listRequest.StartTimeFilter == nil {
27✔
3259
                return nil, wh.error(&types.BadRequestError{Message: "StartTimeFilter is required"}, scope)
×
3260
        }
×
3261

3262
        if listRequest.StartTimeFilter.EarliestTime == nil {
27✔
3263
                return nil, wh.error(&types.BadRequestError{Message: "EarliestTime in StartTimeFilter is required"}, scope)
×
3264
        }
×
3265

3266
        if listRequest.StartTimeFilter.LatestTime == nil {
27✔
3267
                return nil, wh.error(&types.BadRequestError{Message: "LatestTime in StartTimeFilter is required"}, scope)
×
3268
        }
×
3269

3270
        if listRequest.StartTimeFilter.GetEarliestTime() > listRequest.StartTimeFilter.GetLatestTime() {
27✔
3271
                return nil, wh.error(&types.BadRequestError{Message: "EarliestTime in StartTimeFilter should not be larger than LatestTime"}, scope)
×
3272
        }
×
3273

3274
        filterCount := 0
27✔
3275
        if listRequest.TypeFilter != nil {
28✔
3276
                filterCount++
1✔
3277
        }
1✔
3278
        if listRequest.StatusFilter != nil {
28✔
3279
                filterCount++
1✔
3280
        }
1✔
3281

3282
        if filterCount > 1 {
27✔
3283
                return nil, wh.error(&types.BadRequestError{
×
3284
                        Message: "Only one of ExecutionFilter, TypeFilter or StatusFilter is allowed"}, scope)
×
3285
        } // If ExecutionFilter is provided with one of TypeFilter or StatusFilter, use ExecutionFilter and ignore other filter
×
3286

3287
        if listRequest.GetMaximumPageSize() <= 0 {
28✔
3288
                listRequest.MaximumPageSize = int32(wh.config.VisibilityMaxPageSize(listRequest.GetDomain()))
1✔
3289
        }
1✔
3290

3291
        if wh.isListRequestPageSizeTooLarge(listRequest.GetMaximumPageSize(), listRequest.GetDomain()) {
27✔
3292
                return nil, wh.error(&types.BadRequestError{
×
3293
                        Message: fmt.Sprintf("Pagesize is larger than allow %d", wh.config.ESIndexMaxResultWindow())}, scope)
×
3294
        }
×
3295

3296
        domain := listRequest.GetDomain()
27✔
3297
        domainID, err := wh.GetDomainCache().GetDomainID(domain)
27✔
3298
        if err != nil {
27✔
3299
                return nil, wh.error(err, scope)
×
3300
        }
×
3301

3302
        baseReq := persistence.ListWorkflowExecutionsRequest{
27✔
3303
                DomainUUID:    domainID,
27✔
3304
                Domain:        domain,
27✔
3305
                PageSize:      int(listRequest.GetMaximumPageSize()),
27✔
3306
                NextPageToken: listRequest.NextPageToken,
27✔
3307
                EarliestTime:  listRequest.StartTimeFilter.GetEarliestTime(),
27✔
3308
                LatestTime:    listRequest.StartTimeFilter.GetLatestTime(),
27✔
3309
        }
27✔
3310

27✔
3311
        var persistenceResp *persistence.ListWorkflowExecutionsResponse
27✔
3312
        if listRequest.ExecutionFilter != nil {
43✔
3313
                if wh.config.DisableListVisibilityByFilter(domain) {
17✔
3314
                        err = errNoPermission
1✔
3315
                } else {
16✔
3316
                        persistenceResp, err = wh.GetVisibilityManager().ListClosedWorkflowExecutionsByWorkflowID(
15✔
3317
                                ctx,
15✔
3318
                                &persistence.ListWorkflowExecutionsByWorkflowIDRequest{
15✔
3319
                                        ListWorkflowExecutionsRequest: baseReq,
15✔
3320
                                        WorkflowID:                    listRequest.ExecutionFilter.GetWorkflowID(),
15✔
3321
                                },
15✔
3322
                        )
15✔
3323
                }
15✔
3324
                wh.GetLogger().Debug("List closed workflow with filter",
16✔
3325
                        tag.WorkflowDomainName(listRequest.GetDomain()), tag.WorkflowListWorkflowFilterByID)
16✔
3326
        } else if listRequest.TypeFilter != nil {
12✔
3327
                if wh.config.DisableListVisibilityByFilter(domain) {
2✔
3328
                        err = errNoPermission
1✔
3329
                } else {
1✔
3330
                        persistenceResp, err = wh.GetVisibilityManager().ListClosedWorkflowExecutionsByType(
×
3331
                                ctx,
×
3332
                                &persistence.ListWorkflowExecutionsByTypeRequest{
×
3333
                                        ListWorkflowExecutionsRequest: baseReq,
×
3334
                                        WorkflowTypeName:              listRequest.TypeFilter.GetName(),
×
3335
                                },
×
3336
                        )
×
3337
                }
×
3338
                wh.GetLogger().Debug("List closed workflow with filter",
1✔
3339
                        tag.WorkflowDomainName(listRequest.GetDomain()), tag.WorkflowListWorkflowFilterByType)
1✔
3340
        } else if listRequest.StatusFilter != nil {
11✔
3341
                if wh.config.DisableListVisibilityByFilter(domain) {
2✔
3342
                        err = errNoPermission
1✔
3343
                } else {
1✔
3344
                        persistenceResp, err = wh.GetVisibilityManager().ListClosedWorkflowExecutionsByStatus(
×
3345
                                ctx,
×
3346
                                &persistence.ListClosedWorkflowExecutionsByStatusRequest{
×
3347
                                        ListWorkflowExecutionsRequest: baseReq,
×
3348
                                        Status:                        listRequest.GetStatusFilter(),
×
3349
                                },
×
3350
                        )
×
3351
                }
×
3352
                wh.GetLogger().Debug("List closed workflow with filter",
1✔
3353
                        tag.WorkflowDomainName(listRequest.GetDomain()), tag.WorkflowListWorkflowFilterByStatus)
1✔
3354
        } else {
9✔
3355
                persistenceResp, err = wh.GetVisibilityManager().ListClosedWorkflowExecutions(ctx, &baseReq)
9✔
3356
        }
9✔
3357

3358
        if err != nil {
30✔
3359
                return nil, wh.error(err, scope)
3✔
3360
        }
3✔
3361

3362
        resp = &types.ListClosedWorkflowExecutionsResponse{}
24✔
3363
        resp.Executions = persistenceResp.Executions
24✔
3364
        resp.NextPageToken = persistenceResp.NextPageToken
24✔
3365
        return resp, nil
24✔
3366
}
3367

3368
// ListWorkflowExecutions - retrieves info for workflow executions in a domain
3369
func (wh *WorkflowHandler) ListWorkflowExecutions(
3370
        ctx context.Context,
3371
        listRequest *types.ListWorkflowExecutionsRequest,
3372
) (resp *types.ListWorkflowExecutionsResponse, retError error) {
143✔
3373
        defer func() { log.CapturePanic(recover(), wh.GetLogger(), &retError) }()
286✔
3374

3375
        scope, sw := wh.startRequestProfileWithDomain(ctx, metrics.FrontendListWorkflowExecutionsScope, listRequest)
143✔
3376
        defer sw.Stop()
143✔
3377

143✔
3378
        if wh.isShuttingDown() {
143✔
3379
                return nil, errShuttingDown
×
3380
        }
×
3381

3382
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
143✔
3383
                return nil, wh.error(err, scope)
×
3384
        }
×
3385

3386
        if listRequest == nil {
143✔
3387
                return nil, wh.error(errRequestNotSet, scope)
×
3388
        }
×
3389

3390
        if listRequest.GetDomain() == "" {
143✔
3391
                return nil, wh.error(errDomainNotSet, scope)
×
3392
        }
×
3393

3394
        if ok := wh.allow(ratelimitTypeVisibility, listRequest); !ok {
143✔
3395
                return nil, wh.error(createServiceBusyError(), scope)
×
3396
        }
×
3397

3398
        if listRequest.GetPageSize() <= 0 {
143✔
3399
                listRequest.PageSize = int32(wh.config.VisibilityMaxPageSize(listRequest.GetDomain()))
×
3400
        }
×
3401

3402
        if wh.isListRequestPageSizeTooLarge(listRequest.GetPageSize(), listRequest.GetDomain()) {
143✔
3403
                return nil, wh.error(&types.BadRequestError{
×
3404
                        Message: fmt.Sprintf("Pagesize is larger than allow %d", wh.config.ESIndexMaxResultWindow())}, scope)
×
3405
        }
×
3406

3407
        validatedQuery, err := wh.visibilityQueryValidator.ValidateQuery(listRequest.GetQuery())
143✔
3408
        if err != nil {
146✔
3409
                return nil, wh.error(err, scope)
3✔
3410
        }
3✔
3411

3412
        domain := listRequest.GetDomain()
140✔
3413
        domainID, err := wh.GetDomainCache().GetDomainID(domain)
140✔
3414
        if err != nil {
140✔
3415
                return nil, wh.error(err, scope)
×
3416
        }
×
3417

3418
        req := &persistence.ListWorkflowExecutionsByQueryRequest{
140✔
3419
                DomainUUID:    domainID,
140✔
3420
                Domain:        domain,
140✔
3421
                PageSize:      int(listRequest.GetPageSize()),
140✔
3422
                NextPageToken: listRequest.NextPageToken,
140✔
3423
                Query:         validatedQuery,
140✔
3424
        }
140✔
3425
        persistenceResp, err := wh.GetVisibilityManager().ListWorkflowExecutions(ctx, req)
140✔
3426
        if err != nil {
140✔
3427
                return nil, wh.error(err, scope)
×
3428
        }
×
3429

3430
        resp = &types.ListWorkflowExecutionsResponse{}
140✔
3431
        resp.Executions = persistenceResp.Executions
140✔
3432
        resp.NextPageToken = persistenceResp.NextPageToken
140✔
3433
        return resp, nil
140✔
3434
}
3435

3436
// RestartWorkflowExecution - retrieves info for an existing workflow then restarts it
3437
func (wh *WorkflowHandler) RestartWorkflowExecution(ctx context.Context, request *types.RestartWorkflowExecutionRequest) (resp *types.RestartWorkflowExecutionResponse, retError error) {
2✔
3438
        defer func() { log.CapturePanic(recover(), wh.GetLogger(), &retError) }()
4✔
3439

3440
        scope, sw := wh.startRequestProfileWithDomain(ctx, metrics.FrontendRestartWorkflowExecutionScope, request)
2✔
3441
        defer sw.Stop()
2✔
3442

2✔
3443
        if wh.isShuttingDown() {
2✔
3444
                return nil, errShuttingDown
×
3445
        }
×
3446

3447
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
2✔
3448
                return nil, wh.error(err, scope)
×
3449
        }
×
3450

3451
        if request == nil {
2✔
3452
                return nil, wh.error(errRequestNotSet, scope)
×
3453
        }
×
3454

3455
        domainName := request.GetDomain()
2✔
3456
        wfExecution := request.GetWorkflowExecution()
2✔
3457
        tags := getDomainWfIDRunIDTags(domainName, wfExecution)
2✔
3458

2✔
3459
        if request.GetDomain() == "" {
2✔
3460
                return nil, wh.error(errDomainNotSet, scope, tags...)
×
3461
        }
×
3462

3463
        if ok := wh.allow(ratelimitTypeUser, request); !ok {
2✔
3464
                return nil, wh.error(createServiceBusyError(), scope, tags...)
×
3465
        }
×
3466

3467
        if err := validateExecution(wfExecution); err != nil {
2✔
3468
                return nil, wh.error(err, scope, tags...)
×
3469
        }
×
3470

3471
        domainID, err := wh.GetDomainCache().GetDomainID(domainName)
2✔
3472
        if err != nil {
2✔
3473
                return nil, wh.error(err, scope, tags...)
×
3474
        }
×
3475

3476
        isolationGroup := wh.getIsolationGroup(ctx, domainName)
2✔
3477
        if !wh.isIsolationGroupHealthy(ctx, domainName, isolationGroup) {
3✔
3478
                return nil, wh.error(&types.BadRequestError{fmt.Sprintf("Domain %s is drained from isolation group %s.", domainName, isolationGroup)}, scope, tags...)
1✔
3479
        }
1✔
3480

3481
        history, err := wh.GetWorkflowExecutionHistory(ctx, &types.GetWorkflowExecutionHistoryRequest{
1✔
3482
                Domain: domainName,
1✔
3483
                Execution: &types.WorkflowExecution{
1✔
3484
                        WorkflowID: wfExecution.WorkflowID,
1✔
3485
                        RunID:      wfExecution.RunID,
1✔
3486
                },
1✔
3487
                SkipArchival: true,
1✔
3488
        })
1✔
3489
        if err != nil {
1✔
3490
                return nil, wh.error(errHistoryNotFound, scope, tags...)
×
3491
        }
×
3492
        startRequest := constructRestartWorkflowRequest(history.History.Events[0].WorkflowExecutionStartedEventAttributes,
1✔
3493
                domainName, request.Identity, wfExecution.WorkflowID)
1✔
3494
        req, err := common.CreateHistoryStartWorkflowRequest(domainID, startRequest, time.Now(), wh.getPartitionConfig(ctx, domainName))
1✔
3495
        if err != nil {
1✔
3496
                return nil, err
×
3497
        }
×
3498
        startResp, err := wh.GetHistoryClient().StartWorkflowExecution(ctx, req)
1✔
3499
        if err != nil {
1✔
3500
                return nil, wh.normalizeVersionedErrors(ctx, wh.error(err, scope, tags...))
×
3501
        }
×
3502
        resp = &types.RestartWorkflowExecutionResponse{
1✔
3503
                RunID: startResp.RunID,
1✔
3504
        }
1✔
3505

1✔
3506
        return resp, nil
1✔
3507
}
3508

3509
// ScanWorkflowExecutions - retrieves info for large amount of workflow executions in a domain without order
3510
func (wh *WorkflowHandler) ScanWorkflowExecutions(
3511
        ctx context.Context,
3512
        listRequest *types.ListWorkflowExecutionsRequest,
3513
) (resp *types.ListWorkflowExecutionsResponse, retError error) {
32✔
3514
        defer func() { log.CapturePanic(recover(), wh.GetLogger(), &retError) }()
64✔
3515

3516
        scope, sw := wh.startRequestProfileWithDomain(ctx, metrics.FrontendScanWorkflowExecutionsScope, listRequest)
32✔
3517
        defer sw.Stop()
32✔
3518

32✔
3519
        if wh.isShuttingDown() {
32✔
3520
                return nil, errShuttingDown
×
3521
        }
×
3522

3523
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
32✔
3524
                return nil, wh.error(err, scope)
×
3525
        }
×
3526

3527
        if listRequest == nil {
32✔
3528
                return nil, wh.error(errRequestNotSet, scope)
×
3529
        }
×
3530

3531
        if listRequest.GetDomain() == "" {
32✔
3532
                return nil, wh.error(errDomainNotSet, scope)
×
3533
        }
×
3534

3535
        if ok := wh.allow(ratelimitTypeUser, listRequest); !ok {
32✔
3536
                return nil, wh.error(createServiceBusyError(), scope)
×
3537
        }
×
3538

3539
        if listRequest.GetPageSize() <= 0 {
32✔
3540
                listRequest.PageSize = int32(wh.config.VisibilityMaxPageSize(listRequest.GetDomain()))
×
3541
        }
×
3542

3543
        if wh.isListRequestPageSizeTooLarge(listRequest.GetPageSize(), listRequest.GetDomain()) {
32✔
3544
                return nil, wh.error(&types.BadRequestError{
×
3545
                        Message: fmt.Sprintf("Pagesize is larger than allow %d", wh.config.ESIndexMaxResultWindow())}, scope)
×
3546
        }
×
3547

3548
        validatedQuery, err := wh.visibilityQueryValidator.ValidateQuery(listRequest.GetQuery())
32✔
3549
        if err != nil {
33✔
3550
                return nil, wh.error(err, scope)
1✔
3551
        }
1✔
3552

3553
        domain := listRequest.GetDomain()
31✔
3554
        domainID, err := wh.GetDomainCache().GetDomainID(domain)
31✔
3555
        if err != nil {
31✔
3556
                return nil, wh.error(err, scope)
×
3557
        }
×
3558

3559
        req := &persistence.ListWorkflowExecutionsByQueryRequest{
31✔
3560
                DomainUUID:    domainID,
31✔
3561
                Domain:        domain,
31✔
3562
                PageSize:      int(listRequest.GetPageSize()),
31✔
3563
                NextPageToken: listRequest.NextPageToken,
31✔
3564
                Query:         validatedQuery,
31✔
3565
        }
31✔
3566
        persistenceResp, err := wh.GetVisibilityManager().ScanWorkflowExecutions(ctx, req)
31✔
3567
        if err != nil {
31✔
3568
                return nil, wh.error(err, scope)
×
3569
        }
×
3570

3571
        resp = &types.ListWorkflowExecutionsResponse{}
31✔
3572
        resp.Executions = persistenceResp.Executions
31✔
3573
        resp.NextPageToken = persistenceResp.NextPageToken
31✔
3574
        return resp, nil
31✔
3575
}
3576

3577
// CountWorkflowExecutions - count number of workflow executions in a domain
3578
func (wh *WorkflowHandler) CountWorkflowExecutions(
3579
        ctx context.Context,
3580
        countRequest *types.CountWorkflowExecutionsRequest,
3581
) (resp *types.CountWorkflowExecutionsResponse, retError error) {
15✔
3582
        defer func() { log.CapturePanic(recover(), wh.GetLogger(), &retError) }()
30✔
3583

3584
        scope, sw := wh.startRequestProfileWithDomain(ctx, metrics.FrontendCountWorkflowExecutionsScope, countRequest)
15✔
3585
        defer sw.Stop()
15✔
3586

15✔
3587
        if wh.isShuttingDown() {
15✔
3588
                return nil, errShuttingDown
×
3589
        }
×
3590

3591
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
15✔
3592
                return nil, wh.error(err, scope)
×
3593
        }
×
3594

3595
        if countRequest == nil {
16✔
3596
                return nil, wh.error(errRequestNotSet, scope)
1✔
3597
        }
1✔
3598

3599
        if countRequest.GetDomain() == "" {
14✔
3600
                return nil, wh.error(errDomainNotSet, scope)
×
3601
        }
×
3602

3603
        if ok := wh.allow(ratelimitTypeUser, countRequest); !ok {
14✔
3604
                return nil, wh.error(createServiceBusyError(), scope)
×
3605
        }
×
3606

3607
        validatedQuery, err := wh.visibilityQueryValidator.ValidateQuery(countRequest.GetQuery())
14✔
3608
        if err != nil {
15✔
3609
                return nil, wh.error(err, scope)
1✔
3610
        }
1✔
3611

3612
        domain := countRequest.GetDomain()
13✔
3613
        domainID, err := wh.GetDomainCache().GetDomainID(domain)
13✔
3614
        if err != nil {
13✔
3615
                return nil, wh.error(err, scope)
×
3616
        }
×
3617

3618
        req := &persistence.CountWorkflowExecutionsRequest{
13✔
3619
                DomainUUID: domainID,
13✔
3620
                Domain:     domain,
13✔
3621
                Query:      validatedQuery,
13✔
3622
        }
13✔
3623
        persistenceResp, err := wh.GetVisibilityManager().CountWorkflowExecutions(ctx, req)
13✔
3624
        if err != nil {
13✔
3625
                return nil, wh.error(err, scope)
×
3626
        }
×
3627

3628
        resp = &types.CountWorkflowExecutionsResponse{
13✔
3629
                Count: persistenceResp.Count,
13✔
3630
        }
13✔
3631
        return resp, nil
13✔
3632
}
3633

3634
// GetSearchAttributes return valid indexed keys
3635
func (wh *WorkflowHandler) GetSearchAttributes(ctx context.Context) (resp *types.GetSearchAttributesResponse, retError error) {
1✔
3636
        defer func() { log.CapturePanic(recover(), wh.GetLogger(), &retError) }()
2✔
3637

3638
        scope, sw := wh.startRequestProfile(ctx, metrics.FrontendGetSearchAttributesScope)
1✔
3639
        defer sw.Stop()
1✔
3640

1✔
3641
        if wh.isShuttingDown() {
1✔
3642
                return nil, errShuttingDown
×
3643
        }
×
3644

3645
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
1✔
3646
                return nil, wh.error(err, scope)
×
3647
        }
×
3648

3649
        keys := wh.config.ValidSearchAttributes()
1✔
3650
        resp = &types.GetSearchAttributesResponse{
1✔
3651
                Keys: wh.convertIndexedKeyToThrift(keys),
1✔
3652
        }
1✔
3653
        return resp, nil
1✔
3654
}
3655

3656
// ResetStickyTaskList reset the volatile information in mutable state of a given workflow.
3657
func (wh *WorkflowHandler) ResetStickyTaskList(
3658
        ctx context.Context,
3659
        resetRequest *types.ResetStickyTaskListRequest,
3660
) (resp *types.ResetStickyTaskListResponse, retError error) {
3✔
3661
        defer func() { log.CapturePanic(recover(), wh.GetLogger(), &retError) }()
6✔
3662

3663
        scope, sw := wh.startRequestProfileWithDomain(ctx, metrics.FrontendResetStickyTaskListScope, resetRequest)
3✔
3664
        defer sw.Stop()
3✔
3665

3✔
3666
        if wh.isShuttingDown() {
3✔
3667
                return nil, errShuttingDown
×
3668
        }
×
3669

3670
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
3✔
3671
                return nil, wh.error(err, scope)
×
3672
        }
×
3673

3674
        if resetRequest == nil {
3✔
3675
                return nil, wh.error(errRequestNotSet, scope)
×
3676
        }
×
3677

3678
        domainName := resetRequest.GetDomain()
3✔
3679
        wfExecution := resetRequest.GetExecution()
3✔
3680
        tags := getDomainWfIDRunIDTags(domainName, wfExecution)
3✔
3681

3✔
3682
        if domainName == "" {
3✔
3683
                return nil, wh.error(errDomainNotSet, scope, tags...)
×
3684
        }
×
3685

3686
        // Count the request in the host RPS,
3687
        // but we still accept it even if RPS is exceeded
3688
        wh.allow(ratelimitTypeWorker, resetRequest)
3✔
3689

3✔
3690
        if err := validateExecution(wfExecution); err != nil {
3✔
3691
                return nil, wh.error(err, scope, tags...)
×
3692
        }
×
3693

3694
        domainID, err := wh.GetDomainCache().GetDomainID(resetRequest.GetDomain())
3✔
3695
        if err != nil {
3✔
3696
                return nil, wh.error(err, scope, tags...)
×
3697
        }
×
3698

3699
        _, err = wh.GetHistoryClient().ResetStickyTaskList(ctx, &types.HistoryResetStickyTaskListRequest{
3✔
3700
                DomainUUID: domainID,
3✔
3701
                Execution:  resetRequest.Execution,
3✔
3702
        })
3✔
3703
        if err != nil {
3✔
3704
                return nil, wh.normalizeVersionedErrors(ctx, wh.error(err, scope, tags...))
×
3705
        }
×
3706
        return &types.ResetStickyTaskListResponse{}, nil
3✔
3707
}
3708

3709
// QueryWorkflow returns query result for a specified workflow execution
3710
func (wh *WorkflowHandler) QueryWorkflow(
3711
        ctx context.Context,
3712
        queryRequest *types.QueryWorkflowRequest,
3713
) (resp *types.QueryWorkflowResponse, retError error) {
45✔
3714
        defer func() { log.CapturePanic(recover(), wh.GetLogger(), &retError) }()
90✔
3715

3716
        scope, sw := wh.startRequestProfileWithDomain(ctx, metrics.FrontendQueryWorkflowScope, queryRequest)
45✔
3717
        defer sw.Stop()
45✔
3718

45✔
3719
        if wh.isShuttingDown() {
45✔
3720
                return nil, errShuttingDown
×
3721
        }
×
3722

3723
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
45✔
3724
                return nil, wh.error(err, scope)
×
3725
        }
×
3726

3727
        if queryRequest == nil {
45✔
3728
                return nil, wh.error(errRequestNotSet, scope)
×
3729
        }
×
3730

3731
        domainName := queryRequest.GetDomain()
45✔
3732
        wfExecution := queryRequest.GetExecution()
45✔
3733
        tags := getDomainWfIDRunIDTags(domainName, wfExecution)
45✔
3734

45✔
3735
        if domainName == "" {
45✔
3736
                return nil, wh.error(errDomainNotSet, scope, tags...)
×
3737
        }
×
3738

3739
        if ok := wh.allow(ratelimitTypeUser, queryRequest); !ok {
45✔
3740
                return nil, wh.error(createServiceBusyError(), scope, tags...)
×
3741
        }
×
3742

3743
        if err := validateExecution(wfExecution); err != nil {
45✔
3744
                return nil, wh.error(err, scope, tags...)
×
3745
        }
×
3746

3747
        if wh.config.DisallowQuery(domainName) {
45✔
3748
                return nil, wh.error(errQueryDisallowedForDomain, scope, tags...)
×
3749
        }
×
3750

3751
        if queryRequest.Query == nil {
45✔
3752
                return nil, wh.error(errQueryNotSet, scope, tags...)
×
3753
        }
×
3754

3755
        if queryRequest.Query.GetQueryType() == "" {
45✔
3756
                return nil, wh.error(errQueryTypeNotSet, scope, tags...)
×
3757
        }
×
3758

3759
        domainID, err := wh.GetDomainCache().GetDomainID(domainName)
45✔
3760
        if err != nil {
45✔
3761
                return nil, wh.error(err, scope, tags...)
×
3762
        }
×
3763

3764
        sizeLimitError := wh.config.BlobSizeLimitError(domainName)
45✔
3765
        sizeLimitWarn := wh.config.BlobSizeLimitWarn(domainName)
45✔
3766

45✔
3767
        if err := common.CheckEventBlobSizeLimit(
45✔
3768
                len(queryRequest.GetQuery().GetQueryArgs()),
45✔
3769
                sizeLimitWarn,
45✔
3770
                sizeLimitError,
45✔
3771
                domainID,
45✔
3772
                queryRequest.GetExecution().GetWorkflowID(),
45✔
3773
                queryRequest.GetExecution().GetRunID(),
45✔
3774
                scope,
45✔
3775
                wh.GetThrottledLogger(),
45✔
3776
                tag.BlobSizeViolationOperation("QueryWorkflow")); err != nil {
45✔
3777
                return nil, wh.error(err, scope, tags...)
×
3778
        }
×
3779

3780
        req := &types.HistoryQueryWorkflowRequest{
45✔
3781
                DomainUUID: domainID,
45✔
3782
                Request:    queryRequest,
45✔
3783
        }
45✔
3784
        hResponse, err := wh.GetHistoryClient().QueryWorkflow(ctx, req)
45✔
3785
        if err != nil {
57✔
3786
                return nil, wh.error(err, scope, tags...)
12✔
3787
        }
12✔
3788
        return hResponse.GetResponse(), nil
33✔
3789
}
3790

3791
// DescribeWorkflowExecution returns information about the specified workflow execution.
3792
func (wh *WorkflowHandler) DescribeWorkflowExecution(
3793
        ctx context.Context,
3794
        request *types.DescribeWorkflowExecutionRequest,
3795
) (resp *types.DescribeWorkflowExecutionResponse, retError error) {
93✔
3796
        defer func() { log.CapturePanic(recover(), wh.GetLogger(), &retError) }()
186✔
3797

3798
        scope, sw := wh.startRequestProfileWithDomain(ctx, metrics.FrontendDescribeWorkflowExecutionScope, request)
93✔
3799
        defer sw.Stop()
93✔
3800

93✔
3801
        if wh.isShuttingDown() {
93✔
3802
                return nil, errShuttingDown
×
3803
        }
×
3804

3805
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
93✔
3806
                return nil, wh.error(err, scope)
×
3807
        }
×
3808

3809
        if request == nil {
93✔
3810
                return nil, wh.error(errRequestNotSet, scope)
×
3811
        }
×
3812

3813
        domainName := request.GetDomain()
93✔
3814
        wfExecution := request.GetExecution()
93✔
3815
        tags := getDomainWfIDRunIDTags(domainName, wfExecution)
93✔
3816

93✔
3817
        if domainName == "" {
93✔
3818
                return nil, wh.error(errDomainNotSet, scope, tags...)
×
3819
        }
×
3820

3821
        if ok := wh.allow(ratelimitTypeUser, request); !ok {
93✔
3822
                return nil, wh.error(createServiceBusyError(), scope, tags...)
×
3823
        }
×
3824

3825
        if err := validateExecution(wfExecution); err != nil {
93✔
3826
                return nil, wh.error(err, scope, tags...)
×
3827
        }
×
3828

3829
        domainID, err := wh.GetDomainCache().GetDomainID(request.GetDomain())
93✔
3830
        if err != nil {
93✔
3831
                return nil, wh.error(err, scope, tags...)
×
3832
        }
×
3833

3834
        response, err := wh.GetHistoryClient().DescribeWorkflowExecution(ctx, &types.HistoryDescribeWorkflowExecutionRequest{
93✔
3835
                DomainUUID: domainID,
93✔
3836
                Request:    request,
93✔
3837
        })
93✔
3838

93✔
3839
        if err != nil {
93✔
3840
                return nil, wh.error(err, scope, tags...)
×
3841
        }
×
3842

3843
        return response, nil
93✔
3844
}
3845

3846
// DescribeTaskList returns information about the target tasklist, right now this API returns the
3847
// pollers which polled this tasklist in last few minutes. If includeTaskListStatus field is true,
3848
// it will also return status of tasklist's ackManager (readLevel, ackLevel, backlogCountHint and taskIDBlock).
3849
func (wh *WorkflowHandler) DescribeTaskList(
3850
        ctx context.Context,
3851
        request *types.DescribeTaskListRequest,
3852
) (resp *types.DescribeTaskListResponse, retError error) {
18✔
3853
        defer func() { log.CapturePanic(recover(), wh.GetLogger(), &retError) }()
36✔
3854

3855
        scope, sw := wh.startRequestProfileWithDomain(ctx, metrics.FrontendDescribeTaskListScope, request)
18✔
3856
        defer sw.Stop()
18✔
3857

18✔
3858
        if wh.isShuttingDown() {
18✔
3859
                return nil, errShuttingDown
×
3860
        }
×
3861

3862
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
18✔
3863
                return nil, wh.error(err, scope)
×
3864
        }
×
3865

3866
        if request == nil {
18✔
3867
                return nil, wh.error(errRequestNotSet, scope)
×
3868
        }
×
3869

3870
        if request.GetDomain() == "" {
18✔
3871
                return nil, wh.error(errDomainNotSet, scope)
×
3872
        }
×
3873

3874
        if ok := wh.allow(ratelimitTypeUser, request); !ok {
18✔
3875
                return nil, wh.error(createServiceBusyError(), scope)
×
3876
        }
×
3877

3878
        domainID, err := wh.GetDomainCache().GetDomainID(request.GetDomain())
18✔
3879
        if err != nil {
18✔
3880
                return nil, wh.error(err, scope)
×
3881
        }
×
3882

3883
        if err := wh.validateTaskList(request.TaskList, scope, request.GetDomain()); err != nil {
18✔
3884
                return nil, wh.error(err, scope)
×
3885
        }
×
3886

3887
        if request.TaskListType == nil {
18✔
3888
                return nil, wh.error(errTaskListTypeNotSet, scope)
×
3889
        }
×
3890

3891
        response, err := wh.GetMatchingClient().DescribeTaskList(ctx, &types.MatchingDescribeTaskListRequest{
18✔
3892
                DomainUUID:  domainID,
18✔
3893
                DescRequest: request,
18✔
3894
        })
18✔
3895
        if err != nil {
18✔
3896
                return nil, wh.error(err, scope)
×
3897
        }
×
3898

3899
        return response, nil
18✔
3900
}
3901

3902
// ListTaskListPartitions returns all the partition and host for a taskList
3903
func (wh *WorkflowHandler) ListTaskListPartitions(
3904
        ctx context.Context,
3905
        request *types.ListTaskListPartitionsRequest,
3906
) (resp *types.ListTaskListPartitionsResponse, retError error) {
×
3907
        defer func() { log.CapturePanic(recover(), wh.GetLogger(), &retError) }()
×
3908

3909
        scope, sw := wh.startRequestProfileWithDomain(ctx, metrics.FrontendListTaskListPartitionsScope, request)
×
3910
        defer sw.Stop()
×
3911

×
3912
        if wh.isShuttingDown() {
×
3913
                return nil, errShuttingDown
×
3914
        }
×
3915

3916
        if request == nil {
×
3917
                return nil, wh.error(errRequestNotSet, scope)
×
3918
        }
×
3919

3920
        if request.GetDomain() == "" {
×
3921
                return nil, wh.error(errDomainNotSet, scope)
×
3922
        }
×
3923

3924
        if ok := wh.allow(ratelimitTypeUser, request); !ok {
×
3925
                return nil, wh.error(createServiceBusyError(), scope)
×
3926
        }
×
3927

3928
        if err := wh.validateTaskList(request.TaskList, scope, request.GetDomain()); err != nil {
×
3929
                return nil, wh.error(err, scope)
×
3930
        }
×
3931

3932
        resp, err := wh.GetMatchingClient().ListTaskListPartitions(ctx, &types.MatchingListTaskListPartitionsRequest{
×
3933
                Domain:   request.Domain,
×
3934
                TaskList: request.TaskList,
×
3935
        })
×
3936
        return resp, err
×
3937
}
3938

3939
// GetTaskListsByDomain returns all the partition and host for a taskList
3940
func (wh *WorkflowHandler) GetTaskListsByDomain(
3941
        ctx context.Context,
3942
        request *types.GetTaskListsByDomainRequest,
3943
) (resp *types.GetTaskListsByDomainResponse, retError error) {
×
3944
        defer func() { log.CapturePanic(recover(), wh.GetLogger(), &retError) }()
×
3945

3946
        scope, sw := wh.startRequestProfileWithDomain(ctx, metrics.FrontendGetTaskListsByDomainScope, request)
×
3947
        defer sw.Stop()
×
3948

×
3949
        if wh.isShuttingDown() {
×
3950
                return nil, errShuttingDown
×
3951
        }
×
3952

3953
        if request == nil {
×
3954
                return nil, wh.error(errRequestNotSet, scope)
×
3955
        }
×
3956

3957
        if request.GetDomain() == "" {
×
3958
                return nil, wh.error(errDomainNotSet, scope)
×
3959
        }
×
3960

3961
        if ok := wh.allow(ratelimitTypeUser, request); !ok {
×
3962
                return nil, wh.error(createServiceBusyError(), scope)
×
3963
        }
×
3964

3965
        resp, err := wh.GetMatchingClient().GetTaskListsByDomain(ctx, &types.GetTaskListsByDomainRequest{
×
3966
                Domain: request.Domain,
×
3967
        })
×
3968
        return resp, err
×
3969
}
3970

3971
// RefreshWorkflowTasks re-generates the workflow tasks
3972
func (wh *WorkflowHandler) RefreshWorkflowTasks(
3973
        ctx context.Context,
3974
        request *types.RefreshWorkflowTasksRequest,
3975
) (err error) {
×
3976
        defer func() { log.CapturePanic(recover(), wh.GetLogger(), &err) }()
×
3977
        scope, sw := wh.startRequestProfile(ctx, metrics.AdminRefreshWorkflowTasksScope)
×
3978
        defer sw.Stop()
×
3979

×
3980
        if request == nil {
×
3981
                return wh.error(errRequestNotSet, scope)
×
3982
        }
×
3983
        if err := validateExecution(request.Execution); err != nil {
×
3984
                return wh.error(err, scope)
×
3985
        }
×
3986
        domainEntry, err := wh.GetDomainCache().GetDomain(request.GetDomain())
×
3987
        if err != nil {
×
3988
                return wh.error(err, scope)
×
3989
        }
×
3990

3991
        err = wh.GetHistoryClient().RefreshWorkflowTasks(ctx, &types.HistoryRefreshWorkflowTasksRequest{
×
3992
                DomainUIID: domainEntry.GetInfo().ID,
×
3993
                Request:    request,
×
3994
        })
×
3995
        if err != nil {
×
3996
                return wh.error(err, scope)
×
3997
        }
×
3998
        return nil
×
3999
}
4000

4001
func (wh *WorkflowHandler) getRawHistory(
4002
        ctx context.Context,
4003
        scope metrics.Scope,
4004
        domainID string,
4005
        domainName string,
4006
        execution types.WorkflowExecution,
4007
        firstEventID int64,
4008
        nextEventID int64,
4009
        pageSize int32,
4010
        nextPageToken []byte,
4011
        transientDecision *types.TransientDecisionInfo,
4012
        branchToken []byte,
4013
) ([]*types.DataBlob, []byte, error) {
2✔
4014
        rawHistory := []*types.DataBlob{}
2✔
4015
        shardID := common.WorkflowIDToHistoryShard(execution.WorkflowID, wh.config.NumHistoryShards)
2✔
4016

2✔
4017
        resp, err := wh.GetHistoryManager().ReadRawHistoryBranch(ctx, &persistence.ReadHistoryBranchRequest{
2✔
4018
                BranchToken:   branchToken,
2✔
4019
                MinEventID:    firstEventID,
2✔
4020
                MaxEventID:    nextEventID,
2✔
4021
                PageSize:      int(pageSize),
2✔
4022
                NextPageToken: nextPageToken,
2✔
4023
                ShardID:       common.IntPtr(shardID),
2✔
4024
                DomainName:    domainName,
2✔
4025
        })
2✔
4026
        if err != nil {
2✔
4027
                return nil, nil, err
×
4028
        }
×
4029

4030
        var encoding *types.EncodingType
2✔
4031
        for _, data := range resp.HistoryEventBlobs {
4✔
4032
                switch data.Encoding {
2✔
4033
                case common.EncodingTypeJSON:
×
4034
                        encoding = types.EncodingTypeJSON.Ptr()
×
4035
                case common.EncodingTypeThriftRW:
2✔
4036
                        encoding = types.EncodingTypeThriftRW.Ptr()
2✔
4037
                default:
×
4038
                        panic(fmt.Sprintf("Invalid encoding type for raw history, encoding type: %s", data.Encoding))
×
4039
                }
4040
                rawHistory = append(rawHistory, &types.DataBlob{
2✔
4041
                        EncodingType: encoding,
2✔
4042
                        Data:         data.Data,
2✔
4043
                })
2✔
4044
        }
4045

4046
        if len(resp.NextPageToken) == 0 && transientDecision != nil {
4✔
4047
                if err := wh.validateTransientDecisionEvents(nextEventID, transientDecision); err != nil {
2✔
4048
                        scope.IncCounter(metrics.CadenceErrIncompleteHistoryCounter)
×
4049
                        wh.GetLogger().Error("getHistory error",
×
4050
                                tag.WorkflowDomainID(domainID),
×
4051
                                tag.WorkflowID(execution.GetWorkflowID()),
×
4052
                                tag.WorkflowRunID(execution.GetRunID()),
×
4053
                                tag.Error(err))
×
4054
                }
×
4055
                blob, err := wh.GetPayloadSerializer().SerializeBatchEvents(
2✔
4056
                        []*types.HistoryEvent{transientDecision.ScheduledEvent, transientDecision.StartedEvent}, common.EncodingTypeThriftRW)
2✔
4057
                if err != nil {
2✔
4058
                        return nil, nil, err
×
4059
                }
×
4060
                rawHistory = append(rawHistory, &types.DataBlob{
2✔
4061
                        EncodingType: types.EncodingTypeThriftRW.Ptr(),
2✔
4062
                        Data:         blob.Data,
2✔
4063
                })
2✔
4064
        }
4065

4066
        return rawHistory, resp.NextPageToken, nil
2✔
4067
}
4068

4069
func (wh *WorkflowHandler) getHistory(
4070
        ctx context.Context,
4071
        scope metrics.Scope,
4072
        domainID string,
4073
        domainName string,
4074
        execution types.WorkflowExecution,
4075
        firstEventID, nextEventID int64,
4076
        pageSize int32,
4077
        nextPageToken []byte,
4078
        transientDecision *types.TransientDecisionInfo,
4079
        branchToken []byte,
4080
) (*types.History, []byte, error) {
1,587✔
4081

1,587✔
4082
        var size int
1,587✔
4083

1,587✔
4084
        isFirstPage := len(nextPageToken) == 0
1,587✔
4085
        shardID := common.WorkflowIDToHistoryShard(execution.WorkflowID, wh.config.NumHistoryShards)
1,587✔
4086
        var err error
1,587✔
4087
        historyEvents, size, nextPageToken, err := persistenceutils.ReadFullPageV2Events(ctx, wh.GetHistoryManager(), &persistence.ReadHistoryBranchRequest{
1,587✔
4088
                BranchToken:   branchToken,
1,587✔
4089
                MinEventID:    firstEventID,
1,587✔
4090
                MaxEventID:    nextEventID,
1,587✔
4091
                PageSize:      int(pageSize),
1,587✔
4092
                NextPageToken: nextPageToken,
1,587✔
4093
                ShardID:       common.IntPtr(shardID),
1,587✔
4094
                DomainName:    domainName,
1,587✔
4095
        })
1,587✔
4096

1,587✔
4097
        if err != nil {
1,587✔
4098
                return nil, nil, err
×
4099
        }
×
4100

4101
        scope.RecordTimer(metrics.HistorySize, time.Duration(size))
1,587✔
4102

1,587✔
4103
        isLastPage := len(nextPageToken) == 0
1,587✔
4104
        if err := verifyHistoryIsComplete(
1,587✔
4105
                historyEvents,
1,587✔
4106
                firstEventID,
1,587✔
4107
                nextEventID-1,
1,587✔
4108
                isFirstPage,
1,587✔
4109
                isLastPage,
1,587✔
4110
                int(pageSize)); err != nil {
1,587✔
4111
                scope.IncCounter(metrics.CadenceErrIncompleteHistoryCounter)
×
4112
                wh.GetLogger().Error("getHistory: incomplete history",
×
4113
                        tag.WorkflowDomainID(domainID),
×
4114
                        tag.WorkflowID(execution.GetWorkflowID()),
×
4115
                        tag.WorkflowRunID(execution.GetRunID()),
×
4116
                        tag.Error(err))
×
4117
                return nil, nil, err
×
4118
        }
×
4119

4120
        if len(nextPageToken) == 0 && transientDecision != nil {
1,756✔
4121
                if err := wh.validateTransientDecisionEvents(nextEventID, transientDecision); err != nil {
169✔
4122
                        scope.IncCounter(metrics.CadenceErrIncompleteHistoryCounter)
×
4123
                        wh.GetLogger().Error("getHistory error",
×
4124
                                tag.WorkflowDomainID(domainID),
×
4125
                                tag.WorkflowID(execution.GetWorkflowID()),
×
4126
                                tag.WorkflowRunID(execution.GetRunID()),
×
4127
                                tag.Error(err))
×
4128
                }
×
4129
                // Append the transient decision events once we are done enumerating everything from the events table
4130
                historyEvents = append(historyEvents, transientDecision.ScheduledEvent, transientDecision.StartedEvent)
169✔
4131
        }
4132

4133
        executionHistory := &types.History{}
1,587✔
4134
        executionHistory.Events = historyEvents
1,587✔
4135
        return executionHistory, nextPageToken, nil
1,587✔
4136
}
4137

4138
func (wh *WorkflowHandler) validateTransientDecisionEvents(
4139
        expectedNextEventID int64,
4140
        decision *types.TransientDecisionInfo,
4141
) error {
171✔
4142

171✔
4143
        if decision.ScheduledEvent.ID == expectedNextEventID &&
171✔
4144
                decision.StartedEvent.ID == expectedNextEventID+1 {
342✔
4145
                return nil
171✔
4146
        }
171✔
4147

4148
        return fmt.Errorf(
×
4149
                "invalid transient decision: "+
×
4150
                        "expectedScheduledEventID=%v expectedStartedEventID=%v but have scheduledEventID=%v startedEventID=%v",
×
4151
                expectedNextEventID,
×
4152
                expectedNextEventID+1,
×
4153
                decision.ScheduledEvent.ID,
×
4154
                decision.StartedEvent.ID)
×
4155
}
4156

4157
// startRequestProfile initiates recording of request metrics
4158
func (wh *WorkflowHandler) startRequestProfile(ctx context.Context, scope int) (metrics.Scope, metrics.Stopwatch) {
186✔
4159
        metricsScope := wh.GetMetricsClient().Scope(scope).Tagged(metrics.DomainUnknownTag()).Tagged(metrics.GetContextTags(ctx)...)
186✔
4160
        // timer should be emitted with the all tag
186✔
4161
        sw := metricsScope.StartTimer(metrics.CadenceLatency)
186✔
4162
        metricsScope.IncCounter(metrics.CadenceRequests)
186✔
4163
        return metricsScope, sw
186✔
4164
}
186✔
4165

4166
// startRequestProfileWithDomain initiates recording of request metrics and returns a domain tagged scope
4167
func (wh *WorkflowHandler) startRequestProfileWithDomain(ctx context.Context, scope int, d domainGetter) (metrics.Scope, metrics.Stopwatch) {
6,349✔
4168
        metricsScope := getMetricsScopeWithDomain(scope, d, wh.GetMetricsClient()).Tagged(metrics.GetContextTags(ctx)...)
6,349✔
4169
        sw := metricsScope.StartTimer(metrics.CadenceLatency)
6,349✔
4170
        metricsScope.IncCounter(metrics.CadenceRequests)
6,349✔
4171
        return metricsScope, sw
6,349✔
4172
}
6,349✔
4173

4174
// getDefaultScope returns a default scope to use for request metrics
4175
func (wh *WorkflowHandler) getDefaultScope(ctx context.Context, scope int) metrics.Scope {
1,759✔
4176
        return wh.GetMetricsClient().Scope(scope).Tagged(metrics.DomainUnknownTag()).Tagged(metrics.GetContextTags(ctx)...)
1,759✔
4177
}
1,759✔
4178

4179
// frontendInternalServiceError formats an internal failure as a plain Go error.
//
// A plain error is used deliberately instead of a thrift/types error: in uber internal
// metrics, thrift errors are counted as user errors, and internal failures must not be.
func frontendInternalServiceError(fmtStr string, args ...interface{}) error {
	plainErr := fmt.Errorf(fmtStr, args...)
	return plainErr
}
9✔
4184

4185
func (wh *WorkflowHandler) error(err error, scope metrics.Scope, tagsForErrorLog ...tag.Tag) error {
748✔
4186
        switch err := err.(type) {
748✔
4187
        case *types.InternalServiceError:
3✔
4188
                wh.GetLogger().WithTags(tagsForErrorLog...).Error("Internal service error", tag.Error(err))
3✔
4189
                scope.IncCounter(metrics.CadenceFailures)
3✔
4190
                return frontendInternalServiceError("cadence internal error, msg: %v", err.Message)
3✔
4191
        case *types.BadRequestError:
33✔
4192
                scope.IncCounter(metrics.CadenceErrBadRequestCounter)
33✔
4193
                return err
33✔
4194
        case *types.DomainNotActiveError:
×
4195
                scope.IncCounter(metrics.CadenceErrBadRequestCounter)
×
4196
                return err
×
4197
        case *types.ServiceBusyError:
×
4198
                scope.IncCounter(metrics.CadenceErrServiceBusyCounter)
×
4199
                return err
×
4200
        case *types.EntityNotExistsError:
649✔
4201
                scope.IncCounter(metrics.CadenceErrEntityNotExistsCounter)
649✔
4202
                return err
649✔
4203
        case *types.WorkflowExecutionAlreadyCompletedError:
19✔
4204
                scope.IncCounter(metrics.CadenceErrWorkflowExecutionAlreadyCompletedCounter)
19✔
4205
                return err
19✔
4206
        case *types.WorkflowExecutionAlreadyStartedError:
24✔
4207
                scope.IncCounter(metrics.CadenceErrExecutionAlreadyStartedCounter)
24✔
4208
                return err
24✔
4209
        case *types.DomainAlreadyExistsError:
×
4210
                scope.IncCounter(metrics.CadenceErrDomainAlreadyExistsCounter)
×
4211
                return err
×
4212
        case *types.CancellationAlreadyRequestedError:
3✔
4213
                scope.IncCounter(metrics.CadenceErrCancellationAlreadyRequestedCounter)
3✔
4214
                return err
3✔
4215
        case *types.QueryFailedError:
9✔
4216
                scope.IncCounter(metrics.CadenceErrQueryFailedCounter)
9✔
4217
                return err
9✔
4218
        case *types.LimitExceededError:
×
4219
                scope.IncCounter(metrics.CadenceErrLimitExceededCounter)
×
4220
                return err
×
4221
        case *types.ClientVersionNotSupportedError:
×
4222
                scope.IncCounter(metrics.CadenceErrClientVersionNotSupportedCounter)
×
4223
                return err
×
4224
        case *yarpcerrors.Status:
3✔
4225
                if err.Code() == yarpcerrors.CodeDeadlineExceeded {
5✔
4226
                        wh.GetLogger().WithTags(tagsForErrorLog...).Error("Frontend request timedout", tag.Error(err))
2✔
4227
                        scope.IncCounter(metrics.CadenceErrContextTimeoutCounter)
2✔
4228
                        return err
2✔
4229
                }
2✔
4230
        }
4231
        if errors.Is(err, context.DeadlineExceeded) {
6✔
4232
                wh.GetLogger().WithTags(tagsForErrorLog...).Error("Frontend request timedout", tag.Error(err))
×
4233
                scope.IncCounter(metrics.CadenceErrContextTimeoutCounter)
×
4234
                return err
×
4235
        }
×
4236
        wh.GetLogger().WithTags(tagsForErrorLog...).Error("Uncategorized error", tag.Error(err))
6✔
4237
        scope.IncCounter(metrics.CadenceFailures)
6✔
4238
        return frontendInternalServiceError("cadence internal uncategorized error, msg: %v", err.Error())
6✔
4239
}
4240

4241
func (wh *WorkflowHandler) validateTaskList(t *types.TaskList, scope metrics.Scope, domain string) error {
2,786✔
4242
        if t == nil || t.GetName() == "" {
2,787✔
4243
                return errTaskListNotSet
1✔
4244
        }
1✔
4245

4246
        if !common.ValidIDLength(
2,785✔
4247
                t.GetName(),
2,785✔
4248
                scope,
2,785✔
4249
                wh.config.MaxIDLengthWarnLimit(),
2,785✔
4250
                wh.config.TaskListNameMaxLength(domain),
2,785✔
4251
                metrics.CadenceErrTaskListNameExceededWarnLimit,
2,785✔
4252
                domain,
2,785✔
4253
                wh.GetLogger(),
2,785✔
4254
                tag.IDTypeTaskListName) {
2,785✔
4255
                return errTaskListTooLong
×
4256
        }
×
4257
        return nil
2,785✔
4258
}
4259

4260
func validateExecution(w *types.WorkflowExecution) error {
1,390✔
4261
        if w == nil {
1,390✔
4262
                return errExecutionNotSet
×
4263
        }
×
4264
        if w.GetWorkflowID() == "" {
1,390✔
4265
                return errWorkflowIDNotSet
×
4266
        }
×
4267
        if w.GetRunID() != "" && uuid.Parse(w.GetRunID()) == nil {
1,390✔
4268
                return errInvalidRunID
×
4269
        }
×
4270
        return nil
1,390✔
4271
}
4272

4273
func (wh *WorkflowHandler) createPollForDecisionTaskResponse(
4274
        ctx context.Context,
4275
        scope metrics.Scope,
4276
        domainID string,
4277
        matchingResp *types.MatchingPollForDecisionTaskResponse,
4278
        branchToken []byte,
4279
) (*types.PollForDecisionTaskResponse, error) {
1,224✔
4280

1,224✔
4281
        if matchingResp.WorkflowExecution == nil {
1,283✔
4282
                // this will happen if there is no decision task to be send to worker / caller
59✔
4283
                return &types.PollForDecisionTaskResponse{}, nil
59✔
4284
        }
59✔
4285

4286
        var history *types.History
1,165✔
4287
        var continuation []byte
1,165✔
4288
        var err error
1,165✔
4289

1,165✔
4290
        if matchingResp.GetStickyExecutionEnabled() && matchingResp.Query != nil {
1,171✔
4291
                // meaning sticky query, we should not return any events to worker
6✔
4292
                // since query task only check the current status
6✔
4293
                history = &types.History{
6✔
4294
                        Events: []*types.HistoryEvent{},
6✔
4295
                }
6✔
4296
        } else {
1,165✔
4297
                // here we have 3 cases:
1,159✔
4298
                // 1. sticky && non query task
1,159✔
4299
                // 2. non sticky &&  non query task
1,159✔
4300
                // 3. non sticky && query task
1,159✔
4301
                // for 1, partial history have to be send back
1,159✔
4302
                // for 2 and 3, full history have to be send back
1,159✔
4303

1,159✔
4304
                var persistenceToken []byte
1,159✔
4305

1,159✔
4306
                firstEventID := common.FirstEventID
1,159✔
4307
                nextEventID := matchingResp.GetNextEventID()
1,159✔
4308
                if matchingResp.GetStickyExecutionEnabled() {
1,255✔
4309
                        firstEventID = matchingResp.GetPreviousStartedEventID() + 1
96✔
4310
                }
96✔
4311
                domainName, dErr := wh.GetDomainCache().GetDomainName(domainID)
1,159✔
4312
                if dErr != nil {
1,159✔
4313
                        return nil, dErr
×
4314
                }
×
4315
                scope = scope.Tagged(metrics.DomainTag(domainName))
1,159✔
4316
                history, persistenceToken, err = wh.getHistory(
1,159✔
4317
                        ctx,
1,159✔
4318
                        scope,
1,159✔
4319
                        domainID,
1,159✔
4320
                        domainName,
1,159✔
4321
                        *matchingResp.WorkflowExecution,
1,159✔
4322
                        firstEventID,
1,159✔
4323
                        nextEventID,
1,159✔
4324
                        int32(wh.config.HistoryMaxPageSize(domainName)),
1,159✔
4325
                        nil,
1,159✔
4326
                        matchingResp.DecisionInfo,
1,159✔
4327
                        branchToken,
1,159✔
4328
                )
1,159✔
4329
                if err != nil {
1,159✔
4330
                        return nil, err
×
4331
                }
×
4332

4333
                if len(persistenceToken) != 0 {
1,159✔
4334
                        continuation, err = serializeHistoryToken(&getHistoryContinuationToken{
×
4335
                                RunID:             matchingResp.WorkflowExecution.GetRunID(),
×
4336
                                FirstEventID:      firstEventID,
×
4337
                                NextEventID:       nextEventID,
×
4338
                                PersistenceToken:  persistenceToken,
×
4339
                                TransientDecision: matchingResp.DecisionInfo,
×
4340
                                BranchToken:       branchToken,
×
4341
                        })
×
4342
                        if err != nil {
×
4343
                                return nil, err
×
4344
                        }
×
4345
                }
4346
        }
4347

4348
        resp := &types.PollForDecisionTaskResponse{
1,165✔
4349
                TaskToken:                 matchingResp.TaskToken,
1,165✔
4350
                WorkflowExecution:         matchingResp.WorkflowExecution,
1,165✔
4351
                WorkflowType:              matchingResp.WorkflowType,
1,165✔
4352
                PreviousStartedEventID:    matchingResp.PreviousStartedEventID,
1,165✔
4353
                StartedEventID:            matchingResp.StartedEventID, // this field is not set for query tasks as there's no decision task started event
1,165✔
4354
                Query:                     matchingResp.Query,
1,165✔
4355
                BacklogCountHint:          matchingResp.BacklogCountHint,
1,165✔
4356
                Attempt:                   matchingResp.Attempt,
1,165✔
4357
                History:                   history,
1,165✔
4358
                NextPageToken:             continuation,
1,165✔
4359
                WorkflowExecutionTaskList: matchingResp.WorkflowExecutionTaskList,
1,165✔
4360
                ScheduledTimestamp:        matchingResp.ScheduledTimestamp,
1,165✔
4361
                StartedTimestamp:          matchingResp.StartedTimestamp,
1,165✔
4362
                Queries:                   matchingResp.Queries,
1,165✔
4363
                NextEventID:               matchingResp.NextEventID,
1,165✔
4364
        }
1,165✔
4365

1,165✔
4366
        return resp, nil
1,165✔
4367
}
4368

4369
func verifyHistoryIsComplete(
4370
        events []*types.HistoryEvent,
4371
        expectedFirstEventID int64,
4372
        expectedLastEventID int64,
4373
        isFirstPage bool,
4374
        isLastPage bool,
4375
        pageSize int,
4376
) error {
1,606✔
4377

1,606✔
4378
        nEvents := len(events)
1,606✔
4379
        if nEvents == 0 {
1,618✔
4380
                if isLastPage {
24✔
4381
                        // we seem to be returning a non-nil pageToken on the lastPage which
12✔
4382
                        // in turn cases the client to call getHistory again - only to find
12✔
4383
                        // there are no more events to consume - bail out if this is the case here
12✔
4384
                        return nil
12✔
4385
                }
12✔
4386
                return fmt.Errorf("invalid history: contains zero events")
×
4387
        }
4388

4389
        firstEventID := events[0].ID
1,594✔
4390
        lastEventID := events[nEvents-1].ID
1,594✔
4391

1,594✔
4392
        if !isFirstPage { // atleast one page of history has been read previously
1,630✔
4393
                if firstEventID <= expectedFirstEventID {
36✔
4394
                        // not first page and no events have been read in the previous pages - not possible
×
4395
                        return &types.InternalServiceError{
×
4396
                                Message: fmt.Sprintf(
×
4397
                                        "invalid history: expected first eventID to be > %v but got %v", expectedFirstEventID, firstEventID),
×
4398
                        }
×
4399
                }
×
4400
                expectedFirstEventID = firstEventID
36✔
4401
        }
4402

4403
        if !isLastPage {
1,643✔
4404
                // estimate lastEventID based on pageSize. This is a lower bound
49✔
4405
                // since the persistence layer counts "batch of events" as a single page
49✔
4406
                expectedLastEventID = expectedFirstEventID + int64(pageSize) - 1
49✔
4407
        }
49✔
4408

4409
        nExpectedEvents := expectedLastEventID - expectedFirstEventID + 1
1,594✔
4410

1,594✔
4411
        if firstEventID == expectedFirstEventID &&
1,594✔
4412
                ((isLastPage && lastEventID == expectedLastEventID && int64(nEvents) == nExpectedEvents) ||
1,594✔
4413
                        (!isLastPage && lastEventID >= expectedLastEventID && int64(nEvents) >= nExpectedEvents)) {
3,176✔
4414
                return nil
1,582✔
4415
        }
1,582✔
4416

4417
        return &types.InternalServiceError{
12✔
4418
                Message: fmt.Sprintf(
12✔
4419
                        "incomplete history: "+
12✔
4420
                                "expected events [%v-%v] but got events [%v-%v] of length %v:"+
12✔
4421
                                "isFirstPage=%v,isLastPage=%v,pageSize=%v",
12✔
4422
                        expectedFirstEventID,
12✔
4423
                        expectedLastEventID,
12✔
4424
                        firstEventID,
12✔
4425
                        lastEventID,
12✔
4426
                        nEvents,
12✔
4427
                        isFirstPage,
12✔
4428
                        isLastPage,
12✔
4429
                        pageSize),
12✔
4430
        }
12✔
4431
}
4432

4433
func deserializeHistoryToken(bytes []byte) (*getHistoryContinuationToken, error) {
44✔
4434
        token := &getHistoryContinuationToken{}
44✔
4435
        err := json.Unmarshal(bytes, token)
44✔
4436
        return token, err
44✔
4437
}
44✔
4438

4439
func serializeHistoryToken(token *getHistoryContinuationToken) ([]byte, error) {
429✔
4440
        if token == nil {
816✔
4441
                return nil, nil
387✔
4442
        }
387✔
4443

4444
        bytes, err := json.Marshal(token)
42✔
4445
        return bytes, err
42✔
4446
}
4447

4448
func createServiceBusyError() *types.ServiceBusyError {
×
4449
        err := &types.ServiceBusyError{}
×
4450
        err.Message = "Too many outstanding requests to the cadence service"
×
4451
        return err
×
4452
}
×
4453

4454
func isFailoverRequest(updateRequest *types.UpdateDomainRequest) bool {
9✔
4455
        return updateRequest.ActiveClusterName != nil
9✔
4456
}
9✔
4457

4458
func isGraceFailoverRequest(updateRequest *types.UpdateDomainRequest) bool {
9✔
4459
        return updateRequest.FailoverTimeoutInSeconds != nil
9✔
4460
}
9✔
4461

4462
func (wh *WorkflowHandler) checkOngoingFailover(
4463
        ctx context.Context,
4464
        domainName *string,
4465
) error {
1✔
4466

1✔
4467
        enabledClusters := wh.GetClusterMetadata().GetEnabledClusterInfo()
1✔
4468
        respChan := make(chan *types.DescribeDomainResponse, len(enabledClusters))
1✔
4469

1✔
4470
        g := &errgroup.Group{}
1✔
4471
        for clusterName := range enabledClusters {
3✔
4472
                frontendClient := wh.GetRemoteFrontendClient(clusterName)
2✔
4473
                g.Go(func() (e error) {
4✔
4474
                        defer func() { log.CapturePanic(recover(), wh.GetLogger(), &e) }()
4✔
4475

4476
                        resp, _ := frontendClient.DescribeDomain(ctx, &types.DescribeDomainRequest{Name: domainName})
2✔
4477
                        respChan <- resp
2✔
4478
                        return nil
2✔
4479
                })
4480
        }
4481
        g.Wait()
1✔
4482
        close(respChan)
1✔
4483

1✔
4484
        var failoverVersion *int64
1✔
4485
        for resp := range respChan {
3✔
4486
                if resp == nil {
2✔
4487
                        return &types.InternalServiceError{
×
4488
                                Message: "Failed to verify failover version from all clusters",
×
4489
                        }
×
4490
                }
×
4491
                if failoverVersion == nil {
3✔
4492
                        failoverVersion = &resp.FailoverVersion
1✔
4493
                }
1✔
4494
                if *failoverVersion != resp.GetFailoverVersion() {
2✔
4495
                        return &types.BadRequestError{
×
4496
                                Message: "Concurrent failover is not allow.",
×
4497
                        }
×
4498
                }
×
4499
        }
4500
        return nil
1✔
4501
}
4502

4503
func (wh *WorkflowHandler) historyArchived(ctx context.Context, request *types.GetWorkflowExecutionHistoryRequest, domainID string) bool {
440✔
4504
        if request.GetExecution() == nil || request.GetExecution().GetRunID() == "" {
475✔
4505
                return false
35✔
4506
        }
35✔
4507
        getMutableStateRequest := &types.GetMutableStateRequest{
405✔
4508
                DomainUUID: domainID,
405✔
4509
                Execution:  request.Execution,
405✔
4510
        }
405✔
4511
        _, err := wh.GetHistoryClient().GetMutableState(ctx, getMutableStateRequest)
405✔
4512
        if err == nil {
784✔
4513
                return false
379✔
4514
        }
379✔
4515
        switch err.(type) {
26✔
4516
        case *types.EntityNotExistsError:
25✔
4517
                // the only case in which history is assumed to be archived is if getting mutable state returns entity not found error
25✔
4518
                return true
25✔
4519
        }
4520
        return false
1✔
4521
}
4522

4523
func (wh *WorkflowHandler) getArchivedHistory(
4524
        ctx context.Context,
4525
        request *types.GetWorkflowExecutionHistoryRequest,
4526
        domainID string,
4527
        scope metrics.Scope,
4528
        tags ...tag.Tag,
4529
) (*types.GetWorkflowExecutionHistoryResponse, error) {
28✔
4530
        entry, err := wh.GetDomainCache().GetDomainByID(domainID)
28✔
4531
        if err != nil {
29✔
4532
                return nil, wh.error(err, scope)
1✔
4533
        }
1✔
4534

4535
        URIString := entry.GetConfig().HistoryArchivalURI
27✔
4536
        if URIString == "" {
28✔
4537
                // if URI is empty, it means the domain has never enabled for archival.
1✔
4538
                // the error is not "workflow has passed retention period", because
1✔
4539
                // we have no way to tell if the requested workflow exists or not.
1✔
4540
                return nil, wh.error(errHistoryNotFound, scope, tags...)
1✔
4541
        }
1✔
4542

4543
        URI, err := archiver.NewURI(URIString)
26✔
4544
        if err != nil {
27✔
4545
                return nil, wh.error(err, scope, tags...)
1✔
4546
        }
1✔
4547

4548
        historyArchiver, err := wh.GetArchiverProvider().GetHistoryArchiver(URI.Scheme(), service.Frontend)
25✔
4549
        if err != nil {
25✔
4550
                return nil, wh.error(err, scope, tags...)
×
4551
        }
×
4552

4553
        resp, err := historyArchiver.Get(ctx, URI, &archiver.GetHistoryRequest{
25✔
4554
                DomainID:      domainID,
25✔
4555
                WorkflowID:    request.GetExecution().GetWorkflowID(),
25✔
4556
                RunID:         request.GetExecution().GetRunID(),
25✔
4557
                NextPageToken: request.GetNextPageToken(),
25✔
4558
                PageSize:      int(request.GetMaximumPageSize()),
25✔
4559
        })
25✔
4560
        if err != nil {
28✔
4561
                return nil, wh.error(err, scope, tags...)
3✔
4562
        }
3✔
4563

4564
        history := &types.History{}
22✔
4565
        for _, batch := range resp.HistoryBatches {
279✔
4566
                history.Events = append(history.Events, batch.Events...)
257✔
4567
        }
257✔
4568
        return &types.GetWorkflowExecutionHistoryResponse{
22✔
4569
                History:       history,
22✔
4570
                NextPageToken: resp.NextPageToken,
22✔
4571
                Archived:      true,
22✔
4572
        }, nil
22✔
4573
}
4574

4575
func (wh *WorkflowHandler) convertIndexedKeyToThrift(keys map[string]interface{}) map[string]types.IndexedValueType {
3✔
4576
        converted := make(map[string]types.IndexedValueType)
3✔
4577
        for k, v := range keys {
51✔
4578
                converted[k] = common.ConvertIndexedValueTypeToInternalType(v, wh.GetLogger())
48✔
4579
        }
48✔
4580
        return converted
2✔
4581
}
4582

4583
func (wh *WorkflowHandler) isListRequestPageSizeTooLarge(pageSize int32, domain string) bool {
305✔
4584
        return common.IsAdvancedVisibilityReadingEnabled(wh.config.EnableReadVisibilityFromES(domain), wh.config.IsAdvancedVisConfigExist) &&
305✔
4585
                pageSize > int32(wh.config.ESIndexMaxResultWindow())
305✔
4586
}
305✔
4587

4588
func (wh *WorkflowHandler) allow(requestType ratelimitType, d domainGetter) bool {
6,037✔
4589
        domain := ""
6,037✔
4590
        if d != nil {
12,074✔
4591
                domain = d.GetDomain()
6,037✔
4592
        }
6,037✔
4593
        switch requestType {
6,037✔
4594
        case ratelimitTypeUser:
1,931✔
4595
                return wh.userRateLimiter.Allow(quotas.Info{Domain: domain})
1,931✔
4596
        case ratelimitTypeWorker:
3,819✔
4597
                return wh.workerRateLimiter.Allow(quotas.Info{Domain: domain})
3,819✔
4598
        case ratelimitTypeVisibility:
287✔
4599
                return wh.visibilityRateLimiter.Allow(quotas.Info{Domain: domain})
287✔
4600
        default:
×
4601
                wh.GetLogger().Fatal("coding error, unrecognized request ratelimit type value", tag.Value(requestType))
×
4602
                panic("unreachable")
×
4603
        }
4604
}
4605

4606
// GetClusterInfo return information about cadence deployment
4607
func (wh *WorkflowHandler) GetClusterInfo(
4608
        ctx context.Context,
4609
) (resp *types.ClusterInfo, err error) {
×
4610
        defer func() { log.CapturePanic(recover(), wh.GetLogger(), &err) }()
×
4611

4612
        scope := wh.getDefaultScope(ctx, metrics.FrontendClientGetClusterInfoScope)
×
4613
        if ok := wh.allow(ratelimitTypeUser, nil); !ok {
×
4614
                return nil, wh.error(createServiceBusyError(), scope)
×
4615
        }
×
4616

4617
        return &types.ClusterInfo{
×
4618
                SupportedClientVersions: &types.SupportedClientVersions{
×
4619
                        GoSdk:   client.SupportedGoSDKVersion,
×
4620
                        JavaSdk: client.SupportedJavaSDKVersion,
×
4621
                },
×
4622
        }, nil
×
4623
}
4624

4625
func checkPermission(
4626
        config *Config,
4627
        securityToken string,
4628
) error {
60✔
4629
        if config.EnableAdminProtection() {
62✔
4630
                if securityToken == "" {
2✔
4631
                        return errNoPermission
×
4632
                }
×
4633
                requiredToken := config.AdminOperationToken()
2✔
4634
                if securityToken != requiredToken {
3✔
4635
                        return errNoPermission
1✔
4636
                }
1✔
4637
        }
4638
        return nil
59✔
4639
}
4640

4641
func checkFailOverPermission(config *Config, domainName string) error {
2✔
4642
        if config.Lockdown(domainName) {
3✔
4643
                return errDomainInLockdown
1✔
4644
        }
1✔
4645
        return nil
1✔
4646
}
4647

4648
// domainWrapper adapts a plain domain name to the domainGetter interface.
type domainWrapper struct {
	domain string
}

// GetDomain returns the wrapped domain name.
func (w domainWrapper) GetDomain() string {
	name := w.domain
	return name
}
3,518✔
4655

4656
func (hs HealthStatus) String() string {
×
4657
        switch hs {
×
4658
        case HealthStatusOK:
×
4659
                return "OK"
×
4660
        case HealthStatusWarmingUp:
×
4661
                return "WarmingUp"
×
4662
        case HealthStatusShuttingDown:
×
4663
                return "ShuttingDown"
×
4664
        default:
×
4665
                return "unknown"
×
4666
        }
4667
}
4668

4669
func getDomainWfIDRunIDTags(
4670
        domainName string,
4671
        wf *types.WorkflowExecution,
4672
) []tag.Tag {
5,965✔
4673
        tags := []tag.Tag{tag.WorkflowDomainName(domainName)}
5,965✔
4674
        if wf == nil {
8,255✔
4675
                return tags
2,290✔
4676
        }
2,290✔
4677
        return append(
3,675✔
4678
                tags,
3,675✔
4679
                tag.WorkflowID(wf.GetWorkflowID()),
3,675✔
4680
                tag.WorkflowRunID(wf.GetRunID()),
3,675✔
4681
        )
3,675✔
4682
}
4683

4684
// checkRequiredDomainDataKVs verifies that every key of requiredDomainDataKeys is present in
// domainData. Values are not checked, only presence.
//
// When several keys are missing, the lexicographically smallest one is reported. This keeps
// the error message deterministic; previously it depended on random map iteration order.
func checkRequiredDomainDataKVs(requiredDomainDataKeys map[string]interface{}, domainData map[string]string) error {
	missingKey := ""
	foundMissing := false
	for k := range requiredDomainDataKeys {
		if _, ok := domainData[k]; ok {
			continue
		}
		if !foundMissing || k < missingKey {
			missingKey, foundMissing = k, true
		}
	}
	if foundMissing {
		return fmt.Errorf("domain data error, missing required key %v . All required keys: %v", missingKey, requiredDomainDataKeys)
	}
	return nil
}
4694

4695
// Some error types are introduced later that some clients might not support
4696
// To make them backward compatible, we continue returning the legacy error types
4697
// for older clients
4698
func (wh *WorkflowHandler) normalizeVersionedErrors(ctx context.Context, err error) error {
66✔
4699
        switch err.(type) {
66✔
4700
        case *types.WorkflowExecutionAlreadyCompletedError:
19✔
4701
                call := yarpc.CallFromContext(ctx)
19✔
4702
                clientFeatureVersion := call.Header(common.FeatureVersionHeaderName)
19✔
4703
                clientImpl := call.Header(common.ClientImplHeaderName)
19✔
4704
                featureFlags := client.GetFeatureFlagsFromHeader(call)
19✔
4705

19✔
4706
                vErr := wh.versionChecker.SupportsWorkflowAlreadyCompletedError(clientImpl, clientFeatureVersion, featureFlags)
19✔
4707
                if vErr == nil {
22✔
4708
                        return err
3✔
4709
                }
3✔
4710
                return &types.EntityNotExistsError{Message: "Workflow execution already completed."}
16✔
4711
        default:
47✔
4712
                return err
47✔
4713
        }
4714
}
4715
func constructRestartWorkflowRequest(w *types.WorkflowExecutionStartedEventAttributes, domain string, identity string, workflowID string) *types.StartWorkflowExecutionRequest {
1✔
4716

1✔
4717
        startRequest := &types.StartWorkflowExecutionRequest{
1✔
4718
                RequestID:  uuid.New(),
1✔
4719
                Domain:     domain,
1✔
4720
                WorkflowID: workflowID,
1✔
4721
                WorkflowType: &types.WorkflowType{
1✔
4722
                        Name: w.WorkflowType.Name,
1✔
4723
                },
1✔
4724
                TaskList: &types.TaskList{
1✔
4725
                        Name: w.TaskList.Name,
1✔
4726
                },
1✔
4727
                Input:                               w.Input,
1✔
4728
                ExecutionStartToCloseTimeoutSeconds: w.ExecutionStartToCloseTimeoutSeconds,
1✔
4729
                TaskStartToCloseTimeoutSeconds:      w.TaskStartToCloseTimeoutSeconds,
1✔
4730
                Identity:                            identity,
1✔
4731
                WorkflowIDReusePolicy:               types.WorkflowIDReusePolicyTerminateIfRunning.Ptr(),
1✔
4732
        }
1✔
4733
        startRequest.CronSchedule = w.CronSchedule
1✔
4734
        startRequest.RetryPolicy = w.RetryPolicy
1✔
4735
        startRequest.DelayStartSeconds = w.FirstDecisionTaskBackoffSeconds
1✔
4736
        startRequest.Header = w.Header
1✔
4737
        startRequest.Memo = w.Memo
1✔
4738
        startRequest.SearchAttributes = w.SearchAttributes
1✔
4739

1✔
4740
        return startRequest
1✔
4741
}
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc