• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

uber / cadence / 018fefa1-26fc-4131-8c40-f79719037bde

06 Jun 2024 10:17PM UTC coverage: 71.366% (-0.04%) from 71.402%
018fefa1-26fc-4131-8c40-f79719037bde

push

buildkite

web-flow
Change permission of DescribeCluster to Read (#6081)

* feat: change permission of DescribeCluster to Read

* feat: add test for admin.DescribeCluster

* Lint test file access_controlled_test.go

Signed-off-by: jiaxuyang <xuyang.jia@gmail.com>

---------

Signed-off-by: jiaxuyang <xuyang.jia@gmail.com>

106401 of 149093 relevant lines covered (71.37%)

2578.5 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

73.61
/service/frontend/api/handler.go
1
// Copyright (c) 2017-2020 Uber Technologies, Inc.
2
//
3
// Permission is hereby granted, free of charge, to any person obtaining a copy
4
// of this software and associated documentation files (the "Software"), to deal
5
// in the Software without restriction, including without limitation the rights
6
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
7
// copies of the Software, and to permit persons to whom the Software is
8
// furnished to do so, subject to the following conditions:
9
//
10
// The above copyright notice and this permission notice shall be included in
11
// all copies or substantial portions of the Software.
12
//
13
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
14
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
18
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
19
// THE SOFTWARE.
20

21
package api
22

23
import (
24
        "context"
25
        "encoding/json"
26
        "fmt"
27
        "sync/atomic"
28
        "time"
29

30
        "github.com/google/uuid"
31
        "go.uber.org/yarpc"
32
        "golang.org/x/sync/errgroup"
33

34
        "github.com/uber/cadence/.gen/go/shared"
35
        "github.com/uber/cadence/.gen/go/sqlblobs"
36
        "github.com/uber/cadence/common"
37
        "github.com/uber/cadence/common/archiver"
38
        "github.com/uber/cadence/common/backoff"
39
        "github.com/uber/cadence/common/cache"
40
        "github.com/uber/cadence/common/client"
41
        "github.com/uber/cadence/common/codec"
42
        "github.com/uber/cadence/common/domain"
43
        "github.com/uber/cadence/common/elasticsearch/validator"
44
        "github.com/uber/cadence/common/log"
45
        "github.com/uber/cadence/common/log/tag"
46
        "github.com/uber/cadence/common/metrics"
47
        "github.com/uber/cadence/common/partition"
48
        "github.com/uber/cadence/common/persistence"
49
        persistenceutils "github.com/uber/cadence/common/persistence/persistence-utils"
50
        "github.com/uber/cadence/common/resource"
51
        "github.com/uber/cadence/common/service"
52
        "github.com/uber/cadence/common/types"
53
        "github.com/uber/cadence/common/types/mapper/thrift"
54
        "github.com/uber/cadence/service/frontend/config"
55
        "github.com/uber/cadence/service/frontend/validate"
56
)
57

58
const (
59
        // HealthStatusOK is used when this node is healthy and rpc requests are allowed
60
        HealthStatusOK HealthStatus = iota + 1
61
        // HealthStatusWarmingUp is used when the rpc handler is warming up
62
        HealthStatusWarmingUp
63
        // HealthStatusShuttingDown is used when the rpc handler is shutting down
64
        HealthStatusShuttingDown
65
)
66

67
var _ Handler = (*WorkflowHandler)(nil)
68

69
type (
70
        // WorkflowHandler - Thrift handler interface for workflow service
71
        WorkflowHandler struct {
72
                resource.Resource
73

74
                shuttingDown              int32
75
                healthStatus              int32
76
                tokenSerializer           common.TaskTokenSerializer
77
                config                    *config.Config
78
                versionChecker            client.VersionChecker
79
                domainHandler             domain.Handler
80
                visibilityQueryValidator  *validator.VisibilityQueryValidator
81
                searchAttributesValidator *validator.SearchAttributesValidator
82
                throttleRetry             *backoff.ThrottleRetry
83
                producerManager           ProducerManager
84
                thriftrwEncoder           codec.BinaryEncoder
85
        }
86

87
        getHistoryContinuationToken struct {
88
                RunID             string
89
                FirstEventID      int64
90
                NextEventID       int64
91
                IsWorkflowRunning bool
92
                PersistenceToken  []byte
93
                TransientDecision *types.TransientDecisionInfo
94
                BranchToken       []byte
95
        }
96

97
        domainGetter interface {
98
                GetDomain() string
99
        }
100

101
        // HealthStatus is an enum that refers to the rpc handler health status
102
        HealthStatus int32
103
)
104

105
var (
106
        frontendServiceRetryPolicy = common.CreateFrontendServiceRetryPolicy()
107
)
108

109
// NewWorkflowHandler creates a thrift handler for the cadence service
110
func NewWorkflowHandler(
111
        resource resource.Resource,
112
        config *config.Config,
113
        versionChecker client.VersionChecker,
114
        domainHandler domain.Handler,
115
) *WorkflowHandler {
133✔
116
        return &WorkflowHandler{
133✔
117
                Resource:        resource,
133✔
118
                config:          config,
133✔
119
                healthStatus:    int32(HealthStatusWarmingUp),
133✔
120
                tokenSerializer: common.NewJSONTaskTokenSerializer(),
133✔
121
                versionChecker:  versionChecker,
133✔
122
                domainHandler:   domainHandler,
133✔
123
                visibilityQueryValidator: validator.NewQueryValidator(
133✔
124
                        config.ValidSearchAttributes,
133✔
125
                        config.EnableQueryAttributeValidation,
133✔
126
                ),
133✔
127
                searchAttributesValidator: validator.NewSearchAttributesValidator(
133✔
128
                        resource.GetLogger(),
133✔
129
                        config.EnableQueryAttributeValidation,
133✔
130
                        config.ValidSearchAttributes,
133✔
131
                        config.SearchAttributesNumberOfKeysLimit,
133✔
132
                        config.SearchAttributesSizeOfValueLimit,
133✔
133
                        config.SearchAttributesTotalSizeLimit,
133✔
134
                ),
133✔
135
                throttleRetry: backoff.NewThrottleRetry(
133✔
136
                        backoff.WithRetryPolicy(frontendServiceRetryPolicy),
133✔
137
                        backoff.WithRetryableError(common.IsServiceTransientError),
133✔
138
                ),
133✔
139
                producerManager: NewProducerManager(
133✔
140
                        resource.GetDomainCache(),
133✔
141
                        resource.GetAsyncWorkflowQueueProvider(),
133✔
142
                        resource.GetLogger(),
133✔
143
                        resource.GetMetricsClient(),
133✔
144
                ),
133✔
145
                thriftrwEncoder: codec.NewThriftRWEncoder(),
133✔
146
        }
133✔
147
}
133✔
148

149
// Start starts the handler
150
func (wh *WorkflowHandler) Start() {
21✔
151
        // TODO: Get warmup duration from config. Even better, run proactive checks such as probing downstream connections.
21✔
152
        const warmUpDuration = 30 * time.Second
21✔
153

21✔
154
        warmupTimer := time.NewTimer(warmUpDuration)
21✔
155
        go func() {
42✔
156
                <-warmupTimer.C
21✔
157
                wh.GetLogger().Warn("Service warmup duration has elapsed.")
21✔
158
                if atomic.CompareAndSwapInt32(&wh.healthStatus, int32(HealthStatusWarmingUp), int32(HealthStatusOK)) {
33✔
159
                        wh.GetLogger().Warn("Warmup time has elapsed. Service is healthy.")
12✔
160
                } else {
13✔
161
                        status := HealthStatus(atomic.LoadInt32(&wh.healthStatus))
1✔
162
                        wh.GetLogger().Warn(fmt.Sprintf("Warmup time has elapsed. Service status is: %v", status.String()))
1✔
163
                }
1✔
164
        }()
165
}
166

167
// Stop stops the handler
168
func (wh *WorkflowHandler) Stop() {
21✔
169
        atomic.StoreInt32(&wh.shuttingDown, 1)
21✔
170
}
21✔
171

172
// UpdateHealthStatus sets the health status for this rpc handler.
173
// This health status will be used within the rpc health check handler
174
func (wh *WorkflowHandler) UpdateHealthStatus(status HealthStatus) {
22✔
175
        atomic.StoreInt32(&wh.healthStatus, int32(status))
22✔
176
}
22✔
177

178
func (wh *WorkflowHandler) isShuttingDown() bool {
6,676✔
179
        return atomic.LoadInt32(&wh.shuttingDown) != 0
6,676✔
180
}
6,676✔
181

182
// Health is for health check
183
func (wh *WorkflowHandler) Health(ctx context.Context) (*types.HealthStatus, error) {
2✔
184
        status := HealthStatus(atomic.LoadInt32(&wh.healthStatus))
2✔
185
        msg := status.String()
2✔
186

2✔
187
        if status != HealthStatusOK {
3✔
188
                wh.GetLogger().Warn(fmt.Sprintf("Service status is: %v", msg))
1✔
189
        }
1✔
190

191
        return &types.HealthStatus{
2✔
192
                Ok:  status == HealthStatusOK,
2✔
193
                Msg: msg,
2✔
194
        }, nil
2✔
195
}
196

197
// RegisterDomain creates a new domain which can be used as a container for all resources.  Domain is a top level
198
// entity within Cadence, used as a container for all resources like workflow executions, tasklists, etc.  Domain
199
// acts as a sandbox and provides isolation for all resources within the domain.  All resources belongs to exactly one
200
// domain.
201
func (wh *WorkflowHandler) RegisterDomain(ctx context.Context, registerRequest *types.RegisterDomainRequest) (retError error) {
51✔
202
        if wh.isShuttingDown() {
51✔
203
                return validate.ErrShuttingDown
×
204
        }
×
205

206
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
51✔
207
                return err
×
208
        }
×
209

210
        if registerRequest == nil {
51✔
211
                return validate.ErrRequestNotSet
×
212
        }
×
213

214
        if registerRequest.GetWorkflowExecutionRetentionPeriodInDays() > int32(wh.config.DomainConfig.MaxRetentionDays()) {
51✔
215
                return validate.ErrInvalidRetention
×
216
        }
×
217

218
        if err := validate.CheckPermission(wh.config, registerRequest.SecurityToken); err != nil {
51✔
219
                return err
×
220
        }
×
221

222
        if err := checkRequiredDomainDataKVs(wh.config.DomainConfig.RequiredDomainDataKeys(), registerRequest.GetData()); err != nil {
52✔
223
                return err
1✔
224
        }
1✔
225

226
        if registerRequest.GetName() == "" {
50✔
227
                return validate.ErrDomainNotSet
×
228
        }
×
229

230
        return wh.domainHandler.RegisterDomain(ctx, registerRequest)
50✔
231
}
232

233
// ListDomains returns the information and configuration for a registered domain.
234
func (wh *WorkflowHandler) ListDomains(
235
        ctx context.Context,
236
        listRequest *types.ListDomainsRequest,
237
) (response *types.ListDomainsResponse, retError error) {
2✔
238
        if wh.isShuttingDown() {
2✔
239
                return nil, validate.ErrShuttingDown
×
240
        }
×
241

242
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
2✔
243
                return nil, err
×
244
        }
×
245

246
        if listRequest == nil {
3✔
247
                return nil, validate.ErrRequestNotSet
1✔
248
        }
1✔
249

250
        return wh.domainHandler.ListDomains(ctx, listRequest)
1✔
251
}
252

253
// DescribeDomain returns the information and configuration for a registered domain.
254
func (wh *WorkflowHandler) DescribeDomain(
255
        ctx context.Context,
256
        describeRequest *types.DescribeDomainRequest,
257
) (response *types.DescribeDomainResponse, retError error) {
134✔
258
        if wh.isShuttingDown() {
134✔
259
                return nil, validate.ErrShuttingDown
×
260
        }
×
261

262
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
134✔
263
                return nil, err
×
264
        }
×
265

266
        if describeRequest == nil {
134✔
267
                return nil, validate.ErrRequestNotSet
×
268
        }
×
269

270
        if describeRequest.GetName() == "" && describeRequest.GetUUID() == "" {
134✔
271
                return nil, validate.ErrDomainNotSet
×
272
        }
×
273

274
        resp, err := wh.domainHandler.DescribeDomain(ctx, describeRequest)
134✔
275
        if err != nil {
134✔
276
                return nil, err
×
277
        }
×
278

279
        if resp.GetFailoverInfo() != nil && resp.GetFailoverInfo().GetFailoverExpireTimestamp() > 0 {
134✔
280
                // fetch ongoing failover info from history service
×
281
                failoverResp, err := wh.GetHistoryClient().GetFailoverInfo(ctx, &types.GetFailoverInfoRequest{
×
282
                        DomainID: resp.GetDomainInfo().UUID,
×
283
                })
×
284
                if err != nil {
×
285
                        // despite the error from history, return describe domain response
×
286
                        wh.GetLogger().Error(
×
287
                                fmt.Sprintf("Failed to get failover info for domain %s", resp.DomainInfo.GetName()),
×
288
                                tag.Error(err),
×
289
                        )
×
290
                        return resp, nil
×
291
                }
×
292
                resp.FailoverInfo.CompletedShardCount = failoverResp.GetCompletedShardCount()
×
293
                resp.FailoverInfo.PendingShards = failoverResp.GetPendingShards()
×
294
        }
295
        return resp, nil
134✔
296
}
297

298
// UpdateDomain is used to update the information and configuration for a registered domain.
299
func (wh *WorkflowHandler) UpdateDomain(
300
        ctx context.Context,
301
        updateRequest *types.UpdateDomainRequest,
302
) (resp *types.UpdateDomainResponse, retError error) {
9✔
303
        domainName := ""
9✔
304
        if updateRequest != nil {
18✔
305
                domainName = updateRequest.GetName()
9✔
306
        }
9✔
307

308
        logger := wh.GetLogger().WithTags(
9✔
309
                tag.WorkflowDomainName(domainName),
9✔
310
                tag.OperationName("DomainUpdate"))
9✔
311

9✔
312
        if updateRequest == nil {
9✔
313
                logger.Error("Nil domain update request.",
×
314
                        tag.Error(validate.ErrRequestNotSet))
×
315
                return nil, validate.ErrRequestNotSet
×
316
        }
×
317

318
        isFailover := isFailoverRequest(updateRequest)
9✔
319
        isGraceFailover := isGraceFailoverRequest(updateRequest)
9✔
320
        logger.Info(fmt.Sprintf(
9✔
321
                "Domain Update requested. isFailover: %v, isGraceFailover: %v, Request: %#v.",
9✔
322
                isFailover,
9✔
323
                isGraceFailover,
9✔
324
                updateRequest))
9✔
325

9✔
326
        if wh.isShuttingDown() {
9✔
327
                logger.Error("Won't apply the domain update since workflowHandler is shutting down.",
×
328
                        tag.Error(validate.ErrShuttingDown))
×
329
                return nil, validate.ErrShuttingDown
×
330
        }
×
331

332
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
9✔
333
                logger.Error("Won't apply the domain update since client version is not supported.",
×
334
                        tag.Error(err))
×
335
                return nil, err
×
336
        }
×
337

338
        // don't require permission for failover request
339
        if isFailover {
11✔
340
                // reject the failover if the cluster is in lockdown
2✔
341
                if err := checkFailOverPermission(wh.config, updateRequest.GetName()); err != nil {
3✔
342
                        logger.Error("Domain failover request rejected since domain is in lockdown.",
1✔
343
                                tag.Error(err))
1✔
344
                        return nil, err
1✔
345
                }
1✔
346
        } else {
7✔
347
                if err := validate.CheckPermission(wh.config, updateRequest.SecurityToken); err != nil {
7✔
348
                        logger.Error("Domain update request rejected due to failing permissions.",
×
349
                                tag.Error(err))
×
350
                        return nil, err
×
351
                }
×
352
        }
353

354
        if isGraceFailover {
9✔
355
                if err := wh.checkOngoingFailover(
1✔
356
                        ctx,
1✔
357
                        &updateRequest.Name,
1✔
358
                ); err != nil {
1✔
359
                        logger.Error("Graceful domain failover request failed. Not able to check ongoing failovers.",
×
360
                                tag.Error(err))
×
361
                        return nil, err
×
362
                }
×
363
        }
364

365
        if updateRequest.GetName() == "" {
8✔
366
                logger.Error("Domain not set on request.",
×
367
                        tag.Error(validate.ErrDomainNotSet))
×
368
                return nil, validate.ErrDomainNotSet
×
369
        }
×
370
        // TODO: call remote clusters to verify domain data
371
        resp, err := wh.domainHandler.UpdateDomain(ctx, updateRequest)
8✔
372
        if err != nil {
10✔
373
                logger.Error("Domain update operation failed.",
2✔
374
                        tag.Error(err))
2✔
375
                return nil, err
2✔
376
        }
2✔
377
        logger.Info("Domain update operation succeeded.")
6✔
378
        return resp, nil
6✔
379
}
380

381
// DeprecateDomain us used to update status of a registered domain to DEPRECATED. Once the domain is deprecated
382
// it cannot be used to start new workflow executions.  Existing workflow executions will continue to run on
383
// deprecated domains.
384
func (wh *WorkflowHandler) DeprecateDomain(ctx context.Context, deprecateRequest *types.DeprecateDomainRequest) (retError error) {
×
385
        if wh.isShuttingDown() {
×
386
                return validate.ErrShuttingDown
×
387
        }
×
388

389
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
×
390
                return err
×
391
        }
×
392

393
        if deprecateRequest == nil {
×
394
                return validate.ErrRequestNotSet
×
395
        }
×
396

397
        if err := validate.CheckPermission(wh.config, deprecateRequest.SecurityToken); err != nil {
×
398
                return err
×
399
        }
×
400

401
        if deprecateRequest.GetName() == "" {
×
402
                return validate.ErrDomainNotSet
×
403
        }
×
404

405
        return wh.domainHandler.DeprecateDomain(ctx, deprecateRequest)
×
406
}
407

408
// PollForActivityTask - Poll for an activity task.
409
func (wh *WorkflowHandler) PollForActivityTask(
410
        ctx context.Context,
411
        pollRequest *types.PollForActivityTaskRequest,
412
) (resp *types.PollForActivityTaskResponse, retError error) {
722✔
413
        callTime := time.Now()
722✔
414

722✔
415
        if wh.isShuttingDown() {
722✔
416
                return nil, validate.ErrShuttingDown
×
417
        }
×
418

419
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
722✔
420
                return nil, err
×
421
        }
×
422

423
        if pollRequest == nil {
722✔
424
                return nil, validate.ErrRequestNotSet
×
425
        }
×
426

427
        domainName := pollRequest.GetDomain()
722✔
428
        if domainName == "" {
722✔
429
                return nil, validate.ErrDomainNotSet
×
430
        }
×
431

432
        scope := getMetricsScopeWithDomain(metrics.FrontendPollForActivityTaskScope, pollRequest, wh.GetMetricsClient()).Tagged(metrics.GetContextTags(ctx)...)
722✔
433
        wh.GetLogger().Debug("Received PollForActivityTask")
722✔
434
        if err := common.ValidateLongPollContextTimeout(
722✔
435
                ctx,
722✔
436
                "PollForActivityTask",
722✔
437
                wh.GetThrottledLogger().WithTags(tag.WorkflowDomainName(domainName), tag.WorkflowTaskListName(pollRequest.GetTaskList().GetName())),
722✔
438
        ); err != nil {
724✔
439
                return nil, err
2✔
440
        }
2✔
441

442
        idLengthWarnLimit := wh.config.MaxIDLengthWarnLimit()
720✔
443
        if !common.IsValidIDLength(
720✔
444
                domainName,
720✔
445
                scope,
720✔
446
                idLengthWarnLimit,
720✔
447
                wh.config.DomainNameMaxLength(domainName),
720✔
448
                metrics.CadenceErrDomainNameExceededWarnLimit,
720✔
449
                domainName,
720✔
450
                wh.GetLogger(),
720✔
451
                tag.IDTypeDomainName) {
720✔
452
                return nil, validate.ErrDomainTooLong
×
453
        }
×
454

455
        if err := wh.validateTaskList(pollRequest.TaskList, scope, domainName); err != nil {
720✔
456
                return nil, err
×
457
        }
×
458

459
        if !common.IsValidIDLength(
720✔
460
                pollRequest.GetIdentity(),
720✔
461
                scope,
720✔
462
                idLengthWarnLimit,
720✔
463
                wh.config.IdentityMaxLength(domainName),
720✔
464
                metrics.CadenceErrIdentityExceededWarnLimit,
720✔
465
                domainName,
720✔
466
                wh.GetLogger(),
720✔
467
                tag.IDTypeIdentity) {
720✔
468
                return nil, validate.ErrIdentityTooLong
×
469
        }
×
470

471
        domainID, err := wh.GetDomainCache().GetDomainID(domainName)
720✔
472
        if err != nil {
982✔
473
                return nil, err
262✔
474
        }
262✔
475

476
        isolationGroup := wh.getIsolationGroup(ctx, domainName)
458✔
477
        if !wh.waitUntilIsolationGroupHealthy(ctx, domainName, isolationGroup) {
459✔
478
                return &types.PollForActivityTaskResponse{}, nil
1✔
479
        }
1✔
480
        // it is possible that we wait for a very long time and the remaining time is not long enough for a long poll
481
        // in this case, return an empty response
482
        if err := common.ValidateLongPollContextTimeout(
457✔
483
                ctx,
457✔
484
                "PollForActivityTask",
457✔
485
                wh.GetThrottledLogger().WithTags(tag.WorkflowDomainName(domainName), tag.WorkflowTaskListName(pollRequest.GetTaskList().GetName())),
457✔
486
        ); err != nil {
457✔
487
                return &types.PollForActivityTaskResponse{}, nil
×
488
        }
×
489
        pollerID := uuid.New().String()
457✔
490
        op := func() error {
914✔
491
                resp, err = wh.GetMatchingClient().PollForActivityTask(ctx, &types.MatchingPollForActivityTaskRequest{
457✔
492
                        DomainUUID:     domainID,
457✔
493
                        PollerID:       pollerID,
457✔
494
                        PollRequest:    pollRequest,
457✔
495
                        IsolationGroup: isolationGroup,
457✔
496
                })
457✔
497
                return err
457✔
498
        }
457✔
499

500
        err = wh.throttleRetry.Do(ctx, op)
457✔
501
        if err != nil {
523✔
502
                err = wh.cancelOutstandingPoll(ctx, err, domainID, persistence.TaskListTypeActivity, pollRequest.TaskList, pollerID)
66✔
503
                if err != nil {
66✔
504
                        // For all other errors log an error and return it back to client.
×
505
                        ctxTimeout := "not-set"
×
506
                        ctxDeadline, ok := ctx.Deadline()
×
507
                        if ok {
×
508
                                ctxTimeout = ctxDeadline.Sub(callTime).String()
×
509
                        }
×
510
                        wh.GetLogger().Error("PollForActivityTask failed.",
×
511
                                tag.WorkflowTaskListName(pollRequest.GetTaskList().GetName()),
×
512
                                tag.Value(ctxTimeout),
×
513
                                tag.Error(err))
×
514
                        return nil, err
×
515
                }
516
        }
517
        return resp, nil
457✔
518
}
519

520
// PollForDecisionTask - Poll for a decision task.
521
func (wh *WorkflowHandler) PollForDecisionTask(
522
        ctx context.Context,
523
        pollRequest *types.PollForDecisionTaskRequest,
524
) (resp *types.PollForDecisionTaskResponse, retError error) {
1,509✔
525
        callTime := time.Now()
1,509✔
526

1,509✔
527
        if wh.isShuttingDown() {
1,509✔
528
                return nil, validate.ErrShuttingDown
×
529
        }
×
530

531
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
1,509✔
532
                return nil, err
×
533
        }
×
534

535
        if pollRequest == nil {
1,509✔
536
                return nil, validate.ErrRequestNotSet
×
537
        }
×
538

539
        domainName := pollRequest.GetDomain()
1,509✔
540
        tags := getDomainWfIDRunIDTags(domainName, nil)
1,509✔
541

1,509✔
542
        if domainName == "" {
1,509✔
543
                return nil, validate.ErrDomainNotSet
×
544
        }
×
545

546
        scope := getMetricsScopeWithDomain(metrics.FrontendPollForDecisionTaskScope, pollRequest, wh.GetMetricsClient()).Tagged(metrics.GetContextTags(ctx)...)
1,509✔
547
        wh.GetLogger().Debug("Received PollForDecisionTask")
1,509✔
548
        if err := common.ValidateLongPollContextTimeout(
1,509✔
549
                ctx,
1,509✔
550
                "PollForDecisionTask",
1,509✔
551
                wh.GetThrottledLogger().WithTags(tag.WorkflowDomainName(domainName), tag.WorkflowTaskListName(pollRequest.GetTaskList().GetName())),
1,509✔
552
        ); err != nil {
1,511✔
553
                return nil, err
2✔
554
        }
2✔
555

556
        idLengthWarnLimit := wh.config.MaxIDLengthWarnLimit()
1,507✔
557
        if !common.IsValidIDLength(
1,507✔
558
                domainName,
1,507✔
559
                scope,
1,507✔
560
                idLengthWarnLimit,
1,507✔
561
                wh.config.DomainNameMaxLength(domainName),
1,507✔
562
                metrics.CadenceErrDomainNameExceededWarnLimit,
1,507✔
563
                domainName,
1,507✔
564
                wh.GetLogger(),
1,507✔
565
                tag.IDTypeDomainName) {
1,507✔
566
                return nil, validate.ErrDomainTooLong
×
567
        }
×
568

569
        if !common.IsValidIDLength(
1,507✔
570
                pollRequest.GetIdentity(),
1,507✔
571
                scope,
1,507✔
572
                idLengthWarnLimit,
1,507✔
573
                wh.config.IdentityMaxLength(domainName),
1,507✔
574
                metrics.CadenceErrIdentityExceededWarnLimit,
1,507✔
575
                domainName,
1,507✔
576
                wh.GetLogger(),
1,507✔
577
                tag.IDTypeIdentity) {
1,507✔
578
                return nil, validate.ErrIdentityTooLong
×
579
        }
×
580

581
        if err := wh.validateTaskList(pollRequest.TaskList, scope, domainName); err != nil {
1,507✔
582
                return nil, err
×
583
        }
×
584

585
        domainEntry, err := wh.GetDomainCache().GetDomain(domainName)
1,507✔
586
        if err != nil {
1,769✔
587
                return nil, err
262✔
588
        }
262✔
589
        domainID := domainEntry.GetInfo().ID
1,245✔
590

1,245✔
591
        wh.GetLogger().Debug("Poll for decision.", tag.WorkflowDomainName(domainName), tag.WorkflowDomainID(domainID))
1,245✔
592
        if err := wh.checkBadBinary(domainEntry, pollRequest.GetBinaryChecksum()); err != nil {
1,245✔
593
                return nil, err
×
594
        }
×
595

596
        isolationGroup := wh.getIsolationGroup(ctx, domainName)
1,245✔
597
        if !wh.waitUntilIsolationGroupHealthy(ctx, domainName, isolationGroup) {
1,246✔
598
                return &types.PollForDecisionTaskResponse{}, nil
1✔
599
        }
1✔
600
        // it is possible that we wait for a very long time and the remaining time is not long enough for a long poll
601
        // in this case, return an empty response
602
        if err := common.ValidateLongPollContextTimeout(
1,244✔
603
                ctx,
1,244✔
604
                "PollForDecisionTask",
1,244✔
605
                wh.GetThrottledLogger().WithTags(tag.WorkflowDomainName(domainName), tag.WorkflowTaskListName(pollRequest.GetTaskList().GetName())),
1,244✔
606
        ); err != nil {
1,244✔
607
                return &types.PollForDecisionTaskResponse{}, nil
×
608
        }
×
609

610
        pollerID := uuid.New().String()
1,244✔
611
        var matchingResp *types.MatchingPollForDecisionTaskResponse
1,244✔
612
        op := func() error {
2,488✔
613
                matchingResp, err = wh.GetMatchingClient().PollForDecisionTask(ctx, &types.MatchingPollForDecisionTaskRequest{
1,244✔
614
                        DomainUUID:     domainID,
1,244✔
615
                        PollerID:       pollerID,
1,244✔
616
                        PollRequest:    pollRequest,
1,244✔
617
                        IsolationGroup: isolationGroup,
1,244✔
618
                })
1,244✔
619
                return err
1,244✔
620
        }
1,244✔
621

622
        err = wh.throttleRetry.Do(ctx, op)
1,244✔
623
        if err != nil {
1,310✔
624
                err = wh.cancelOutstandingPoll(ctx, err, domainID, persistence.TaskListTypeDecision, pollRequest.TaskList, pollerID)
66✔
625
                if err != nil {
66✔
626
                        // For all other errors log an error and return it back to client.
×
627
                        ctxTimeout := "not-set"
×
628
                        ctxDeadline, ok := ctx.Deadline()
×
629
                        if ok {
×
630
                                ctxTimeout = ctxDeadline.Sub(callTime).String()
×
631
                        }
×
632
                        wh.GetLogger().Error("PollForDecisionTask failed.",
×
633
                                tag.WorkflowTaskListName(pollRequest.GetTaskList().GetName()),
×
634
                                tag.Value(ctxTimeout),
×
635
                                tag.Error(err))
×
636
                        return nil, err
×
637
                }
638

639
                // Must be cancellation error.  Does'nt matter what we return here.  Client already went away.
640
                return nil, nil
66✔
641
        }
642

643
        tags = append(tags, []tag.Tag{tag.WorkflowID(
1,178✔
644
                matchingResp.GetWorkflowExecution().GetWorkflowID()),
1,178✔
645
                tag.WorkflowRunID(matchingResp.GetWorkflowExecution().GetRunID())}...)
1,178✔
646
        resp, err = wh.createPollForDecisionTaskResponse(ctx, scope, domainID, matchingResp, matchingResp.GetBranchToken())
1,178✔
647
        if err != nil {
1,178✔
648
                return nil, err
×
649
        }
×
650
        return resp, nil
1,178✔
651
}
652

653
func (wh *WorkflowHandler) getIsolationGroup(ctx context.Context, domainName string) string {
2,948✔
654
        return partition.IsolationGroupFromContext(ctx)
2,948✔
655
}
2,948✔
656

657
func (wh *WorkflowHandler) getPartitionConfig(ctx context.Context, domainName string) map[string]string {
514✔
658
        return partition.ConfigFromContext(ctx)
514✔
659
}
514✔
660

661
func (wh *WorkflowHandler) isIsolationGroupHealthy(ctx context.Context, domainName, isolationGroup string) bool {
1,245✔
662
        if wh.GetIsolationGroupState() != nil && wh.config.EnableTasklistIsolation(domainName) {
1,247✔
663
                isDrained, err := wh.GetIsolationGroupState().IsDrained(ctx, domainName, isolationGroup)
2✔
664
                if err != nil {
2✔
665
                        wh.GetLogger().Error("Failed to check if an isolation group is drained, assume it's healthy", tag.Error(err))
×
666
                        return true
×
667
                }
×
668
                return !isDrained
2✔
669
        }
670
        return true
1,243✔
671
}
672

673
func (wh *WorkflowHandler) waitUntilIsolationGroupHealthy(ctx context.Context, domainName, isolationGroup string) bool {
1,703✔
674
        if wh.GetIsolationGroupState() != nil && wh.config.EnableTasklistIsolation(domainName) {
1,705✔
675
                ticker := time.NewTicker(time.Second * 30)
2✔
676
                defer ticker.Stop()
2✔
677
                childCtx, cancel := common.CreateChildContext(ctx, 0.05)
2✔
678
                defer cancel()
2✔
679
                for {
4✔
680
                        isDrained, err := wh.GetIsolationGroupState().IsDrained(childCtx, domainName, isolationGroup)
2✔
681
                        if err != nil {
2✔
682
                                wh.GetLogger().Error("Failed to check if an isolation group is drained, assume it's healthy", tag.Error(err))
×
683
                                return true
×
684
                        }
×
685
                        if !isDrained {
2✔
686
                                break
×
687
                        }
688
                        select {
2✔
689
                        case <-childCtx.Done():
2✔
690
                                return false
2✔
691
                        case <-ticker.C:
×
692
                        }
693
                }
694
        }
695
        return true
1,701✔
696
}
697

698
func (wh *WorkflowHandler) checkBadBinary(domainEntry *cache.DomainCacheEntry, binaryChecksum string) error {
1,245✔
699
        if domainEntry.GetConfig().BadBinaries.Binaries != nil {
2,489✔
700
                badBinaries := domainEntry.GetConfig().BadBinaries.Binaries
1,244✔
701
                _, ok := badBinaries[binaryChecksum]
1,244✔
702
                if ok {
1,244✔
703
                        wh.GetMetricsClient().IncCounter(metrics.FrontendPollForDecisionTaskScope, metrics.CadenceErrBadBinaryCounter)
×
704
                        return &types.BadRequestError{
×
705
                                Message: fmt.Sprintf("binary %v already marked as bad deployment", binaryChecksum),
×
706
                        }
×
707
                }
×
708
        }
709
        return nil
1,245✔
710
}
711

712
func (wh *WorkflowHandler) cancelOutstandingPoll(ctx context.Context, err error, domainID string, taskListType int32,
713
        taskList *types.TaskList, pollerID string) error {
132✔
714
        // First check if this err is due to context cancellation.  This means client connection to frontend is closed.
132✔
715
        if ctx.Err() == context.Canceled {
264✔
716
                // Our rpc stack does not propagates context cancellation to the other service.  Lets make an explicit
132✔
717
                // call to matching to notify this poller is gone to prevent any tasks being dispatched to zombie pollers.
132✔
718
                err = wh.GetMatchingClient().CancelOutstandingPoll(context.Background(), &types.CancelOutstandingPollRequest{
132✔
719
                        DomainUUID:   domainID,
132✔
720
                        TaskListType: common.Int32Ptr(taskListType),
132✔
721
                        TaskList:     taskList,
132✔
722
                        PollerID:     pollerID,
132✔
723
                })
132✔
724
                // We can not do much if this call fails.  Just log the error and move on
132✔
725
                if err != nil {
132✔
726
                        wh.GetLogger().Warn("Failed to cancel outstanding poller.",
×
727
                                tag.WorkflowTaskListName(taskList.GetName()), tag.Error(err))
×
728
                }
×
729

730
                // Clear error as we don't want to report context cancellation error to count against our SLA
731
                return nil
132✔
732
        }
733

734
        return err
×
735
}
736

737
// RecordActivityTaskHeartbeat - Record Activity Task Heart beat.
738
func (wh *WorkflowHandler) RecordActivityTaskHeartbeat(
739
        ctx context.Context,
740
        heartbeatRequest *types.RecordActivityTaskHeartbeatRequest,
741
) (resp *types.RecordActivityTaskHeartbeatResponse, retError error) {
384✔
742
        if wh.isShuttingDown() {
384✔
743
                return nil, validate.ErrShuttingDown
×
744
        }
×
745

746
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
384✔
747
                return nil, err
×
748
        }
×
749

750
        if heartbeatRequest == nil {
385✔
751
                return nil, validate.ErrRequestNotSet
1✔
752
        }
1✔
753

754
        wh.GetLogger().Debug("Received RecordActivityTaskHeartbeat")
383✔
755
        if heartbeatRequest.TaskToken == nil {
384✔
756
                return nil, validate.ErrTaskTokenNotSet
1✔
757
        }
1✔
758
        taskToken, err := wh.tokenSerializer.Deserialize(heartbeatRequest.TaskToken)
382✔
759
        if err != nil {
382✔
760
                return nil, err
×
761
        }
×
762
        if taskToken.DomainID == "" {
382✔
763
                return nil, validate.ErrDomainNotSet
×
764
        }
×
765

766
        domainName, err := wh.GetDomainCache().GetDomainName(taskToken.DomainID)
382✔
767
        if err != nil {
382✔
768
                return nil, err
×
769
        }
×
770

771
        dw := domainWrapper{
382✔
772
                domain: domainName,
382✔
773
        }
382✔
774
        scope := getMetricsScopeWithDomain(metrics.FrontendRecordActivityTaskHeartbeatScope, dw, wh.GetMetricsClient()).Tagged(metrics.GetContextTags(ctx)...)
382✔
775
        sizeLimitError := wh.config.BlobSizeLimitError(domainName)
382✔
776
        sizeLimitWarn := wh.config.BlobSizeLimitWarn(domainName)
382✔
777

382✔
778
        if err := common.CheckEventBlobSizeLimit(
382✔
779
                len(heartbeatRequest.Details),
382✔
780
                sizeLimitWarn,
382✔
781
                sizeLimitError,
382✔
782
                taskToken.DomainID,
382✔
783
                taskToken.WorkflowID,
382✔
784
                taskToken.RunID,
382✔
785
                scope,
382✔
786
                wh.GetThrottledLogger(),
382✔
787
                tag.BlobSizeViolationOperation("RecordActivityTaskHeartbeat"),
382✔
788
        ); err != nil {
382✔
789
                // heartbeat details exceed size limit, we would fail the activity immediately with explicit error reason
×
790
                failRequest := &types.RespondActivityTaskFailedRequest{
×
791
                        TaskToken: heartbeatRequest.TaskToken,
×
792
                        Reason:    common.StringPtr(common.FailureReasonHeartbeatExceedsLimit),
×
793
                        Details:   heartbeatRequest.Details[0:sizeLimitError],
×
794
                        Identity:  heartbeatRequest.Identity,
×
795
                }
×
796
                err = wh.GetHistoryClient().RespondActivityTaskFailed(ctx, &types.HistoryRespondActivityTaskFailedRequest{
×
797
                        DomainUUID:    taskToken.DomainID,
×
798
                        FailedRequest: failRequest,
×
799
                })
×
800
                if err != nil {
×
801
                        return nil, wh.normalizeVersionedErrors(ctx, err)
×
802
                }
×
803
                resp = &types.RecordActivityTaskHeartbeatResponse{CancelRequested: true}
×
804
        } else {
382✔
805
                resp, err = wh.GetHistoryClient().RecordActivityTaskHeartbeat(ctx, &types.HistoryRecordActivityTaskHeartbeatRequest{
382✔
806
                        DomainUUID:       taskToken.DomainID,
382✔
807
                        HeartbeatRequest: heartbeatRequest,
382✔
808
                })
382✔
809
                if err != nil {
382✔
810
                        return nil, wh.normalizeVersionedErrors(ctx, err)
×
811
                }
×
812
        }
813

814
        return resp, nil
382✔
815
}
816

817
// RecordActivityTaskHeartbeatByID - Record Activity Task Heart beat.
818
func (wh *WorkflowHandler) RecordActivityTaskHeartbeatByID(
819
        ctx context.Context,
820
        heartbeatRequest *types.RecordActivityTaskHeartbeatByIDRequest,
821
) (resp *types.RecordActivityTaskHeartbeatResponse, retError error) {
3✔
822
        if wh.isShuttingDown() {
3✔
823
                return nil, validate.ErrShuttingDown
×
824
        }
×
825

826
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
3✔
827
                return nil, err
×
828
        }
×
829

830
        if heartbeatRequest == nil {
4✔
831
                return nil, validate.ErrRequestNotSet
1✔
832
        }
1✔
833

834
        domainName := heartbeatRequest.GetDomain()
2✔
835
        if domainName == "" {
3✔
836
                return nil, validate.ErrDomainNotSet
1✔
837
        }
1✔
838

839
        wh.GetLogger().Debug("Received RecordActivityTaskHeartbeatByID")
1✔
840
        domainID, err := wh.GetDomainCache().GetDomainID(domainName)
1✔
841
        if err != nil {
1✔
842
                return nil, err
×
843
        }
×
844
        workflowID := heartbeatRequest.GetWorkflowID()
1✔
845
        runID := heartbeatRequest.GetRunID() // runID is optional so can be empty
1✔
846
        activityID := heartbeatRequest.GetActivityID()
1✔
847

1✔
848
        if domainID == "" {
1✔
849
                return nil, validate.ErrDomainNotSet
×
850
        }
×
851
        if workflowID == "" {
1✔
852
                return nil, validate.ErrWorkflowIDNotSet
×
853
        }
×
854
        if activityID == "" {
1✔
855
                return nil, validate.ErrActivityIDNotSet
×
856
        }
×
857

858
        taskToken := &common.TaskToken{
1✔
859
                DomainID:   domainID,
1✔
860
                RunID:      runID,
1✔
861
                WorkflowID: workflowID,
1✔
862
                ScheduleID: common.EmptyEventID,
1✔
863
                ActivityID: activityID,
1✔
864
        }
1✔
865
        token, err := wh.tokenSerializer.Serialize(taskToken)
1✔
866
        if err != nil {
1✔
867
                return nil, err
×
868
        }
×
869

870
        scope := getMetricsScopeWithDomain(metrics.FrontendRecordActivityTaskHeartbeatByIDScope, heartbeatRequest, wh.GetMetricsClient()).Tagged(metrics.GetContextTags(ctx)...)
1✔
871
        sizeLimitError := wh.config.BlobSizeLimitError(domainName)
1✔
872
        sizeLimitWarn := wh.config.BlobSizeLimitWarn(domainName)
1✔
873

1✔
874
        if err := common.CheckEventBlobSizeLimit(
1✔
875
                len(heartbeatRequest.Details),
1✔
876
                sizeLimitWarn,
1✔
877
                sizeLimitError,
1✔
878
                taskToken.DomainID,
1✔
879
                taskToken.WorkflowID,
1✔
880
                taskToken.RunID,
1✔
881
                scope,
1✔
882
                wh.GetThrottledLogger(),
1✔
883
                tag.BlobSizeViolationOperation("RecordActivityTaskHeartbeatByID"),
1✔
884
        ); err != nil {
1✔
885
                // heartbeat details exceed size limit, we would fail the activity immediately with explicit error reason
×
886
                failRequest := &types.RespondActivityTaskFailedRequest{
×
887
                        TaskToken: token,
×
888
                        Reason:    common.StringPtr(common.FailureReasonHeartbeatExceedsLimit),
×
889
                        Details:   heartbeatRequest.Details[0:sizeLimitError],
×
890
                        Identity:  heartbeatRequest.Identity,
×
891
                }
×
892
                err = wh.GetHistoryClient().RespondActivityTaskFailed(ctx, &types.HistoryRespondActivityTaskFailedRequest{
×
893
                        DomainUUID:    taskToken.DomainID,
×
894
                        FailedRequest: failRequest,
×
895
                })
×
896
                if err != nil {
×
897
                        return nil, wh.normalizeVersionedErrors(ctx, err)
×
898
                }
×
899
                resp = &types.RecordActivityTaskHeartbeatResponse{CancelRequested: true}
×
900
        } else {
1✔
901
                req := &types.RecordActivityTaskHeartbeatRequest{
1✔
902
                        TaskToken: token,
1✔
903
                        Details:   heartbeatRequest.Details,
1✔
904
                        Identity:  heartbeatRequest.Identity,
1✔
905
                }
1✔
906

1✔
907
                resp, err = wh.GetHistoryClient().RecordActivityTaskHeartbeat(ctx, &types.HistoryRecordActivityTaskHeartbeatRequest{
1✔
908
                        DomainUUID:       taskToken.DomainID,
1✔
909
                        HeartbeatRequest: req,
1✔
910
                })
1✔
911
                if err != nil {
1✔
912
                        return nil, wh.normalizeVersionedErrors(ctx, err)
×
913
                }
×
914
        }
915

916
        return resp, nil
1✔
917
}
918

919
// RespondActivityTaskCompleted - response to an activity task
920
func (wh *WorkflowHandler) RespondActivityTaskCompleted(
921
        ctx context.Context,
922
        completeRequest *types.RespondActivityTaskCompletedRequest,
923
) (retError error) {
262✔
924
        if wh.isShuttingDown() {
262✔
925
                return validate.ErrShuttingDown
×
926
        }
×
927

928
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
262✔
929
                return err
×
930
        }
×
931

932
        if completeRequest == nil {
262✔
933
                return validate.ErrRequestNotSet
×
934
        }
×
935

936
        if completeRequest.TaskToken == nil {
262✔
937
                return validate.ErrTaskTokenNotSet
×
938
        }
×
939
        taskToken, err := wh.tokenSerializer.Deserialize(completeRequest.TaskToken)
262✔
940
        if err != nil {
262✔
941
                return err
×
942
        }
×
943
        if taskToken.DomainID == "" {
262✔
944
                return validate.ErrDomainNotSet
×
945
        }
×
946

947
        domainName, err := wh.GetDomainCache().GetDomainName(taskToken.DomainID)
262✔
948
        if err != nil {
262✔
949
                return err
×
950
        }
×
951

952
        dw := domainWrapper{
262✔
953
                domain: domainName,
262✔
954
        }
262✔
955
        scope := getMetricsScopeWithDomain(metrics.FrontendRespondActivityTaskCompletedScope, dw, wh.GetMetricsClient()).Tagged(metrics.GetContextTags(ctx)...)
262✔
956
        if !common.IsValidIDLength(
262✔
957
                completeRequest.GetIdentity(),
262✔
958
                scope,
262✔
959
                wh.config.MaxIDLengthWarnLimit(),
262✔
960
                wh.config.IdentityMaxLength(domainName),
262✔
961
                metrics.CadenceErrIdentityExceededWarnLimit,
262✔
962
                domainName,
262✔
963
                wh.GetLogger(),
262✔
964
                tag.IDTypeIdentity) {
262✔
965
                return validate.ErrIdentityTooLong
×
966
        }
×
967

968
        sizeLimitError := wh.config.BlobSizeLimitError(domainName)
262✔
969
        sizeLimitWarn := wh.config.BlobSizeLimitWarn(domainName)
262✔
970

262✔
971
        if err := common.CheckEventBlobSizeLimit(
262✔
972
                len(completeRequest.Result),
262✔
973
                sizeLimitWarn,
262✔
974
                sizeLimitError,
262✔
975
                taskToken.DomainID,
262✔
976
                taskToken.WorkflowID,
262✔
977
                taskToken.RunID,
262✔
978
                scope,
262✔
979
                wh.GetThrottledLogger(),
262✔
980
                tag.BlobSizeViolationOperation("RespondActivityTaskCompleted"),
262✔
981
        ); err != nil {
262✔
982
                // result exceeds blob size limit, we would record it as failure
×
983
                failRequest := &types.RespondActivityTaskFailedRequest{
×
984
                        TaskToken: completeRequest.TaskToken,
×
985
                        Reason:    common.StringPtr(common.FailureReasonCompleteResultExceedsLimit),
×
986
                        Details:   completeRequest.Result[0:sizeLimitError],
×
987
                        Identity:  completeRequest.Identity,
×
988
                }
×
989
                err = wh.GetHistoryClient().RespondActivityTaskFailed(ctx, &types.HistoryRespondActivityTaskFailedRequest{
×
990
                        DomainUUID:    taskToken.DomainID,
×
991
                        FailedRequest: failRequest,
×
992
                })
×
993
                if err != nil {
×
994
                        return wh.normalizeVersionedErrors(ctx, err)
×
995
                }
×
996
        } else {
262✔
997
                err = wh.GetHistoryClient().RespondActivityTaskCompleted(ctx, &types.HistoryRespondActivityTaskCompletedRequest{
262✔
998
                        DomainUUID:      taskToken.DomainID,
262✔
999
                        CompleteRequest: completeRequest,
262✔
1000
                })
262✔
1001
                if err != nil {
307✔
1002
                        return wh.normalizeVersionedErrors(ctx, err)
45✔
1003
                }
45✔
1004
        }
1005

1006
        return nil
217✔
1007
}
1008

1009
// RespondActivityTaskCompletedByID - response to an activity task
1010
func (wh *WorkflowHandler) RespondActivityTaskCompletedByID(
1011
        ctx context.Context,
1012
        completeRequest *types.RespondActivityTaskCompletedByIDRequest,
1013
) (retError error) {
76✔
1014
        if wh.isShuttingDown() {
76✔
1015
                return validate.ErrShuttingDown
×
1016
        }
×
1017

1018
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
76✔
1019
                return err
×
1020
        }
×
1021

1022
        if completeRequest == nil {
76✔
1023
                return validate.ErrRequestNotSet
×
1024
        }
×
1025

1026
        domainName := completeRequest.GetDomain()
76✔
1027
        if domainName == "" {
76✔
1028
                return validate.ErrDomainNotSet
×
1029
        }
×
1030
        domainID, err := wh.GetDomainCache().GetDomainID(domainName)
76✔
1031
        if err != nil {
76✔
1032
                return err
×
1033
        }
×
1034
        workflowID := completeRequest.GetWorkflowID()
76✔
1035
        runID := completeRequest.GetRunID() // runID is optional so can be empty
76✔
1036
        activityID := completeRequest.GetActivityID()
76✔
1037

76✔
1038
        if domainID == "" {
76✔
1039
                return validate.ErrDomainNotSet
×
1040
        }
×
1041
        if workflowID == "" {
76✔
1042
                return validate.ErrWorkflowIDNotSet
×
1043
        }
×
1044
        if activityID == "" {
76✔
1045
                return validate.ErrActivityIDNotSet
×
1046
        }
×
1047

1048
        scope := getMetricsScopeWithDomain(metrics.FrontendRespondActivityTaskCompletedByIDScope, completeRequest, wh.GetMetricsClient()).Tagged(metrics.GetContextTags(ctx)...)
76✔
1049
        if !common.IsValidIDLength(
76✔
1050
                completeRequest.GetIdentity(),
76✔
1051
                scope,
76✔
1052
                wh.config.MaxIDLengthWarnLimit(),
76✔
1053
                wh.config.IdentityMaxLength(domainName),
76✔
1054
                metrics.CadenceErrIdentityExceededWarnLimit,
76✔
1055
                domainName,
76✔
1056
                wh.GetLogger(),
76✔
1057
                tag.IDTypeIdentity) {
76✔
1058
                return validate.ErrIdentityTooLong
×
1059
        }
×
1060

1061
        taskToken := &common.TaskToken{
76✔
1062
                DomainID:   domainID,
76✔
1063
                RunID:      runID,
76✔
1064
                WorkflowID: workflowID,
76✔
1065
                ScheduleID: common.EmptyEventID,
76✔
1066
                ActivityID: activityID,
76✔
1067
        }
76✔
1068
        token, err := wh.tokenSerializer.Serialize(taskToken)
76✔
1069
        if err != nil {
76✔
1070
                return err
×
1071
        }
×
1072

1073
        sizeLimitError := wh.config.BlobSizeLimitError(domainName)
76✔
1074
        sizeLimitWarn := wh.config.BlobSizeLimitWarn(domainName)
76✔
1075

76✔
1076
        if err := common.CheckEventBlobSizeLimit(
76✔
1077
                len(completeRequest.Result),
76✔
1078
                sizeLimitWarn,
76✔
1079
                sizeLimitError,
76✔
1080
                taskToken.DomainID,
76✔
1081
                taskToken.WorkflowID,
76✔
1082
                taskToken.RunID,
76✔
1083
                scope,
76✔
1084
                wh.GetThrottledLogger(),
76✔
1085
                tag.BlobSizeViolationOperation("RespondActivityTaskCompletedByID"),
76✔
1086
        ); err != nil {
76✔
1087
                // result exceeds blob size limit, we would record it as failure
×
1088
                failRequest := &types.RespondActivityTaskFailedRequest{
×
1089
                        TaskToken: token,
×
1090
                        Reason:    common.StringPtr(common.FailureReasonCompleteResultExceedsLimit),
×
1091
                        Details:   completeRequest.Result[0:sizeLimitError],
×
1092
                        Identity:  completeRequest.Identity,
×
1093
                }
×
1094
                err = wh.GetHistoryClient().RespondActivityTaskFailed(ctx, &types.HistoryRespondActivityTaskFailedRequest{
×
1095
                        DomainUUID:    taskToken.DomainID,
×
1096
                        FailedRequest: failRequest,
×
1097
                })
×
1098
                if err != nil {
×
1099
                        return wh.normalizeVersionedErrors(ctx, err)
×
1100
                }
×
1101
        } else {
76✔
1102
                req := &types.RespondActivityTaskCompletedRequest{
76✔
1103
                        TaskToken: token,
76✔
1104
                        Result:    completeRequest.Result,
76✔
1105
                        Identity:  completeRequest.Identity,
76✔
1106
                }
76✔
1107

76✔
1108
                err = wh.GetHistoryClient().RespondActivityTaskCompleted(ctx, &types.HistoryRespondActivityTaskCompletedRequest{
76✔
1109
                        DomainUUID:      taskToken.DomainID,
76✔
1110
                        CompleteRequest: req,
76✔
1111
                })
76✔
1112
                if err != nil {
76✔
1113
                        return wh.normalizeVersionedErrors(ctx, err)
×
1114
                }
×
1115
        }
1116

1117
        return nil
76✔
1118
}
1119

1120
// RespondActivityTaskFailed - response to an activity task failure
1121
func (wh *WorkflowHandler) RespondActivityTaskFailed(
1122
        ctx context.Context,
1123
        failedRequest *types.RespondActivityTaskFailedRequest,
1124
) (retError error) {
14✔
1125
        if wh.isShuttingDown() {
15✔
1126
                return validate.ErrShuttingDown
1✔
1127
        }
1✔
1128

1129
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
13✔
1130
                return err
×
1131
        }
×
1132

1133
        if failedRequest == nil {
13✔
1134
                return validate.ErrRequestNotSet
×
1135
        }
×
1136

1137
        if failedRequest.TaskToken == nil {
13✔
1138
                return validate.ErrTaskTokenNotSet
×
1139
        }
×
1140
        taskToken, err := wh.tokenSerializer.Deserialize(failedRequest.TaskToken)
13✔
1141
        if err != nil {
13✔
1142
                return err
×
1143
        }
×
1144
        if taskToken.DomainID == "" {
13✔
1145
                return validate.ErrDomainNotSet
×
1146
        }
×
1147

1148
        domainName, err := wh.GetDomainCache().GetDomainName(taskToken.DomainID)
13✔
1149
        if err != nil {
13✔
1150
                return err
×
1151
        }
×
1152

1153
        dw := domainWrapper{
13✔
1154
                domain: domainName,
13✔
1155
        }
13✔
1156
        scope := getMetricsScopeWithDomain(metrics.FrontendRespondActivityTaskFailedScope, dw, wh.GetMetricsClient()).Tagged(metrics.GetContextTags(ctx)...)
13✔
1157
        if !common.IsValidIDLength(
13✔
1158
                failedRequest.GetIdentity(),
13✔
1159
                scope,
13✔
1160
                wh.config.MaxIDLengthWarnLimit(),
13✔
1161
                wh.config.IdentityMaxLength(domainName),
13✔
1162
                metrics.CadenceErrIdentityExceededWarnLimit,
13✔
1163
                domainName,
13✔
1164
                wh.GetLogger(),
13✔
1165
                tag.IDTypeIdentity) {
13✔
1166
                return validate.ErrIdentityTooLong
×
1167
        }
×
1168

1169
        sizeLimitError := wh.config.BlobSizeLimitError(domainName)
13✔
1170
        sizeLimitWarn := wh.config.BlobSizeLimitWarn(domainName)
13✔
1171

13✔
1172
        if err := common.CheckEventBlobSizeLimit(
13✔
1173
                len(failedRequest.Details),
13✔
1174
                sizeLimitWarn,
13✔
1175
                sizeLimitError,
13✔
1176
                taskToken.DomainID,
13✔
1177
                taskToken.WorkflowID,
13✔
1178
                taskToken.RunID,
13✔
1179
                scope,
13✔
1180
                wh.GetThrottledLogger(),
13✔
1181
                tag.BlobSizeViolationOperation("RespondActivityTaskFailed"),
13✔
1182
        ); err != nil {
13✔
1183
                // details exceeds blob size limit, we would truncate the details and put a specific error reason
×
1184
                failedRequest.Reason = common.StringPtr(common.FailureReasonFailureDetailsExceedsLimit)
×
1185
                failedRequest.Details = failedRequest.Details[0:sizeLimitError]
×
1186
        }
×
1187

1188
        err = wh.GetHistoryClient().RespondActivityTaskFailed(ctx, &types.HistoryRespondActivityTaskFailedRequest{
13✔
1189
                DomainUUID:    taskToken.DomainID,
13✔
1190
                FailedRequest: failedRequest,
13✔
1191
        })
13✔
1192
        if err != nil {
13✔
1193
                return wh.normalizeVersionedErrors(ctx, err)
×
1194
        }
×
1195
        return nil
13✔
1196
}
1197

1198
// RespondActivityTaskFailedByID - response to an activity task failure
1199
func (wh *WorkflowHandler) RespondActivityTaskFailedByID(
1200
        ctx context.Context,
1201
        failedRequest *types.RespondActivityTaskFailedByIDRequest,
1202
) (retError error) {
12✔
1203
        if wh.isShuttingDown() {
13✔
1204
                return validate.ErrShuttingDown
1✔
1205
        }
1✔
1206

1207
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
12✔
1208
                return err
1✔
1209
        }
1✔
1210

1211
        if failedRequest == nil {
11✔
1212
                return validate.ErrRequestNotSet
1✔
1213
        }
1✔
1214

1215
        domainName := failedRequest.GetDomain()
9✔
1216

9✔
1217
        if domainName == "" {
10✔
1218
                return validate.ErrDomainNotSet
1✔
1219
        }
1✔
1220
        domainID, err := wh.GetDomainCache().GetDomainID(domainName)
8✔
1221
        if err != nil {
9✔
1222
                return err
1✔
1223
        }
1✔
1224
        workflowID := failedRequest.GetWorkflowID()
7✔
1225
        runID := failedRequest.GetRunID() // runID is optional so can be empty
7✔
1226
        activityID := failedRequest.GetActivityID()
7✔
1227

7✔
1228
        if domainID == "" {
8✔
1229
                return validate.ErrDomainNotSet
1✔
1230
        }
1✔
1231
        if workflowID == "" {
7✔
1232
                return validate.ErrWorkflowIDNotSet
1✔
1233
        }
1✔
1234
        if activityID == "" {
6✔
1235
                return validate.ErrActivityIDNotSet
1✔
1236
        }
1✔
1237

1238
        scope := getMetricsScopeWithDomain(metrics.FrontendRespondActivityTaskFailedByIDScope, failedRequest, wh.GetMetricsClient()).Tagged(metrics.GetContextTags(ctx)...)
4✔
1239
        if !common.IsValidIDLength(
4✔
1240
                failedRequest.GetIdentity(),
4✔
1241
                scope,
4✔
1242
                wh.config.MaxIDLengthWarnLimit(),
4✔
1243
                wh.config.IdentityMaxLength(failedRequest.GetDomain()),
4✔
1244
                metrics.CadenceErrIdentityExceededWarnLimit,
4✔
1245
                domainName,
4✔
1246
                wh.GetLogger(),
4✔
1247
                tag.IDTypeIdentity) {
5✔
1248
                return validate.ErrIdentityTooLong
1✔
1249
        }
1✔
1250

1251
        taskToken := &common.TaskToken{
3✔
1252
                DomainID:   domainID,
3✔
1253
                RunID:      runID,
3✔
1254
                WorkflowID: workflowID,
3✔
1255
                ScheduleID: common.EmptyEventID,
3✔
1256
                ActivityID: activityID,
3✔
1257
        }
3✔
1258
        token, err := wh.tokenSerializer.Serialize(taskToken)
3✔
1259
        if err != nil {
4✔
1260
                return err
1✔
1261
        }
1✔
1262

1263
        sizeLimitError := wh.config.BlobSizeLimitError(domainName)
2✔
1264
        sizeLimitWarn := wh.config.BlobSizeLimitWarn(domainName)
2✔
1265

2✔
1266
        if err := common.CheckEventBlobSizeLimit(
2✔
1267
                len(failedRequest.Details),
2✔
1268
                sizeLimitWarn,
2✔
1269
                sizeLimitError,
2✔
1270
                taskToken.DomainID,
2✔
1271
                taskToken.WorkflowID,
2✔
1272
                taskToken.RunID,
2✔
1273
                scope,
2✔
1274
                wh.GetThrottledLogger(),
2✔
1275
                tag.BlobSizeViolationOperation("RespondActivityTaskFailedByID"),
2✔
1276
        ); err != nil {
3✔
1277
                // details exceeds blob size limit, we would truncate the details and put a specific error reason
1✔
1278
                failedRequest.Reason = common.StringPtr(common.FailureReasonFailureDetailsExceedsLimit)
1✔
1279
                failedRequest.Details = failedRequest.Details[0:sizeLimitError]
1✔
1280
        }
1✔
1281

1282
        req := &types.RespondActivityTaskFailedRequest{
2✔
1283
                TaskToken: token,
2✔
1284
                Reason:    failedRequest.Reason,
2✔
1285
                Details:   failedRequest.Details,
2✔
1286
                Identity:  failedRequest.Identity,
2✔
1287
        }
2✔
1288

2✔
1289
        err = wh.GetHistoryClient().RespondActivityTaskFailed(ctx, &types.HistoryRespondActivityTaskFailedRequest{
2✔
1290
                DomainUUID:    taskToken.DomainID,
2✔
1291
                FailedRequest: req,
2✔
1292
        })
2✔
1293
        if err != nil {
3✔
1294
                return wh.normalizeVersionedErrors(ctx, err)
1✔
1295
        }
1✔
1296
        return nil
1✔
1297
}
1298

1299
// RespondActivityTaskCanceled - called to cancel an activity task
1300
func (wh *WorkflowHandler) RespondActivityTaskCanceled(
1301
        ctx context.Context,
1302
        cancelRequest *types.RespondActivityTaskCanceledRequest,
1303
) (retError error) {
11✔
1304
        if wh.isShuttingDown() {
12✔
1305
                return validate.ErrShuttingDown
1✔
1306
        }
1✔
1307

1308
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
11✔
1309
                return err
1✔
1310
        }
1✔
1311

1312
        if cancelRequest == nil {
10✔
1313
                return validate.ErrRequestNotSet
1✔
1314
        }
1✔
1315

1316
        if cancelRequest.TaskToken == nil {
9✔
1317
                return validate.ErrTaskTokenNotSet
1✔
1318
        }
1✔
1319

1320
        taskToken, err := wh.tokenSerializer.Deserialize(cancelRequest.TaskToken)
7✔
1321
        if err != nil {
8✔
1322
                return err
1✔
1323
        }
1✔
1324

1325
        if taskToken.DomainID == "" {
7✔
1326
                return validate.ErrDomainNotSet
1✔
1327
        }
1✔
1328

1329
        domainName, err := wh.GetDomainCache().GetDomainName(taskToken.DomainID)
5✔
1330
        if err != nil {
6✔
1331
                return err
1✔
1332
        }
1✔
1333

1334
        dw := domainWrapper{
4✔
1335
                domain: domainName,
4✔
1336
        }
4✔
1337
        scope := getMetricsScopeWithDomain(metrics.FrontendRespondActivityTaskCanceledScope, dw, wh.GetMetricsClient()).Tagged(metrics.GetContextTags(ctx)...)
4✔
1338
        if !common.IsValidIDLength(
4✔
1339
                cancelRequest.GetIdentity(),
4✔
1340
                scope,
4✔
1341
                wh.config.MaxIDLengthWarnLimit(),
4✔
1342
                wh.config.IdentityMaxLength(domainName),
4✔
1343
                metrics.CadenceErrIdentityExceededWarnLimit,
4✔
1344
                domainName,
4✔
1345
                wh.GetLogger(),
4✔
1346
                tag.IDTypeIdentity) {
5✔
1347
                return validate.ErrIdentityTooLong
1✔
1348
        }
1✔
1349

1350
        sizeLimitError := wh.config.BlobSizeLimitError(domainName)
3✔
1351
        sizeLimitWarn := wh.config.BlobSizeLimitWarn(domainName)
3✔
1352

3✔
1353
        if err := common.CheckEventBlobSizeLimit(
3✔
1354
                len(cancelRequest.Details),
3✔
1355
                sizeLimitWarn,
3✔
1356
                sizeLimitError,
3✔
1357
                taskToken.DomainID,
3✔
1358
                taskToken.WorkflowID,
3✔
1359
                taskToken.RunID,
3✔
1360
                scope,
3✔
1361
                wh.GetThrottledLogger(),
3✔
1362
                tag.BlobSizeViolationOperation("RespondActivityTaskCanceled"),
3✔
1363
        ); err != nil {
4✔
1364
                // details exceeds blob size limit, we would record it as failure
1✔
1365
                failRequest := &types.RespondActivityTaskFailedRequest{
1✔
1366
                        TaskToken: cancelRequest.TaskToken,
1✔
1367
                        Reason:    common.StringPtr(common.FailureReasonCancelDetailsExceedsLimit),
1✔
1368
                        Details:   cancelRequest.Details[0:sizeLimitError],
1✔
1369
                        Identity:  cancelRequest.Identity,
1✔
1370
                }
1✔
1371
                err = wh.GetHistoryClient().RespondActivityTaskFailed(ctx, &types.HistoryRespondActivityTaskFailedRequest{
1✔
1372
                        DomainUUID:    taskToken.DomainID,
1✔
1373
                        FailedRequest: failRequest,
1✔
1374
                })
1✔
1375
                if err != nil {
2✔
1376
                        return wh.normalizeVersionedErrors(ctx, err)
1✔
1377
                }
1✔
1378
        } else {
2✔
1379
                err = wh.GetHistoryClient().RespondActivityTaskCanceled(ctx, &types.HistoryRespondActivityTaskCanceledRequest{
2✔
1380
                        DomainUUID:    taskToken.DomainID,
2✔
1381
                        CancelRequest: cancelRequest,
2✔
1382
                })
2✔
1383
                if err != nil {
3✔
1384
                        return wh.normalizeVersionedErrors(ctx, err)
1✔
1385
                }
1✔
1386
        }
1387

1388
        return nil
1✔
1389
}
1390

1391
// RespondActivityTaskCanceledByID - called to cancel an activity task
1392
func (wh *WorkflowHandler) RespondActivityTaskCanceledByID(
1393
        ctx context.Context,
1394
        cancelRequest *types.RespondActivityTaskCanceledByIDRequest,
1395
) (retError error) {
13✔
1396
        if wh.isShuttingDown() {
14✔
1397
                return validate.ErrShuttingDown
1✔
1398
        }
1✔
1399

1400
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
13✔
1401
                return err
1✔
1402
        }
1✔
1403

1404
        if cancelRequest == nil {
12✔
1405
                return validate.ErrRequestNotSet
1✔
1406
        }
1✔
1407

1408
        domainName := cancelRequest.GetDomain()
10✔
1409
        if domainName == "" {
11✔
1410
                return validate.ErrDomainNotSet
1✔
1411
        }
1✔
1412
        domainID, err := wh.GetDomainCache().GetDomainID(domainName)
9✔
1413
        if err != nil {
10✔
1414
                return err
1✔
1415
        }
1✔
1416
        workflowID := cancelRequest.GetWorkflowID()
8✔
1417
        runID := cancelRequest.GetRunID() // runID is optional so can be empty
8✔
1418
        activityID := cancelRequest.GetActivityID()
8✔
1419

8✔
1420
        if domainID == "" {
9✔
1421
                return validate.ErrDomainNotSet
1✔
1422
        }
1✔
1423
        if workflowID == "" {
8✔
1424
                return validate.ErrWorkflowIDNotSet
1✔
1425
        }
1✔
1426
        if activityID == "" {
7✔
1427
                return validate.ErrActivityIDNotSet
1✔
1428
        }
1✔
1429

1430
        scope := getMetricsScopeWithDomain(metrics.FrontendRespondActivityTaskCanceledByIDScope, cancelRequest, wh.GetMetricsClient()).Tagged(metrics.GetContextTags(ctx)...)
5✔
1431
        if !common.IsValidIDLength(
5✔
1432
                cancelRequest.GetIdentity(),
5✔
1433
                scope,
5✔
1434
                wh.config.MaxIDLengthWarnLimit(),
5✔
1435
                wh.config.IdentityMaxLength(cancelRequest.GetDomain()),
5✔
1436
                metrics.CadenceErrIdentityExceededWarnLimit,
5✔
1437
                domainName,
5✔
1438
                wh.GetLogger(),
5✔
1439
                tag.IDTypeIdentity) {
6✔
1440
                return validate.ErrIdentityTooLong
1✔
1441
        }
1✔
1442

1443
        taskToken := &common.TaskToken{
4✔
1444
                DomainID:   domainID,
4✔
1445
                RunID:      runID,
4✔
1446
                WorkflowID: workflowID,
4✔
1447
                ScheduleID: common.EmptyEventID,
4✔
1448
                ActivityID: activityID,
4✔
1449
        }
4✔
1450
        token, err := wh.tokenSerializer.Serialize(taskToken)
4✔
1451
        if err != nil {
5✔
1452
                return err
1✔
1453
        }
1✔
1454

1455
        sizeLimitError := wh.config.BlobSizeLimitError(domainName)
3✔
1456
        sizeLimitWarn := wh.config.BlobSizeLimitWarn(domainName)
3✔
1457

3✔
1458
        if err := common.CheckEventBlobSizeLimit(
3✔
1459
                len(cancelRequest.Details),
3✔
1460
                sizeLimitWarn,
3✔
1461
                sizeLimitError,
3✔
1462
                taskToken.DomainID,
3✔
1463
                taskToken.WorkflowID,
3✔
1464
                taskToken.RunID,
3✔
1465
                scope,
3✔
1466
                wh.GetThrottledLogger(),
3✔
1467
                tag.BlobSizeViolationOperation("RespondActivityTaskCanceledByID"),
3✔
1468
        ); err != nil {
4✔
1469
                // details exceeds blob size limit, we would record it as failure
1✔
1470
                failRequest := &types.RespondActivityTaskFailedRequest{
1✔
1471
                        TaskToken: token,
1✔
1472
                        Reason:    common.StringPtr(common.FailureReasonCancelDetailsExceedsLimit),
1✔
1473
                        Details:   cancelRequest.Details[0:sizeLimitError],
1✔
1474
                        Identity:  cancelRequest.Identity,
1✔
1475
                }
1✔
1476
                err = wh.GetHistoryClient().RespondActivityTaskFailed(ctx, &types.HistoryRespondActivityTaskFailedRequest{
1✔
1477
                        DomainUUID:    taskToken.DomainID,
1✔
1478
                        FailedRequest: failRequest,
1✔
1479
                })
1✔
1480
                if err != nil {
2✔
1481
                        return wh.normalizeVersionedErrors(ctx, err)
1✔
1482
                }
1✔
1483
        } else {
2✔
1484
                req := &types.RespondActivityTaskCanceledRequest{
2✔
1485
                        TaskToken: token,
2✔
1486
                        Details:   cancelRequest.Details,
2✔
1487
                        Identity:  cancelRequest.Identity,
2✔
1488
                }
2✔
1489

2✔
1490
                err = wh.GetHistoryClient().RespondActivityTaskCanceled(ctx, &types.HistoryRespondActivityTaskCanceledRequest{
2✔
1491
                        DomainUUID:    taskToken.DomainID,
2✔
1492
                        CancelRequest: req,
2✔
1493
                })
2✔
1494
                if err != nil {
3✔
1495
                        return wh.normalizeVersionedErrors(ctx, err)
1✔
1496
                }
1✔
1497
        }
1498

1499
        return nil
1✔
1500
}
1501

1502
// RespondDecisionTaskCompleted - response to a decision task
1503
func (wh *WorkflowHandler) RespondDecisionTaskCompleted(
1504
        ctx context.Context,
1505
        completeRequest *types.RespondDecisionTaskCompletedRequest,
1506
) (resp *types.RespondDecisionTaskCompletedResponse, retError error) {
962✔
1507
        if wh.isShuttingDown() {
963✔
1508
                return nil, validate.ErrShuttingDown
1✔
1509
        }
1✔
1510

1511
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
962✔
1512
                return nil, err
1✔
1513
        }
1✔
1514

1515
        if completeRequest == nil {
961✔
1516
                return nil, validate.ErrRequestNotSet
1✔
1517
        }
1✔
1518

1519
        if completeRequest.TaskToken == nil {
960✔
1520
                return nil, validate.ErrTaskTokenNotSet
1✔
1521
        }
1✔
1522
        taskToken, err := wh.tokenSerializer.Deserialize(completeRequest.TaskToken)
958✔
1523
        if err != nil {
959✔
1524
                return nil, err
1✔
1525
        }
1✔
1526
        if taskToken.DomainID == "" {
958✔
1527
                return nil, validate.ErrDomainNotSet
1✔
1528
        }
1✔
1529

1530
        domainName, err := wh.GetDomainCache().GetDomainName(taskToken.DomainID)
956✔
1531
        if err != nil {
957✔
1532
                return nil, err
1✔
1533
        }
1✔
1534

1535
        dw := domainWrapper{
955✔
1536
                domain: domainName,
955✔
1537
        }
955✔
1538
        scope := getMetricsScopeWithDomain(metrics.FrontendRespondDecisionTaskCompletedScope, dw, wh.GetMetricsClient()).Tagged(metrics.GetContextTags(ctx)...)
955✔
1539
        if !common.IsValidIDLength(
955✔
1540
                completeRequest.GetIdentity(),
955✔
1541
                scope,
955✔
1542
                wh.config.MaxIDLengthWarnLimit(),
955✔
1543
                wh.config.IdentityMaxLength(domainName),
955✔
1544
                metrics.CadenceErrIdentityExceededWarnLimit,
955✔
1545
                domainName,
955✔
1546
                wh.GetLogger(),
955✔
1547
                tag.IDTypeIdentity) {
956✔
1548
                return nil, validate.ErrIdentityTooLong
1✔
1549
        }
1✔
1550

1551
        if err := common.CheckDecisionResultLimit(
954✔
1552
                len(completeRequest.Decisions),
954✔
1553
                wh.config.DecisionResultCountLimit(domainName),
954✔
1554
                scope); err != nil {
955✔
1555
                return nil, err
1✔
1556
        }
1✔
1557

1558
        histResp, err := wh.GetHistoryClient().RespondDecisionTaskCompleted(ctx, &types.HistoryRespondDecisionTaskCompletedRequest{
953✔
1559
                DomainUUID:      taskToken.DomainID,
953✔
1560
                CompleteRequest: completeRequest},
953✔
1561
        )
953✔
1562
        if err != nil {
963✔
1563
                return nil, wh.normalizeVersionedErrors(ctx, err)
10✔
1564
        }
10✔
1565

1566
        completedResp := &types.RespondDecisionTaskCompletedResponse{}
943✔
1567
        completedResp.ActivitiesToDispatchLocally = histResp.ActivitiesToDispatchLocally
943✔
1568
        if completeRequest.GetReturnNewDecisionTask() && histResp != nil && histResp.StartedResponse != nil {
1,004✔
1569
                taskToken := &common.TaskToken{
61✔
1570
                        DomainID:        taskToken.DomainID,
61✔
1571
                        WorkflowID:      taskToken.WorkflowID,
61✔
1572
                        RunID:           taskToken.RunID,
61✔
1573
                        ScheduleID:      histResp.StartedResponse.GetScheduledEventID(),
61✔
1574
                        ScheduleAttempt: histResp.StartedResponse.GetAttempt(),
61✔
1575
                }
61✔
1576
                token, _ := wh.tokenSerializer.Serialize(taskToken)
61✔
1577
                workflowExecution := &types.WorkflowExecution{
61✔
1578
                        WorkflowID: taskToken.WorkflowID,
61✔
1579
                        RunID:      taskToken.RunID,
61✔
1580
                }
61✔
1581
                matchingResp := common.CreateMatchingPollForDecisionTaskResponse(histResp.StartedResponse, workflowExecution, token)
61✔
1582

61✔
1583
                newDecisionTask, err := wh.createPollForDecisionTaskResponse(ctx, scope, taskToken.DomainID, matchingResp, matchingResp.GetBranchToken())
61✔
1584
                if err != nil {
61✔
1585
                        return nil, err
×
1586
                }
×
1587
                completedResp.DecisionTask = newDecisionTask
61✔
1588
        }
1589

1590
        return completedResp, nil
943✔
1591
}
1592

1593
// RespondDecisionTaskFailed - failed response to a decision task
1594
func (wh *WorkflowHandler) RespondDecisionTaskFailed(
1595
        ctx context.Context,
1596
        failedRequest *types.RespondDecisionTaskFailedRequest,
1597
) (retError error) {
169✔
1598
        if wh.isShuttingDown() {
170✔
1599
                return validate.ErrShuttingDown
1✔
1600
        }
1✔
1601

1602
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
169✔
1603
                return err
1✔
1604
        }
1✔
1605

1606
        if failedRequest == nil {
168✔
1607
                return validate.ErrRequestNotSet
1✔
1608
        }
1✔
1609

1610
        if failedRequest.TaskToken == nil {
167✔
1611
                return validate.ErrTaskTokenNotSet
1✔
1612
        }
1✔
1613
        taskToken, err := wh.tokenSerializer.Deserialize(failedRequest.TaskToken)
165✔
1614
        if err != nil {
166✔
1615
                return err
1✔
1616
        }
1✔
1617
        if taskToken.DomainID == "" {
165✔
1618
                return validate.ErrDomainNotSet
1✔
1619
        }
1✔
1620

1621
        domainName, err := wh.GetDomainCache().GetDomainName(taskToken.DomainID)
163✔
1622
        if err != nil {
164✔
1623
                return err
1✔
1624
        }
1✔
1625

1626
        dw := domainWrapper{
162✔
1627
                domain: domainName,
162✔
1628
        }
162✔
1629
        scope := getMetricsScopeWithDomain(metrics.FrontendRespondDecisionTaskFailedScope, dw, wh.GetMetricsClient()).Tagged(metrics.GetContextTags(ctx)...)
162✔
1630
        if !common.IsValidIDLength(
162✔
1631
                failedRequest.GetIdentity(),
162✔
1632
                scope,
162✔
1633
                wh.config.MaxIDLengthWarnLimit(),
162✔
1634
                wh.config.IdentityMaxLength(domainName),
162✔
1635
                metrics.CadenceErrIdentityExceededWarnLimit,
162✔
1636
                domainName,
162✔
1637
                wh.GetLogger(),
162✔
1638
                tag.IDTypeIdentity) {
163✔
1639
                return validate.ErrIdentityTooLong
1✔
1640
        }
1✔
1641

1642
        sizeLimitError := wh.config.BlobSizeLimitError(domainName)
161✔
1643
        sizeLimitWarn := wh.config.BlobSizeLimitWarn(domainName)
161✔
1644

161✔
1645
        if err := common.CheckEventBlobSizeLimit(
161✔
1646
                len(failedRequest.Details),
161✔
1647
                sizeLimitWarn,
161✔
1648
                sizeLimitError,
161✔
1649
                taskToken.DomainID,
161✔
1650
                taskToken.WorkflowID,
161✔
1651
                taskToken.RunID,
161✔
1652
                scope,
161✔
1653
                wh.GetThrottledLogger(),
161✔
1654
                tag.BlobSizeViolationOperation("RespondDecisionTaskFailed"),
161✔
1655
        ); err != nil {
162✔
1656
                // details exceed, we would just truncate the size for decision task failed as the details is not used anywhere by client code
1✔
1657
                failedRequest.Details = failedRequest.Details[0:sizeLimitError]
1✔
1658
        }
1✔
1659

1660
        err = wh.GetHistoryClient().RespondDecisionTaskFailed(ctx, &types.HistoryRespondDecisionTaskFailedRequest{
161✔
1661
                DomainUUID:    taskToken.DomainID,
161✔
1662
                FailedRequest: failedRequest,
161✔
1663
        })
161✔
1664
        if err != nil {
162✔
1665
                return wh.normalizeVersionedErrors(ctx, err)
1✔
1666
        }
1✔
1667
        return nil
160✔
1668
}
1669

1670
// RespondQueryTaskCompleted - response to a query task
1671
func (wh *WorkflowHandler) RespondQueryTaskCompleted(
1672
        ctx context.Context,
1673
        completeRequest *types.RespondQueryTaskCompletedRequest,
1674
) (retError error) {
39✔
1675
        if wh.isShuttingDown() {
40✔
1676
                return validate.ErrShuttingDown
1✔
1677
        }
1✔
1678

1679
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
39✔
1680
                return err
1✔
1681
        }
1✔
1682

1683
        if completeRequest == nil {
38✔
1684
                return validate.ErrRequestNotSet
1✔
1685
        }
1✔
1686

1687
        if completeRequest.TaskToken == nil {
37✔
1688
                return validate.ErrTaskTokenNotSet
1✔
1689
        }
1✔
1690
        queryTaskToken, err := wh.tokenSerializer.DeserializeQueryTaskToken(completeRequest.TaskToken)
35✔
1691
        if err != nil {
36✔
1692
                return err
1✔
1693
        }
1✔
1694
        if queryTaskToken.DomainID == "" || queryTaskToken.TaskList == "" || queryTaskToken.TaskID == "" {
35✔
1695
                return validate.ErrInvalidTaskToken
1✔
1696
        }
1✔
1697

1698
        domainName, err := wh.GetDomainCache().GetDomainName(queryTaskToken.DomainID)
33✔
1699
        if err != nil {
34✔
1700
                return err
1✔
1701
        }
1✔
1702

1703
        dw := domainWrapper{
32✔
1704
                domain: domainName,
32✔
1705
        }
32✔
1706
        sizeLimitError := wh.config.BlobSizeLimitError(domainName)
32✔
1707
        sizeLimitWarn := wh.config.BlobSizeLimitWarn(domainName)
32✔
1708

32✔
1709
        scope := getMetricsScopeWithDomain(metrics.FrontendRespondQueryTaskCompletedScope, dw, wh.GetMetricsClient()).Tagged(metrics.GetContextTags(ctx)...)
32✔
1710
        if err := common.CheckEventBlobSizeLimit(
32✔
1711
                len(completeRequest.GetQueryResult()),
32✔
1712
                sizeLimitWarn,
32✔
1713
                sizeLimitError,
32✔
1714
                queryTaskToken.DomainID,
32✔
1715
                "",
32✔
1716
                "",
32✔
1717
                scope,
32✔
1718
                wh.GetThrottledLogger(),
32✔
1719
                tag.BlobSizeViolationOperation("RespondQueryTaskCompleted"),
32✔
1720
        ); err != nil {
33✔
1721
                completeRequest = &types.RespondQueryTaskCompletedRequest{
1✔
1722
                        TaskToken:     completeRequest.TaskToken,
1✔
1723
                        CompletedType: types.QueryTaskCompletedTypeFailed.Ptr(),
1✔
1724
                        QueryResult:   nil,
1✔
1725
                        ErrorMessage:  err.Error(),
1✔
1726
                }
1✔
1727
        }
1✔
1728

1729
        call := yarpc.CallFromContext(ctx)
32✔
1730

32✔
1731
        completeRequest.WorkerVersionInfo = &types.WorkerVersionInfo{
32✔
1732
                Impl:           call.Header(common.ClientImplHeaderName),
32✔
1733
                FeatureVersion: call.Header(common.FeatureVersionHeaderName),
32✔
1734
        }
32✔
1735
        matchingRequest := &types.MatchingRespondQueryTaskCompletedRequest{
32✔
1736
                DomainUUID:       queryTaskToken.DomainID,
32✔
1737
                TaskList:         &types.TaskList{Name: queryTaskToken.TaskList},
32✔
1738
                TaskID:           queryTaskToken.TaskID,
32✔
1739
                CompletedRequest: completeRequest,
32✔
1740
        }
32✔
1741

32✔
1742
        err = wh.GetMatchingClient().RespondQueryTaskCompleted(ctx, matchingRequest)
32✔
1743
        if err != nil {
33✔
1744
                return err
1✔
1745
        }
1✔
1746
        return nil
31✔
1747
}
1748

1749
func (wh *WorkflowHandler) StartWorkflowExecutionAsync(
1750
        ctx context.Context,
1751
        startRequest *types.StartWorkflowExecutionAsyncRequest,
1752
) (resp *types.StartWorkflowExecutionAsyncResponse, retError error) {
3✔
1753
        if wh.isShuttingDown() {
3✔
1754
                return nil, validate.ErrShuttingDown
×
1755
        }
×
1756
        scope := getMetricsScopeWithDomain(metrics.FrontendStartWorkflowExecutionAsyncScope, startRequest, wh.GetMetricsClient()).Tagged(metrics.GetContextTags(ctx)...)
3✔
1757
        // validate request before pushing to queue
3✔
1758
        err := wh.validateStartWorkflowExecutionRequest(ctx, startRequest.StartWorkflowExecutionRequest, scope)
3✔
1759
        if err != nil {
3✔
1760
                return nil, err
×
1761
        }
×
1762

1763
        producer, err := wh.producerManager.GetProducerByDomain(startRequest.GetDomain())
3✔
1764
        if err != nil {
4✔
1765
                return nil, err
1✔
1766
        }
1✔
1767

1768
        // Serialize the message to be sent to the queue.
1769
        // Avoid JSON because json encoding of requests excludes PII fields such as input. JSON encoded request are logged by acccess controlled api layer for audit purposes.
1770
        payload, err := wh.thriftrwEncoder.Encode(thrift.FromStartWorkflowExecutionAsyncRequest(startRequest))
2✔
1771
        if err != nil {
2✔
1772
                return nil, fmt.Errorf("failed to encode StartWorkflowExecutionAsyncRequest: %v", err)
×
1773
        }
×
1774
        scope.RecordTimer(metrics.AsyncRequestPayloadSize, time.Duration(len(payload)))
2✔
1775

2✔
1776
        // propagate the headers from the context to the message
2✔
1777
        header := &shared.Header{
2✔
1778
                Fields: make(map[string][]byte),
2✔
1779
        }
2✔
1780
        for k, v := range yarpc.CallFromContext(ctx).OriginalHeaders() {
2✔
1781
                header.Fields[k] = []byte(v)
×
1782
        }
×
1783
        messageType := sqlblobs.AsyncRequestTypeStartWorkflowExecutionAsyncRequest
2✔
1784
        message := &sqlblobs.AsyncRequestMessage{
2✔
1785
                PartitionKey: common.StringPtr(startRequest.GetWorkflowID()),
2✔
1786
                Type:         &messageType,
2✔
1787
                Header:       header,
2✔
1788
                Encoding:     common.StringPtr(string(common.EncodingTypeThriftRW)),
2✔
1789
                Payload:      payload,
2✔
1790
        }
2✔
1791
        err = producer.Publish(ctx, message)
2✔
1792
        if err != nil {
3✔
1793
                return nil, err
1✔
1794
        }
1✔
1795
        return &types.StartWorkflowExecutionAsyncResponse{}, nil
1✔
1796
}
1797

1798
// StartWorkflowExecution - Creates a new workflow execution
1799
func (wh *WorkflowHandler) StartWorkflowExecution(
1800
        ctx context.Context,
1801
        startRequest *types.StartWorkflowExecutionRequest,
1802
) (resp *types.StartWorkflowExecutionResponse, retError error) {
491✔
1803
        if wh.isShuttingDown() {
491✔
1804
                return nil, validate.ErrShuttingDown
×
1805
        }
×
1806
        scope := getMetricsScopeWithDomain(metrics.FrontendStartWorkflowExecutionScope, startRequest, wh.GetMetricsClient()).Tagged(metrics.GetContextTags(ctx)...)
491✔
1807
        err := wh.validateStartWorkflowExecutionRequest(ctx, startRequest, scope)
491✔
1808
        if err != nil {
502✔
1809
                return nil, err
11✔
1810
        }
11✔
1811
        domainName := startRequest.GetDomain()
480✔
1812
        domainID, err := wh.GetDomainCache().GetDomainID(domainName)
480✔
1813
        if err != nil {
480✔
1814
                return nil, err
×
1815
        }
×
1816
        wh.GetLogger().Debug("Start workflow execution request domainID", tag.WorkflowDomainID(domainID))
480✔
1817
        historyRequest, err := common.CreateHistoryStartWorkflowRequest(
480✔
1818
                domainID, startRequest, time.Now(), wh.getPartitionConfig(ctx, domainName))
480✔
1819
        if err != nil {
480✔
1820
                return nil, err
×
1821
        }
×
1822

1823
        resp, err = wh.GetHistoryClient().StartWorkflowExecution(ctx, historyRequest)
480✔
1824
        if err != nil {
513✔
1825
                return nil, err
33✔
1826
        }
33✔
1827
        return resp, nil
447✔
1828
}
1829

1830
func (wh *WorkflowHandler) validateStartWorkflowExecutionRequest(ctx context.Context, startRequest *types.StartWorkflowExecutionRequest, scope metrics.Scope) error {
494✔
1831
        if startRequest == nil {
495✔
1832
                return validate.ErrRequestNotSet
1✔
1833
        }
1✔
1834
        domainName := startRequest.GetDomain()
493✔
1835
        if domainName == "" {
494✔
1836
                return validate.ErrDomainNotSet
1✔
1837
        }
1✔
1838
        if startRequest.GetWorkflowID() == "" {
493✔
1839
                return validate.ErrWorkflowIDNotSet
1✔
1840
        }
1✔
1841
        if _, err := uuid.Parse(startRequest.RequestID); err != nil {
493✔
1842
                return &types.BadRequestError{Message: fmt.Sprintf("requestId %q is not a valid UUID", startRequest.RequestID)}
2✔
1843
        }
2✔
1844
        if startRequest.WorkflowType == nil || startRequest.WorkflowType.GetName() == "" {
490✔
1845
                return validate.ErrWorkflowTypeNotSet
1✔
1846
        }
1✔
1847
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
488✔
1848
                return err
×
1849
        }
×
1850
        idLengthWarnLimit := wh.config.MaxIDLengthWarnLimit()
488✔
1851
        if !common.IsValidIDLength(
488✔
1852
                domainName,
488✔
1853
                scope,
488✔
1854
                idLengthWarnLimit,
488✔
1855
                wh.config.DomainNameMaxLength(domainName),
488✔
1856
                metrics.CadenceErrDomainNameExceededWarnLimit,
488✔
1857
                domainName,
488✔
1858
                wh.GetLogger(),
488✔
1859
                tag.IDTypeDomainName) {
488✔
1860
                return validate.ErrDomainTooLong
×
1861
        }
×
1862
        if !common.IsValidIDLength(
488✔
1863
                startRequest.GetWorkflowID(),
488✔
1864
                scope,
488✔
1865
                idLengthWarnLimit,
488✔
1866
                wh.config.WorkflowIDMaxLength(domainName),
488✔
1867
                metrics.CadenceErrWorkflowIDExceededWarnLimit,
488✔
1868
                domainName,
488✔
1869
                wh.GetLogger(),
488✔
1870
                tag.IDTypeWorkflowID) {
488✔
1871
                return validate.ErrWorkflowIDTooLong
×
1872
        }
×
1873
        if err := common.ValidateRetryPolicy(startRequest.RetryPolicy); err != nil {
488✔
1874
                return err
×
1875
        }
×
1876
        wh.GetLogger().Debug(
488✔
1877
                "Received StartWorkflowExecution. WorkflowID",
488✔
1878
                tag.WorkflowID(startRequest.GetWorkflowID()))
488✔
1879
        if !common.IsValidIDLength(
488✔
1880
                startRequest.WorkflowType.GetName(),
488✔
1881
                scope,
488✔
1882
                idLengthWarnLimit,
488✔
1883
                wh.config.WorkflowTypeMaxLength(domainName),
488✔
1884
                metrics.CadenceErrWorkflowTypeExceededWarnLimit,
488✔
1885
                domainName,
488✔
1886
                wh.GetLogger(),
488✔
1887
                tag.IDTypeWorkflowType) {
488✔
1888
                return validate.ErrWorkflowTypeTooLong
×
1889
        }
×
1890
        if err := wh.validateTaskList(startRequest.TaskList, scope, domainName); err != nil {
489✔
1891
                return err
1✔
1892
        }
1✔
1893
        if startRequest.GetExecutionStartToCloseTimeoutSeconds() <= 0 {
488✔
1894
                return validate.ErrInvalidExecutionStartToCloseTimeoutSeconds
1✔
1895
        }
1✔
1896
        if startRequest.GetTaskStartToCloseTimeoutSeconds() <= 0 {
487✔
1897
                return validate.ErrInvalidTaskStartToCloseTimeoutSeconds
1✔
1898
        }
1✔
1899
        if startRequest.GetDelayStartSeconds() < 0 {
486✔
1900
                return validate.ErrInvalidDelayStartSeconds
1✔
1901
        }
1✔
1902
        if startRequest.GetJitterStartSeconds() < 0 {
484✔
1903
                return validate.ErrInvalidJitterStartSeconds
×
1904
        }
×
1905
        jitter := startRequest.GetJitterStartSeconds()
484✔
1906
        cron := startRequest.GetCronSchedule()
484✔
1907
        if cron != "" {
502✔
1908
                if _, err := backoff.ValidateSchedule(startRequest.GetCronSchedule()); err != nil {
18✔
1909
                        return err
×
1910
                }
×
1911
        }
1912
        if jitter > 0 && cron != "" {
484✔
1913
                // Calculate the cron duration and ensure that jitter is not greater than the cron duration,
×
1914
                // because that would be confusing to users.
×
1915

×
1916
                // Request using start/end time zero value, which will get us an exact answer (i.e. its not in the
×
1917
                // middle of a minute)
×
1918
                backoffSeconds, err := backoff.GetBackoffForNextScheduleInSeconds(cron, time.Time{}, time.Time{}, jitter)
×
1919
                if err != nil {
×
1920
                        return err
×
1921
                }
×
1922
                if jitter > backoffSeconds {
×
1923
                        return validate.ErrInvalidJitterStartSeconds2
×
1924
                }
×
1925
        }
1926
        if !common.IsValidIDLength(
484✔
1927
                startRequest.GetRequestID(),
484✔
1928
                scope,
484✔
1929
                idLengthWarnLimit,
484✔
1930
                wh.config.RequestIDMaxLength(domainName),
484✔
1931
                metrics.CadenceErrRequestIDExceededWarnLimit,
484✔
1932
                domainName,
484✔
1933
                wh.GetLogger(),
484✔
1934
                tag.IDTypeRequestID) {
484✔
1935
                return validate.ErrRequestIDTooLong
×
1936
        }
×
1937
        if err := wh.searchAttributesValidator.ValidateSearchAttributes(startRequest.SearchAttributes, domainName); err != nil {
484✔
1938
                return err
×
1939
        }
×
1940
        wh.GetLogger().Debug("Start workflow execution request domain", tag.WorkflowDomainName(domainName))
484✔
1941
        domainID, err := wh.GetDomainCache().GetDomainID(domainName)
484✔
1942
        if err != nil {
484✔
1943
                return err
×
1944
        }
×
1945
        sizeLimitError := wh.config.BlobSizeLimitError(domainName)
484✔
1946
        sizeLimitWarn := wh.config.BlobSizeLimitWarn(domainName)
484✔
1947
        actualSize := len(startRequest.Input)
484✔
1948
        if startRequest.Memo != nil {
496✔
1949
                actualSize += common.GetSizeOfMapStringToByteArray(startRequest.Memo.GetFields())
12✔
1950
        }
12✔
1951
        if err := common.CheckEventBlobSizeLimit(
484✔
1952
                actualSize,
484✔
1953
                sizeLimitWarn,
484✔
1954
                sizeLimitError,
484✔
1955
                domainID,
484✔
1956
                startRequest.GetWorkflowID(),
484✔
1957
                "",
484✔
1958
                scope,
484✔
1959
                wh.GetThrottledLogger(),
484✔
1960
                tag.BlobSizeViolationOperation("StartWorkflowExecution"),
484✔
1961
        ); err != nil {
484✔
1962
                return err
×
1963
        }
×
1964
        isolationGroup := wh.getIsolationGroup(ctx, domainName)
484✔
1965
        if !wh.isIsolationGroupHealthy(ctx, domainName, isolationGroup) {
485✔
1966
                return &types.BadRequestError{fmt.Sprintf("Domain %s is drained from isolation group %s.", domainName, isolationGroup)}
1✔
1967
        }
1✔
1968
        return nil
483✔
1969
}
1970

1971
// GetWorkflowExecutionHistory - retrieves the history of workflow execution
1972
func (wh *WorkflowHandler) GetWorkflowExecutionHistory(
1973
        ctx context.Context,
1974
        getRequest *types.GetWorkflowExecutionHistoryRequest,
1975
) (resp *types.GetWorkflowExecutionHistoryResponse, retError error) {
464✔
1976
        if wh.isShuttingDown() {
464✔
1977
                return nil, validate.ErrShuttingDown
×
1978
        }
×
1979

1980
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
464✔
1981
                return nil, err
×
1982
        }
×
1983

1984
        if getRequest == nil {
464✔
1985
                return nil, validate.ErrRequestNotSet
×
1986
        }
×
1987

1988
        domainName := getRequest.GetDomain()
464✔
1989
        wfExecution := getRequest.GetExecution()
464✔
1990

464✔
1991
        if domainName == "" {
464✔
1992
                return nil, validate.ErrDomainNotSet
×
1993
        }
×
1994

1995
        if err := validate.CheckExecution(wfExecution); err != nil {
464✔
1996
                return nil, err
×
1997
        }
×
1998

1999
        domainID, err := wh.GetDomainCache().GetDomainID(domainName)
464✔
2000
        if err != nil {
464✔
2001
                return nil, err
×
2002
        }
×
2003

2004
        if getRequest.GetMaximumPageSize() <= 0 {
796✔
2005
                getRequest.MaximumPageSize = int32(wh.config.HistoryMaxPageSize(getRequest.GetDomain()))
332✔
2006
        }
332✔
2007
        // force limit page size if exceed
2008
        if getRequest.GetMaximumPageSize() > common.GetHistoryMaxPageSize {
464✔
2009
                wh.GetThrottledLogger().Warn("GetHistory page size is larger than threshold",
×
2010
                        tag.WorkflowID(getRequest.Execution.GetWorkflowID()),
×
2011
                        tag.WorkflowRunID(getRequest.Execution.GetRunID()),
×
2012
                        tag.WorkflowDomainID(domainID),
×
2013
                        tag.WorkflowSize(int64(getRequest.GetMaximumPageSize())))
×
2014
                getRequest.MaximumPageSize = common.GetHistoryMaxPageSize
×
2015
        }
×
2016

2017
        scope := getMetricsScopeWithDomain(metrics.FrontendGetWorkflowExecutionHistoryScope, getRequest, wh.GetMetricsClient()).Tagged(metrics.GetContextTags(ctx)...)
464✔
2018
        if !getRequest.GetSkipArchival() {
910✔
2019
                enableArchivalRead := wh.GetArchivalMetadata().GetHistoryConfig().ReadEnabled()
446✔
2020
                historyArchived := wh.historyArchived(ctx, getRequest, domainID)
446✔
2021
                if enableArchivalRead && historyArchived {
469✔
2022
                        return wh.getArchivedHistory(ctx, getRequest, domainID)
23✔
2023
                }
23✔
2024
        }
2025

2026
        // this function return the following 6 things,
2027
        // 1. branch token
2028
        // 2. the workflow run ID
2029
        // 3. the last first event ID (the event ID of the last batch of events in the history)
2030
        // 4. the next event ID
2031
        // 5. whether the workflow is closed
2032
        // 6. error if any
2033
        queryHistory := func(
441✔
2034
                domainUUID string,
441✔
2035
                execution *types.WorkflowExecution,
441✔
2036
                expectedNextEventID int64,
441✔
2037
                currentBranchToken []byte,
441✔
2038
        ) ([]byte, string, int64, int64, bool, error) {
838✔
2039
                response, err := wh.GetHistoryClient().PollMutableState(ctx, &types.PollMutableStateRequest{
397✔
2040
                        DomainUUID:          domainUUID,
397✔
2041
                        Execution:           execution,
397✔
2042
                        ExpectedNextEventID: expectedNextEventID,
397✔
2043
                        CurrentBranchToken:  currentBranchToken,
397✔
2044
                })
397✔
2045

397✔
2046
                if err != nil {
397✔
2047
                        return nil, "", 0, 0, false, err
×
2048
                }
×
2049
                isWorkflowRunning := response.GetWorkflowCloseState() == persistence.WorkflowCloseStatusNone
397✔
2050

397✔
2051
                return response.CurrentBranchToken,
397✔
2052
                        response.Execution.GetRunID(),
397✔
2053
                        response.GetLastFirstEventID(),
397✔
2054
                        response.GetNextEventID(),
397✔
2055
                        isWorkflowRunning,
397✔
2056
                        nil
397✔
2057
        }
2058

2059
        isLongPoll := getRequest.GetWaitForNewEvent()
441✔
2060
        isCloseEventOnly := getRequest.GetHistoryEventFilterType() == types.HistoryEventFilterTypeCloseEvent
441✔
2061
        execution := getRequest.Execution
441✔
2062
        token := &getHistoryContinuationToken{}
441✔
2063

441✔
2064
        var runID string
441✔
2065
        lastFirstEventID := common.FirstEventID
441✔
2066
        var nextEventID int64
441✔
2067
        var isWorkflowRunning bool
441✔
2068

441✔
2069
        // process the token for paging
441✔
2070
        queryNextEventID := common.EndEventID
441✔
2071
        if getRequest.NextPageToken != nil {
485✔
2072
                token, err = deserializeHistoryToken(getRequest.NextPageToken)
44✔
2073
                if err != nil {
44✔
2074
                        return nil, validate.ErrInvalidNextPageToken
×
2075
                }
×
2076
                if execution.RunID != "" && execution.GetRunID() != token.RunID {
44✔
2077
                        return nil, validate.ErrNextPageTokenRunIDMismatch
×
2078
                }
×
2079

2080
                execution.RunID = token.RunID
44✔
2081

44✔
2082
                // we need to update the current next event ID and whether workflow is running
44✔
2083
                if len(token.PersistenceToken) == 0 && isLongPoll && token.IsWorkflowRunning {
44✔
2084
                        logger := wh.GetLogger().WithTags(
×
2085
                                tag.WorkflowDomainName(getRequest.GetDomain()),
×
2086
                                tag.WorkflowID(getRequest.Execution.GetWorkflowID()),
×
2087
                                tag.WorkflowRunID(getRequest.Execution.GetRunID()),
×
2088
                        )
×
2089
                        // TODO: for now we only log the invalid timeout (this is done inside the helper function) in case
×
2090
                        // this change breaks existing customers. Once we are sure no one is calling this API with very short timeout
×
2091
                        // we can return the error.
×
2092
                        _ = common.ValidateLongPollContextTimeout(ctx, "GetWorkflowExecutionHistory", logger)
×
2093

×
2094
                        if !isCloseEventOnly {
×
2095
                                queryNextEventID = token.NextEventID
×
2096
                        }
×
2097
                        token.BranchToken, _, lastFirstEventID, nextEventID, isWorkflowRunning, err =
×
2098
                                queryHistory(domainID, execution, queryNextEventID, token.BranchToken)
×
2099
                        if err != nil {
×
2100
                                return nil, err
×
2101
                        }
×
2102
                        token.FirstEventID = token.NextEventID
×
2103
                        token.NextEventID = nextEventID
×
2104
                        token.IsWorkflowRunning = isWorkflowRunning
×
2105
                }
2106
        } else {
397✔
2107
                if !isCloseEventOnly {
776✔
2108
                        queryNextEventID = common.FirstEventID
379✔
2109
                }
379✔
2110
                token.BranchToken, runID, lastFirstEventID, nextEventID, isWorkflowRunning, err =
397✔
2111
                        queryHistory(domainID, execution, queryNextEventID, nil)
397✔
2112
                if err != nil {
397✔
2113
                        return nil, err
×
2114
                }
×
2115

2116
                execution.RunID = runID
397✔
2117

397✔
2118
                token.RunID = runID
397✔
2119
                token.FirstEventID = common.FirstEventID
397✔
2120
                token.NextEventID = nextEventID
397✔
2121
                token.IsWorkflowRunning = isWorkflowRunning
397✔
2122
                token.PersistenceToken = nil
397✔
2123
        }
2124

2125
        call := yarpc.CallFromContext(ctx)
441✔
2126
        clientFeatureVersion := call.Header(common.FeatureVersionHeaderName)
441✔
2127
        clientImpl := call.Header(common.ClientImplHeaderName)
441✔
2128
        supportsRawHistoryQuery := wh.versionChecker.SupportsRawHistoryQuery(clientImpl, clientFeatureVersion) == nil
441✔
2129
        isRawHistoryEnabled := wh.config.SendRawWorkflowHistory(domainName) && supportsRawHistoryQuery
441✔
2130

441✔
2131
        history := &types.History{}
441✔
2132
        history.Events = []*types.HistoryEvent{}
441✔
2133
        var historyBlob []*types.DataBlob
441✔
2134

441✔
2135
        // helper function to just getHistory
441✔
2136
        getHistory := func(firstEventID, nextEventID int64, nextPageToken []byte) error {
882✔
2137
                if isRawHistoryEnabled {
443✔
2138
                        historyBlob, token.PersistenceToken, err = wh.getRawHistory(
2✔
2139
                                ctx,
2✔
2140
                                scope,
2✔
2141
                                domainID,
2✔
2142
                                domainName,
2✔
2143
                                *execution,
2✔
2144
                                firstEventID,
2✔
2145
                                nextEventID,
2✔
2146
                                getRequest.GetMaximumPageSize(),
2✔
2147
                                nextPageToken,
2✔
2148
                                token.TransientDecision,
2✔
2149
                                token.BranchToken,
2✔
2150
                        )
2✔
2151
                } else {
441✔
2152
                        history, token.PersistenceToken, err = wh.getHistory(
439✔
2153
                                ctx,
439✔
2154
                                scope,
439✔
2155
                                domainID,
439✔
2156
                                domainName,
439✔
2157
                                *execution,
439✔
2158
                                firstEventID,
439✔
2159
                                nextEventID,
439✔
2160
                                getRequest.GetMaximumPageSize(),
439✔
2161
                                nextPageToken,
439✔
2162
                                token.TransientDecision,
439✔
2163
                                token.BranchToken,
439✔
2164
                        )
439✔
2165
                }
439✔
2166
                if err != nil {
441✔
2167
                        return err
×
2168
                }
×
2169
                return nil
441✔
2170
        }
2171

2172
        if isCloseEventOnly {
459✔
2173
                if !isWorkflowRunning {
36✔
2174
                        if err := getHistory(lastFirstEventID, nextEventID, nil); err != nil {
18✔
2175
                                return nil, err
×
2176
                        }
×
2177
                        if isRawHistoryEnabled {
18✔
2178
                                // since getHistory func will not return empty history, so the below is safe
×
2179
                                historyBlob = historyBlob[len(historyBlob)-1:]
×
2180
                        } else {
18✔
2181
                                // since getHistory func will not return empty history, so the below is safe
18✔
2182
                                history.Events = history.Events[len(history.Events)-1:]
18✔
2183
                        }
18✔
2184
                        token = nil
18✔
2185
                } else if isLongPoll {
×
2186
                        // set the persistence token to be nil so next time we will query history for updates
×
2187
                        token.PersistenceToken = nil
×
2188
                } else {
×
2189
                        token = nil
×
2190
                }
×
2191
        } else {
423✔
2192
                // return all events
423✔
2193
                if token.FirstEventID >= token.NextEventID {
423✔
2194
                        // currently there is no new event
×
2195
                        history.Events = []*types.HistoryEvent{}
×
2196
                        if !isWorkflowRunning {
×
2197
                                token = nil
×
2198
                        }
×
2199
                } else {
423✔
2200
                        if err := getHistory(token.FirstEventID, token.NextEventID, token.PersistenceToken); err != nil {
423✔
2201
                                return nil, err
×
2202
                        }
×
2203
                        // here, for long pull on history events, we need to intercept the paging token from cassandra
2204
                        // and do something clever
2205
                        if len(token.PersistenceToken) == 0 && (!token.IsWorkflowRunning || !isLongPoll) {
804✔
2206
                                // meaning, there is no more history to be returned
381✔
2207
                                token = nil
381✔
2208
                        }
381✔
2209
                }
2210
        }
2211

2212
        nextToken, err := serializeHistoryToken(token)
441✔
2213
        if err != nil {
441✔
2214
                return nil, err
×
2215
        }
×
2216
        return &types.GetWorkflowExecutionHistoryResponse{
441✔
2217
                History:       history,
441✔
2218
                RawHistory:    historyBlob,
441✔
2219
                NextPageToken: nextToken,
441✔
2220
                Archived:      false,
441✔
2221
        }, nil
441✔
2222
}
2223

2224
// SignalWorkflowExecution is used to send a signal event to running workflow execution.  This results in
2225
// WorkflowExecutionSignaled event recorded in the history and a decision task being created for the execution.
2226
func (wh *WorkflowHandler) SignalWorkflowExecution(
2227
        ctx context.Context,
2228
        signalRequest *types.SignalWorkflowExecutionRequest,
2229
) (retError error) {
723✔
2230
        if wh.isShuttingDown() {
723✔
2231
                return validate.ErrShuttingDown
×
2232
        }
×
2233

2234
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
723✔
2235
                return err
×
2236
        }
×
2237

2238
        if signalRequest == nil {
723✔
2239
                return validate.ErrRequestNotSet
×
2240
        }
×
2241

2242
        domainName := signalRequest.GetDomain()
723✔
2243
        wfExecution := signalRequest.GetWorkflowExecution()
723✔
2244

723✔
2245
        if domainName == "" {
723✔
2246
                return validate.ErrDomainNotSet
×
2247
        }
×
2248
        if err := validate.CheckExecution(wfExecution); err != nil {
723✔
2249
                return err
×
2250
        }
×
2251

2252
        scope := getMetricsScopeWithDomain(metrics.FrontendSignalWorkflowExecutionScope, signalRequest, wh.GetMetricsClient()).Tagged(metrics.GetContextTags(ctx)...)
723✔
2253
        idLengthWarnLimit := wh.config.MaxIDLengthWarnLimit()
723✔
2254
        if !common.IsValidIDLength(
723✔
2255
                domainName,
723✔
2256
                scope,
723✔
2257
                idLengthWarnLimit,
723✔
2258
                wh.config.DomainNameMaxLength(domainName),
723✔
2259
                metrics.CadenceErrDomainNameExceededWarnLimit,
723✔
2260
                domainName,
723✔
2261
                wh.GetLogger(),
723✔
2262
                tag.IDTypeDomainName) {
723✔
2263
                return validate.ErrDomainTooLong
×
2264
        }
×
2265

2266
        if signalRequest.GetSignalName() == "" {
723✔
2267
                return validate.ErrSignalNameNotSet
×
2268
        }
×
2269

2270
        if !common.IsValidIDLength(
723✔
2271
                signalRequest.GetSignalName(),
723✔
2272
                scope,
723✔
2273
                idLengthWarnLimit,
723✔
2274
                wh.config.SignalNameMaxLength(domainName),
723✔
2275
                metrics.CadenceErrSignalNameExceededWarnLimit,
723✔
2276
                domainName,
723✔
2277
                wh.GetLogger(),
723✔
2278
                tag.IDTypeSignalName) {
723✔
2279
                return validate.ErrSignalNameTooLong
×
2280
        }
×
2281

2282
        if !common.IsValidIDLength(
723✔
2283
                signalRequest.GetRequestID(),
723✔
2284
                scope,
723✔
2285
                idLengthWarnLimit,
723✔
2286
                wh.config.RequestIDMaxLength(domainName),
723✔
2287
                metrics.CadenceErrRequestIDExceededWarnLimit,
723✔
2288
                domainName,
723✔
2289
                wh.GetLogger(),
723✔
2290
                tag.IDTypeRequestID) {
723✔
2291
                return validate.ErrRequestIDTooLong
×
2292
        }
×
2293

2294
        domainID, err := wh.GetDomainCache().GetDomainID(domainName)
723✔
2295
        if err != nil {
723✔
2296
                return err
×
2297
        }
×
2298

2299
        sizeLimitError := wh.config.BlobSizeLimitError(domainName)
723✔
2300
        sizeLimitWarn := wh.config.BlobSizeLimitWarn(domainName)
723✔
2301
        if err := common.CheckEventBlobSizeLimit(
723✔
2302
                len(signalRequest.Input),
723✔
2303
                sizeLimitWarn,
723✔
2304
                sizeLimitError,
723✔
2305
                domainID,
723✔
2306
                signalRequest.GetWorkflowExecution().GetWorkflowID(),
723✔
2307
                signalRequest.GetWorkflowExecution().GetRunID(),
723✔
2308
                scope,
723✔
2309
                wh.GetThrottledLogger(),
723✔
2310
                tag.BlobSizeViolationOperation("SignalWorkflowExecution"),
723✔
2311
        ); err != nil {
723✔
2312
                return err
×
2313
        }
×
2314

2315
        isolationGroup := wh.getIsolationGroup(ctx, domainName)
723✔
2316
        if !wh.isIsolationGroupHealthy(ctx, domainName, isolationGroup) {
723✔
2317
                return &types.BadRequestError{fmt.Sprintf("Domain %s is drained from isolation group %s.", domainName, isolationGroup)}
×
2318
        }
×
2319

2320
        err = wh.GetHistoryClient().SignalWorkflowExecution(ctx, &types.HistorySignalWorkflowExecutionRequest{
723✔
2321
                DomainUUID:    domainID,
723✔
2322
                SignalRequest: signalRequest,
723✔
2323
        })
723✔
2324
        if err != nil {
732✔
2325
                return wh.normalizeVersionedErrors(ctx, err)
9✔
2326
        }
9✔
2327

2328
        return nil
714✔
2329
}
2330

2331
func (wh *WorkflowHandler) SignalWithStartWorkflowExecutionAsync(
2332
        ctx context.Context,
2333
        signalWithStartRequest *types.SignalWithStartWorkflowExecutionAsyncRequest,
2334
) (resp *types.SignalWithStartWorkflowExecutionAsyncResponse, retError error) {
3✔
2335
        if wh.isShuttingDown() {
3✔
2336
                return nil, validate.ErrShuttingDown
×
2337
        }
×
2338
        scope := getMetricsScopeWithDomain(metrics.FrontendSignalWithStartWorkflowExecutionAsyncScope, signalWithStartRequest, wh.GetMetricsClient()).Tagged(metrics.GetContextTags(ctx)...)
3✔
2339
        // validate request before pushing to queue
3✔
2340
        err := wh.validateSignalWithStartWorkflowExecutionRequest(ctx, signalWithStartRequest.SignalWithStartWorkflowExecutionRequest, scope)
3✔
2341
        if err != nil {
3✔
2342
                return nil, err
×
2343
        }
×
2344
        producer, err := wh.producerManager.GetProducerByDomain(signalWithStartRequest.GetDomain())
3✔
2345
        if err != nil {
4✔
2346
                return nil, err
1✔
2347
        }
1✔
2348

2349
        // Serialize the message to be sent to the queue.
2350
        // Avoid JSON because json encoding of requests excludes PII fields such as input. JSON encoded request are logged by acccess controlled api layer for audit purposes.
2351
        payload, err := wh.thriftrwEncoder.Encode(thrift.FromSignalWithStartWorkflowExecutionAsyncRequest(signalWithStartRequest))
2✔
2352
        if err != nil {
2✔
2353
                return nil, fmt.Errorf("failed to encode SignalWithStartWorkflowExecutionAsyncRequest: %v", err)
×
2354
        }
×
2355
        scope.RecordTimer(metrics.AsyncRequestPayloadSize, time.Duration(len(payload)))
2✔
2356

2✔
2357
        // propagate the headers from the context to the message
2✔
2358
        header := &shared.Header{
2✔
2359
                Fields: map[string][]byte{},
2✔
2360
        }
2✔
2361
        for k, v := range yarpc.CallFromContext(ctx).OriginalHeaders() {
2✔
2362
                header.Fields[k] = []byte(v)
×
2363
        }
×
2364
        messageType := sqlblobs.AsyncRequestTypeSignalWithStartWorkflowExecutionAsyncRequest
2✔
2365
        message := &sqlblobs.AsyncRequestMessage{
2✔
2366
                PartitionKey: common.StringPtr(signalWithStartRequest.GetWorkflowID()),
2✔
2367
                Type:         &messageType,
2✔
2368
                Header:       header,
2✔
2369
                Encoding:     common.StringPtr(string(common.EncodingTypeThriftRW)),
2✔
2370
                Payload:      payload,
2✔
2371
        }
2✔
2372
        err = producer.Publish(ctx, message)
2✔
2373
        if err != nil {
3✔
2374
                return nil, err
1✔
2375
        }
1✔
2376
        return &types.SignalWithStartWorkflowExecutionAsyncResponse{}, nil
1✔
2377
}
2378

2379
// SignalWithStartWorkflowExecution is used to ensure sending a signal event to a workflow execution.
2380
// If workflow is running, this results in WorkflowExecutionSignaled event recorded in the history
2381
// and a decision task being created for the execution.
2382
// If workflow is not running or not found, this results in WorkflowExecutionStarted and WorkflowExecutionSignaled
2383
// event recorded in history, and a decision task being created for the execution
2384
func (wh *WorkflowHandler) SignalWithStartWorkflowExecution(
2385
        ctx context.Context,
2386
        signalWithStartRequest *types.SignalWithStartWorkflowExecutionRequest,
2387
) (resp *types.StartWorkflowExecutionResponse, retError error) {
33✔
2388
        if wh.isShuttingDown() {
33✔
2389
                return nil, validate.ErrShuttingDown
×
2390
        }
×
2391

2392
        scope := getMetricsScopeWithDomain(metrics.FrontendSignalWithStartWorkflowExecutionScope, signalWithStartRequest, wh.GetMetricsClient()).Tagged(metrics.GetContextTags(ctx)...)
33✔
2393
        err := wh.validateSignalWithStartWorkflowExecutionRequest(ctx, signalWithStartRequest, scope)
33✔
2394
        if err != nil {
33✔
2395
                return nil, err
×
2396
        }
×
2397

2398
        domainName := signalWithStartRequest.GetDomain()
33✔
2399
        domainID, err := wh.GetDomainCache().GetDomainID(domainName)
33✔
2400
        if err != nil {
33✔
2401
                return nil, err
×
2402
        }
×
2403
        resp, err = wh.GetHistoryClient().SignalWithStartWorkflowExecution(ctx, &types.HistorySignalWithStartWorkflowExecutionRequest{
33✔
2404
                DomainUUID:             domainID,
33✔
2405
                SignalWithStartRequest: signalWithStartRequest,
33✔
2406
                PartitionConfig:        wh.getPartitionConfig(ctx, domainName),
33✔
2407
        })
33✔
2408
        if err != nil {
39✔
2409
                return nil, err
6✔
2410
        }
6✔
2411

2412
        return resp, nil
27✔
2413
}
2414

2415
func (wh *WorkflowHandler) validateSignalWithStartWorkflowExecutionRequest(ctx context.Context, signalWithStartRequest *types.SignalWithStartWorkflowExecutionRequest, scope metrics.Scope) error {
36✔
2416
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
36✔
2417
                return err
×
2418
        }
×
2419

2420
        if signalWithStartRequest == nil {
36✔
2421
                return validate.ErrRequestNotSet
×
2422
        }
×
2423

2424
        domainName := signalWithStartRequest.GetDomain()
36✔
2425
        if domainName == "" {
36✔
2426
                return validate.ErrDomainNotSet
×
2427
        }
×
2428
        if signalWithStartRequest.GetWorkflowID() == "" {
36✔
2429
                return validate.ErrWorkflowIDNotSet
×
2430
        }
×
2431

2432
        idLengthWarnLimit := wh.config.MaxIDLengthWarnLimit()
36✔
2433
        if !common.IsValidIDLength(
36✔
2434
                domainName,
36✔
2435
                scope,
36✔
2436
                idLengthWarnLimit,
36✔
2437
                wh.config.DomainNameMaxLength(domainName),
36✔
2438
                metrics.CadenceErrDomainNameExceededWarnLimit,
36✔
2439
                domainName,
36✔
2440
                wh.GetLogger(),
36✔
2441
                tag.IDTypeDomainName) {
36✔
2442
                return validate.ErrDomainTooLong
×
2443
        }
×
2444

2445
        if !common.IsValidIDLength(
36✔
2446
                signalWithStartRequest.GetWorkflowID(),
36✔
2447
                scope,
36✔
2448
                idLengthWarnLimit,
36✔
2449
                wh.config.WorkflowIDMaxLength(domainName),
36✔
2450
                metrics.CadenceErrWorkflowIDExceededWarnLimit,
36✔
2451
                domainName,
36✔
2452
                wh.GetLogger(),
36✔
2453
                tag.IDTypeWorkflowID) {
36✔
2454
                return validate.ErrWorkflowIDTooLong
×
2455
        }
×
2456

2457
        if signalWithStartRequest.GetSignalName() == "" {
36✔
2458
                return validate.ErrSignalNameNotSet
×
2459
        }
×
2460

2461
        if !common.IsValidIDLength(
36✔
2462
                signalWithStartRequest.GetSignalName(),
36✔
2463
                scope,
36✔
2464
                idLengthWarnLimit,
36✔
2465
                wh.config.SignalNameMaxLength(domainName),
36✔
2466
                metrics.CadenceErrSignalNameExceededWarnLimit,
36✔
2467
                domainName,
36✔
2468
                wh.GetLogger(),
36✔
2469
                tag.IDTypeSignalName) {
36✔
2470
                return validate.ErrSignalNameTooLong
×
2471
        }
×
2472

2473
        if signalWithStartRequest.WorkflowType == nil || signalWithStartRequest.WorkflowType.GetName() == "" {
36✔
2474
                return validate.ErrWorkflowTypeNotSet
×
2475
        }
×
2476

2477
        if !common.IsValidIDLength(
36✔
2478
                signalWithStartRequest.WorkflowType.GetName(),
36✔
2479
                scope,
36✔
2480
                idLengthWarnLimit,
36✔
2481
                wh.config.WorkflowTypeMaxLength(domainName),
36✔
2482
                metrics.CadenceErrWorkflowTypeExceededWarnLimit,
36✔
2483
                domainName,
36✔
2484
                wh.GetLogger(),
36✔
2485
                tag.IDTypeWorkflowType) {
36✔
2486
                return validate.ErrWorkflowTypeTooLong
×
2487
        }
×
2488

2489
        if err := wh.validateTaskList(signalWithStartRequest.TaskList, scope, domainName); err != nil {
36✔
2490
                return err
×
2491
        }
×
2492

2493
        if !common.IsValidIDLength(
36✔
2494
                signalWithStartRequest.GetRequestID(),
36✔
2495
                scope,
36✔
2496
                idLengthWarnLimit,
36✔
2497
                wh.config.RequestIDMaxLength(domainName),
36✔
2498
                metrics.CadenceErrRequestIDExceededWarnLimit,
36✔
2499
                domainName,
36✔
2500
                wh.GetLogger(),
36✔
2501
                tag.IDTypeRequestID) {
36✔
2502
                return validate.ErrRequestIDTooLong
×
2503
        }
×
2504

2505
        if signalWithStartRequest.GetExecutionStartToCloseTimeoutSeconds() <= 0 {
36✔
2506
                return validate.ErrInvalidExecutionStartToCloseTimeoutSeconds
×
2507
        }
×
2508

2509
        if signalWithStartRequest.GetTaskStartToCloseTimeoutSeconds() <= 0 {
36✔
2510
                return validate.ErrInvalidTaskStartToCloseTimeoutSeconds
×
2511
        }
×
2512

2513
        if err := common.ValidateRetryPolicy(signalWithStartRequest.RetryPolicy); err != nil {
36✔
2514
                return err
×
2515
        }
×
2516

2517
        if signalWithStartRequest.GetCronSchedule() != "" {
36✔
2518
                if _, err := backoff.ValidateSchedule(signalWithStartRequest.GetCronSchedule()); err != nil {
×
2519
                        return err
×
2520
                }
×
2521
        }
2522

2523
        if err := wh.searchAttributesValidator.ValidateSearchAttributes(signalWithStartRequest.SearchAttributes, domainName); err != nil {
36✔
2524
                return err
×
2525
        }
×
2526

2527
        domainID, err := wh.GetDomainCache().GetDomainID(domainName)
36✔
2528
        if err != nil {
36✔
2529
                return err
×
2530
        }
×
2531

2532
        sizeLimitError := wh.config.BlobSizeLimitError(domainName)
36✔
2533
        sizeLimitWarn := wh.config.BlobSizeLimitWarn(domainName)
36✔
2534
        if err := common.CheckEventBlobSizeLimit(
36✔
2535
                len(signalWithStartRequest.SignalInput),
36✔
2536
                sizeLimitWarn,
36✔
2537
                sizeLimitError,
36✔
2538
                domainID,
36✔
2539
                signalWithStartRequest.GetWorkflowID(),
36✔
2540
                "",
36✔
2541
                scope,
36✔
2542
                wh.GetThrottledLogger(),
36✔
2543
                tag.BlobSizeViolationOperation("SignalWithStartWorkflowExecution"),
36✔
2544
        ); err != nil {
36✔
2545
                return err
×
2546
        }
×
2547
        actualSize := len(signalWithStartRequest.Input) + common.GetSizeOfMapStringToByteArray(signalWithStartRequest.Memo.GetFields())
36✔
2548
        if err := common.CheckEventBlobSizeLimit(
36✔
2549
                actualSize,
36✔
2550
                sizeLimitWarn,
36✔
2551
                sizeLimitError,
36✔
2552
                domainID,
36✔
2553
                signalWithStartRequest.GetWorkflowID(),
36✔
2554
                "",
36✔
2555
                scope,
36✔
2556
                wh.GetThrottledLogger(),
36✔
2557
                tag.BlobSizeViolationOperation("SignalWithStartWorkflowExecution"),
36✔
2558
        ); err != nil {
36✔
2559
                return err
×
2560
        }
×
2561

2562
        isolationGroup := wh.getIsolationGroup(ctx, domainName)
36✔
2563
        if !wh.isIsolationGroupHealthy(ctx, domainName, isolationGroup) {
36✔
2564
                return &types.BadRequestError{fmt.Sprintf("Domain %s is drained from isolation group %s.", domainName, isolationGroup)}
×
2565
        }
×
2566
        return nil
36✔
2567
}
2568

2569
// TerminateWorkflowExecution terminates an existing workflow execution by recording WorkflowExecutionTerminated event
2570
// in the history and immediately terminating the execution instance.
2571
func (wh *WorkflowHandler) TerminateWorkflowExecution(
2572
        ctx context.Context,
2573
        terminateRequest *types.TerminateWorkflowExecutionRequest,
2574
) (retError error) {
48✔
2575
        if wh.isShuttingDown() {
48✔
2576
                return validate.ErrShuttingDown
×
2577
        }
×
2578

2579
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
48✔
2580
                return err
×
2581
        }
×
2582

2583
        if terminateRequest == nil {
48✔
2584
                return validate.ErrRequestNotSet
×
2585
        }
×
2586

2587
        domainName := terminateRequest.GetDomain()
48✔
2588
        wfExecution := terminateRequest.GetWorkflowExecution()
48✔
2589
        if terminateRequest.GetDomain() == "" {
48✔
2590
                return validate.ErrDomainNotSet
×
2591
        }
×
2592
        if err := validate.CheckExecution(wfExecution); err != nil {
48✔
2593
                return err
×
2594
        }
×
2595

2596
        domainID, err := wh.GetDomainCache().GetDomainID(domainName)
48✔
2597
        if err != nil {
48✔
2598
                return err
×
2599
        }
×
2600

2601
        err = wh.GetHistoryClient().TerminateWorkflowExecution(ctx, &types.HistoryTerminateWorkflowExecutionRequest{
48✔
2602
                DomainUUID:       domainID,
48✔
2603
                TerminateRequest: terminateRequest,
48✔
2604
        })
48✔
2605
        if err != nil {
48✔
2606
                return wh.normalizeVersionedErrors(ctx, err)
×
2607
        }
×
2608

2609
        return nil
48✔
2610
}
2611

2612
// ResetWorkflowExecution reset an existing workflow execution to the nextFirstEventID
2613
// in the history and immediately terminating the current execution instance.
2614
func (wh *WorkflowHandler) ResetWorkflowExecution(
2615
        ctx context.Context,
2616
        resetRequest *types.ResetWorkflowExecutionRequest,
2617
) (resp *types.ResetWorkflowExecutionResponse, retError error) {
15✔
2618
        if wh.isShuttingDown() {
15✔
2619
                return nil, validate.ErrShuttingDown
×
2620
        }
×
2621

2622
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
15✔
2623
                return nil, err
×
2624
        }
×
2625

2626
        if resetRequest == nil {
15✔
2627
                return nil, validate.ErrRequestNotSet
×
2628
        }
×
2629

2630
        domainName := resetRequest.GetDomain()
15✔
2631
        wfExecution := resetRequest.GetWorkflowExecution()
15✔
2632
        if domainName == "" {
15✔
2633
                return nil, validate.ErrDomainNotSet
×
2634
        }
×
2635
        if err := validate.CheckExecution(wfExecution); err != nil {
15✔
2636
                return nil, err
×
2637
        }
×
2638

2639
        domainID, err := wh.GetDomainCache().GetDomainID(resetRequest.GetDomain())
15✔
2640
        if err != nil {
15✔
2641
                return nil, err
×
2642
        }
×
2643

2644
        resp, err = wh.GetHistoryClient().ResetWorkflowExecution(ctx, &types.HistoryResetWorkflowExecutionRequest{
15✔
2645
                DomainUUID:   domainID,
15✔
2646
                ResetRequest: resetRequest,
15✔
2647
        })
15✔
2648
        if err != nil {
15✔
2649
                return nil, err
×
2650
        }
×
2651

2652
        return resp, nil
15✔
2653
}
2654

2655
// RequestCancelWorkflowExecution - requests to cancel a workflow execution
2656
func (wh *WorkflowHandler) RequestCancelWorkflowExecution(
2657
        ctx context.Context,
2658
        cancelRequest *types.RequestCancelWorkflowExecutionRequest,
2659
) (retError error) {
14✔
2660
        if wh.isShuttingDown() {
15✔
2661
                return validate.ErrShuttingDown
1✔
2662
        }
1✔
2663

2664
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
14✔
2665
                return err
1✔
2666
        }
1✔
2667

2668
        if cancelRequest == nil {
13✔
2669
                return validate.ErrRequestNotSet
1✔
2670
        }
1✔
2671

2672
        domainName := cancelRequest.GetDomain()
11✔
2673
        wfExecution := cancelRequest.GetWorkflowExecution()
11✔
2674
        if domainName == "" {
12✔
2675
                return validate.ErrDomainNotSet
1✔
2676
        }
1✔
2677
        if err := validate.CheckExecution(wfExecution); err != nil {
11✔
2678
                return err
1✔
2679
        }
1✔
2680

2681
        domainID, err := wh.GetDomainCache().GetDomainID(cancelRequest.GetDomain())
9✔
2682
        if err != nil {
10✔
2683
                return err
1✔
2684
        }
1✔
2685

2686
        err = wh.GetHistoryClient().RequestCancelWorkflowExecution(ctx, &types.HistoryRequestCancelWorkflowExecutionRequest{
8✔
2687
                DomainUUID:    domainID,
8✔
2688
                CancelRequest: cancelRequest,
8✔
2689
        })
8✔
2690
        if err != nil {
12✔
2691
                return wh.normalizeVersionedErrors(ctx, err)
4✔
2692
        }
4✔
2693

2694
        return nil
4✔
2695
}
2696

2697
// ListOpenWorkflowExecutions - retrieves info for open workflow executions in a domain
2698
func (wh *WorkflowHandler) ListOpenWorkflowExecutions(
2699
        ctx context.Context,
2700
        listRequest *types.ListOpenWorkflowExecutionsRequest,
2701
) (resp *types.ListOpenWorkflowExecutionsResponse, retError error) {
106✔
2702
        if wh.isShuttingDown() {
106✔
2703
                return nil, validate.ErrShuttingDown
×
2704
        }
×
2705

2706
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
106✔
2707
                return nil, err
×
2708
        }
×
2709

2710
        if listRequest == nil {
106✔
2711
                return nil, validate.ErrRequestNotSet
×
2712
        }
×
2713

2714
        if listRequest.GetDomain() == "" {
106✔
2715
                return nil, validate.ErrDomainNotSet
×
2716
        }
×
2717

2718
        if listRequest.StartTimeFilter == nil {
106✔
2719
                return nil, &types.BadRequestError{Message: "StartTimeFilter is required"}
×
2720
        }
×
2721

2722
        if listRequest.StartTimeFilter.EarliestTime == nil {
106✔
2723
                return nil, &types.BadRequestError{Message: "EarliestTime in StartTimeFilter is required"}
×
2724
        }
×
2725

2726
        if listRequest.StartTimeFilter.LatestTime == nil {
106✔
2727
                return nil, &types.BadRequestError{Message: "LatestTime in StartTimeFilter is required"}
×
2728
        }
×
2729

2730
        if listRequest.StartTimeFilter.GetEarliestTime() > listRequest.StartTimeFilter.GetLatestTime() {
106✔
2731
                return nil, &types.BadRequestError{Message: "EarliestTime in StartTimeFilter should not be larger than LatestTime"}
×
2732
        }
×
2733

2734
        if listRequest.ExecutionFilter != nil && listRequest.TypeFilter != nil {
106✔
2735
                return nil, &types.BadRequestError{
×
2736
                        Message: "Only one of ExecutionFilter or TypeFilter is allowed"}
×
2737
        }
×
2738

2739
        if listRequest.GetMaximumPageSize() <= 0 {
167✔
2740
                listRequest.MaximumPageSize = int32(wh.config.VisibilityMaxPageSize(listRequest.GetDomain()))
61✔
2741
        }
61✔
2742

2743
        if wh.isListRequestPageSizeTooLarge(listRequest.GetMaximumPageSize(), listRequest.GetDomain()) {
106✔
2744
                return nil, &types.BadRequestError{
×
2745
                        Message: fmt.Sprintf("Pagesize is larger than allow %d", wh.config.ESIndexMaxResultWindow())}
×
2746
        }
×
2747

2748
        domain := listRequest.GetDomain()
106✔
2749
        domainID, err := wh.GetDomainCache().GetDomainID(domain)
106✔
2750
        if err != nil {
106✔
2751
                return nil, err
×
2752
        }
×
2753

2754
        baseReq := persistence.ListWorkflowExecutionsRequest{
106✔
2755
                DomainUUID:    domainID,
106✔
2756
                Domain:        domain,
106✔
2757
                PageSize:      int(listRequest.GetMaximumPageSize()),
106✔
2758
                NextPageToken: listRequest.NextPageToken,
106✔
2759
                EarliestTime:  listRequest.StartTimeFilter.GetEarliestTime(),
106✔
2760
                LatestTime:    listRequest.StartTimeFilter.GetLatestTime(),
106✔
2761
        }
106✔
2762

106✔
2763
        var persistenceResp *persistence.ListWorkflowExecutionsResponse
106✔
2764
        if listRequest.ExecutionFilter != nil {
206✔
2765
                if wh.config.DisableListVisibilityByFilter(domain) {
101✔
2766
                        err = validate.ErrNoPermission
1✔
2767
                } else {
100✔
2768
                        persistenceResp, err = wh.GetVisibilityManager().ListOpenWorkflowExecutionsByWorkflowID(
99✔
2769
                                ctx,
99✔
2770
                                &persistence.ListWorkflowExecutionsByWorkflowIDRequest{
99✔
2771
                                        ListWorkflowExecutionsRequest: baseReq,
99✔
2772
                                        WorkflowID:                    listRequest.ExecutionFilter.GetWorkflowID(),
99✔
2773
                                })
99✔
2774
                }
99✔
2775
                wh.GetLogger().Debug("List open workflow with filter",
100✔
2776
                        tag.WorkflowDomainName(listRequest.GetDomain()), tag.WorkflowListWorkflowFilterByID)
100✔
2777
        } else if listRequest.TypeFilter != nil {
7✔
2778
                if wh.config.DisableListVisibilityByFilter(domain) {
2✔
2779
                        err = validate.ErrNoPermission
1✔
2780
                } else {
1✔
2781
                        persistenceResp, err = wh.GetVisibilityManager().ListOpenWorkflowExecutionsByType(
×
2782
                                ctx,
×
2783
                                &persistence.ListWorkflowExecutionsByTypeRequest{
×
2784
                                        ListWorkflowExecutionsRequest: baseReq,
×
2785
                                        WorkflowTypeName:              listRequest.TypeFilter.GetName(),
×
2786
                                },
×
2787
                        )
×
2788
                }
×
2789
                wh.GetLogger().Debug("List open workflow with filter",
1✔
2790
                        tag.WorkflowDomainName(listRequest.GetDomain()), tag.WorkflowListWorkflowFilterByType)
1✔
2791
        } else {
5✔
2792
                persistenceResp, err = wh.GetVisibilityManager().ListOpenWorkflowExecutions(ctx, &baseReq)
5✔
2793
        }
5✔
2794

2795
        if err != nil {
108✔
2796
                return nil, err
2✔
2797
        }
2✔
2798

2799
        resp = &types.ListOpenWorkflowExecutionsResponse{}
104✔
2800
        resp.Executions = persistenceResp.Executions
104✔
2801
        resp.NextPageToken = persistenceResp.NextPageToken
104✔
2802
        return resp, nil
104✔
2803
}
2804

2805
// ListArchivedWorkflowExecutions - retrieves archived info for closed workflow executions in a domain
2806
func (wh *WorkflowHandler) ListArchivedWorkflowExecutions(
2807
        ctx context.Context,
2808
        listRequest *types.ListArchivedWorkflowExecutionsRequest,
2809
) (resp *types.ListArchivedWorkflowExecutionsResponse, retError error) {
15✔
2810
        if wh.isShuttingDown() {
15✔
2811
                return nil, validate.ErrShuttingDown
×
2812
        }
×
2813

2814
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
15✔
2815
                return nil, err
×
2816
        }
×
2817

2818
        if listRequest == nil {
15✔
2819
                return nil, validate.ErrRequestNotSet
×
2820
        }
×
2821

2822
        if listRequest.GetDomain() == "" {
16✔
2823
                return nil, validate.ErrDomainNotSet
1✔
2824
        }
1✔
2825

2826
        if listRequest.GetPageSize() <= 0 {
14✔
2827
                listRequest.PageSize = int32(wh.config.VisibilityMaxPageSize(listRequest.GetDomain()))
×
2828
        }
×
2829

2830
        maxPageSize := wh.config.VisibilityArchivalQueryMaxPageSize()
14✔
2831
        if int(listRequest.GetPageSize()) > maxPageSize {
14✔
2832
                return nil, &types.BadRequestError{
×
2833
                        Message: fmt.Sprintf("Pagesize is larger than allowed %d", maxPageSize)}
×
2834
        }
×
2835

2836
        if !wh.GetArchivalMetadata().GetVisibilityConfig().ClusterConfiguredForArchival() {
15✔
2837
                return nil, &types.BadRequestError{Message: "Cluster is not configured for visibility archival"}
1✔
2838
        }
1✔
2839

2840
        if !wh.GetArchivalMetadata().GetVisibilityConfig().ReadEnabled() {
13✔
2841
                return nil, &types.BadRequestError{Message: "Cluster is not configured for reading archived visibility records"}
×
2842
        }
×
2843

2844
        entry, err := wh.GetDomainCache().GetDomain(listRequest.GetDomain())
13✔
2845
        if err != nil {
14✔
2846
                return nil, err
1✔
2847
        }
1✔
2848

2849
        if entry.GetConfig().VisibilityArchivalStatus != types.ArchivalStatusEnabled {
14✔
2850
                return nil, &types.BadRequestError{Message: "Domain is not configured for visibility archival"}
2✔
2851
        }
2✔
2852

2853
        URI, err := archiver.NewURI(entry.GetConfig().VisibilityArchivalURI)
10✔
2854
        if err != nil {
10✔
2855
                return nil, err
×
2856
        }
×
2857

2858
        visibilityArchiver, err := wh.GetArchiverProvider().GetVisibilityArchiver(URI.Scheme(), service.Frontend)
10✔
2859
        if err != nil {
10✔
2860
                return nil, err
×
2861
        }
×
2862

2863
        archiverRequest := &archiver.QueryVisibilityRequest{
10✔
2864
                DomainID:      entry.GetInfo().ID,
10✔
2865
                PageSize:      int(listRequest.GetPageSize()),
10✔
2866
                NextPageToken: listRequest.NextPageToken,
10✔
2867
                Query:         listRequest.GetQuery(),
10✔
2868
        }
10✔
2869

10✔
2870
        archiverResponse, err := visibilityArchiver.Query(ctx, URI, archiverRequest)
10✔
2871
        if err != nil {
10✔
2872
                return nil, err
×
2873
        }
×
2874

2875
        // special handling of ExecutionTime for cron or retry
2876
        for _, execution := range archiverResponse.Executions {
25✔
2877
                if execution.GetExecutionTime() == 0 {
30✔
2878
                        execution.ExecutionTime = common.Int64Ptr(execution.GetStartTime())
15✔
2879
                }
15✔
2880
        }
2881

2882
        return &types.ListArchivedWorkflowExecutionsResponse{
10✔
2883
                Executions:    archiverResponse.Executions,
10✔
2884
                NextPageToken: archiverResponse.NextPageToken,
10✔
2885
        }, nil
10✔
2886
}
2887

2888
// ListClosedWorkflowExecutions - retrieves info for closed workflow executions in a domain
2889
func (wh *WorkflowHandler) ListClosedWorkflowExecutions(
2890
        ctx context.Context,
2891
        listRequest *types.ListClosedWorkflowExecutionsRequest,
2892
) (resp *types.ListClosedWorkflowExecutionsResponse, retError error) {
28✔
2893
        if wh.isShuttingDown() {
28✔
2894
                return nil, validate.ErrShuttingDown
×
2895
        }
×
2896

2897
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
28✔
2898
                return nil, err
×
2899
        }
×
2900

2901
        if listRequest == nil {
28✔
2902
                return nil, validate.ErrRequestNotSet
×
2903
        }
×
2904

2905
        if listRequest.GetDomain() == "" {
28✔
2906
                return nil, validate.ErrDomainNotSet
×
2907
        }
×
2908

2909
        if listRequest.StartTimeFilter == nil {
28✔
2910
                return nil, &types.BadRequestError{Message: "StartTimeFilter is required"}
×
2911
        }
×
2912

2913
        if listRequest.StartTimeFilter.EarliestTime == nil {
28✔
2914
                return nil, &types.BadRequestError{Message: "EarliestTime in StartTimeFilter is required"}
×
2915
        }
×
2916

2917
        if listRequest.StartTimeFilter.LatestTime == nil {
28✔
2918
                return nil, &types.BadRequestError{Message: "LatestTime in StartTimeFilter is required"}
×
2919
        }
×
2920

2921
        if listRequest.StartTimeFilter.GetEarliestTime() > listRequest.StartTimeFilter.GetLatestTime() {
28✔
2922
                return nil, &types.BadRequestError{Message: "EarliestTime in StartTimeFilter should not be larger than LatestTime"}
×
2923
        }
×
2924

2925
        filterCount := 0
28✔
2926
        if listRequest.TypeFilter != nil {
29✔
2927
                filterCount++
1✔
2928
        }
1✔
2929
        if listRequest.StatusFilter != nil {
29✔
2930
                filterCount++
1✔
2931
        }
1✔
2932

2933
        if filterCount > 1 {
28✔
2934
                return nil, &types.BadRequestError{
×
2935
                        Message: "Only one of ExecutionFilter, TypeFilter or StatusFilter is allowed"}
×
2936
        } // If ExecutionFilter is provided with one of TypeFilter or StatusFilter, use ExecutionFilter and ignore other filter
×
2937

2938
        if listRequest.GetMaximumPageSize() <= 0 {
29✔
2939
                listRequest.MaximumPageSize = int32(wh.config.VisibilityMaxPageSize(listRequest.GetDomain()))
1✔
2940
        }
1✔
2941

2942
        if wh.isListRequestPageSizeTooLarge(listRequest.GetMaximumPageSize(), listRequest.GetDomain()) {
28✔
2943
                return nil, &types.BadRequestError{
×
2944
                        Message: fmt.Sprintf("Pagesize is larger than allow %d", wh.config.ESIndexMaxResultWindow())}
×
2945
        }
×
2946

2947
        domain := listRequest.GetDomain()
28✔
2948
        domainID, err := wh.GetDomainCache().GetDomainID(domain)
28✔
2949
        if err != nil {
28✔
2950
                return nil, err
×
2951
        }
×
2952

2953
        baseReq := persistence.ListWorkflowExecutionsRequest{
28✔
2954
                DomainUUID:    domainID,
28✔
2955
                Domain:        domain,
28✔
2956
                PageSize:      int(listRequest.GetMaximumPageSize()),
28✔
2957
                NextPageToken: listRequest.NextPageToken,
28✔
2958
                EarliestTime:  listRequest.StartTimeFilter.GetEarliestTime(),
28✔
2959
                LatestTime:    listRequest.StartTimeFilter.GetLatestTime(),
28✔
2960
        }
28✔
2961

28✔
2962
        var persistenceResp *persistence.ListWorkflowExecutionsResponse
28✔
2963
        if listRequest.ExecutionFilter != nil {
44✔
2964
                if wh.config.DisableListVisibilityByFilter(domain) {
17✔
2965
                        err = validate.ErrNoPermission
1✔
2966
                } else {
16✔
2967
                        persistenceResp, err = wh.GetVisibilityManager().ListClosedWorkflowExecutionsByWorkflowID(
15✔
2968
                                ctx,
15✔
2969
                                &persistence.ListWorkflowExecutionsByWorkflowIDRequest{
15✔
2970
                                        ListWorkflowExecutionsRequest: baseReq,
15✔
2971
                                        WorkflowID:                    listRequest.ExecutionFilter.GetWorkflowID(),
15✔
2972
                                },
15✔
2973
                        )
15✔
2974
                }
15✔
2975
                wh.GetLogger().Debug("List closed workflow with filter",
16✔
2976
                        tag.WorkflowDomainName(listRequest.GetDomain()), tag.WorkflowListWorkflowFilterByID)
16✔
2977
        } else if listRequest.TypeFilter != nil {
13✔
2978
                if wh.config.DisableListVisibilityByFilter(domain) {
2✔
2979
                        err = validate.ErrNoPermission
1✔
2980
                } else {
1✔
2981
                        persistenceResp, err = wh.GetVisibilityManager().ListClosedWorkflowExecutionsByType(
×
2982
                                ctx,
×
2983
                                &persistence.ListWorkflowExecutionsByTypeRequest{
×
2984
                                        ListWorkflowExecutionsRequest: baseReq,
×
2985
                                        WorkflowTypeName:              listRequest.TypeFilter.GetName(),
×
2986
                                },
×
2987
                        )
×
2988
                }
×
2989
                wh.GetLogger().Debug("List closed workflow with filter",
1✔
2990
                        tag.WorkflowDomainName(listRequest.GetDomain()), tag.WorkflowListWorkflowFilterByType)
1✔
2991
        } else if listRequest.StatusFilter != nil {
12✔
2992
                if wh.config.DisableListVisibilityByFilter(domain) {
2✔
2993
                        err = validate.ErrNoPermission
1✔
2994
                } else {
1✔
2995
                        persistenceResp, err = wh.GetVisibilityManager().ListClosedWorkflowExecutionsByStatus(
×
2996
                                ctx,
×
2997
                                &persistence.ListClosedWorkflowExecutionsByStatusRequest{
×
2998
                                        ListWorkflowExecutionsRequest: baseReq,
×
2999
                                        Status:                        listRequest.GetStatusFilter(),
×
3000
                                },
×
3001
                        )
×
3002
                }
×
3003
                wh.GetLogger().Debug("List closed workflow with filter",
1✔
3004
                        tag.WorkflowDomainName(listRequest.GetDomain()), tag.WorkflowListWorkflowFilterByStatus)
1✔
3005
        } else {
10✔
3006
                persistenceResp, err = wh.GetVisibilityManager().ListClosedWorkflowExecutions(ctx, &baseReq)
10✔
3007
        }
10✔
3008

3009
        if err != nil {
31✔
3010
                return nil, err
3✔
3011
        }
3✔
3012

3013
        resp = &types.ListClosedWorkflowExecutionsResponse{}
25✔
3014
        resp.Executions = persistenceResp.Executions
25✔
3015
        resp.NextPageToken = persistenceResp.NextPageToken
25✔
3016
        return resp, nil
25✔
3017
}
3018

3019
// ListWorkflowExecutions - retrieves info for workflow executions in a domain
3020
func (wh *WorkflowHandler) ListWorkflowExecutions(
3021
        ctx context.Context,
3022
        listRequest *types.ListWorkflowExecutionsRequest,
3023
) (resp *types.ListWorkflowExecutionsResponse, retError error) {
143✔
3024
        if wh.isShuttingDown() {
143✔
3025
                return nil, validate.ErrShuttingDown
×
3026
        }
×
3027

3028
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
143✔
3029
                return nil, err
×
3030
        }
×
3031

3032
        if listRequest == nil {
143✔
3033
                return nil, validate.ErrRequestNotSet
×
3034
        }
×
3035

3036
        if listRequest.GetDomain() == "" {
143✔
3037
                return nil, validate.ErrDomainNotSet
×
3038
        }
×
3039

3040
        if listRequest.GetPageSize() <= 0 {
143✔
3041
                listRequest.PageSize = int32(wh.config.VisibilityMaxPageSize(listRequest.GetDomain()))
×
3042
        }
×
3043

3044
        if wh.isListRequestPageSizeTooLarge(listRequest.GetPageSize(), listRequest.GetDomain()) {
143✔
3045
                return nil, &types.BadRequestError{
×
3046
                        Message: fmt.Sprintf("Pagesize is larger than allow %d", wh.config.ESIndexMaxResultWindow())}
×
3047
        }
×
3048

3049
        validatedQuery, err := wh.visibilityQueryValidator.ValidateQuery(listRequest.GetQuery())
143✔
3050
        if err != nil {
146✔
3051
                return nil, err
3✔
3052
        }
3✔
3053

3054
        domain := listRequest.GetDomain()
140✔
3055
        domainID, err := wh.GetDomainCache().GetDomainID(domain)
140✔
3056
        if err != nil {
140✔
3057
                return nil, err
×
3058
        }
×
3059

3060
        req := &persistence.ListWorkflowExecutionsByQueryRequest{
140✔
3061
                DomainUUID:    domainID,
140✔
3062
                Domain:        domain,
140✔
3063
                PageSize:      int(listRequest.GetPageSize()),
140✔
3064
                NextPageToken: listRequest.NextPageToken,
140✔
3065
                Query:         validatedQuery,
140✔
3066
        }
140✔
3067
        persistenceResp, err := wh.GetVisibilityManager().ListWorkflowExecutions(ctx, req)
140✔
3068
        if err != nil {
140✔
3069
                return nil, err
×
3070
        }
×
3071

3072
        resp = &types.ListWorkflowExecutionsResponse{}
140✔
3073
        resp.Executions = persistenceResp.Executions
140✔
3074
        resp.NextPageToken = persistenceResp.NextPageToken
140✔
3075
        return resp, nil
140✔
3076
}
3077

3078
// ListAllWorkflowExecutions - retrieves info for all workflow executions in a domain
3079
func (wh *WorkflowHandler) ListAllWorkflowExecutions(
3080
        ctx context.Context,
3081
        listRequest *types.ListAllWorkflowExecutionsRequest,
3082
) (resp *types.ListAllWorkflowExecutionsResponse, retError error) {
4✔
3083
        if wh.isShuttingDown() {
4✔
3084
                return nil, validate.ErrShuttingDown
×
3085
        }
×
3086

3087
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
4✔
3088
                return nil, err
×
3089
        }
×
3090

3091
        if listRequest == nil {
5✔
3092
                return nil, validate.ErrRequestNotSet
1✔
3093
        }
1✔
3094

3095
        if listRequest.GetDomain() == "" {
3✔
3096
                return nil, validate.ErrDomainNotSet
×
3097
        }
×
3098

3099
        if listRequest.StartTimeFilter == nil {
4✔
3100
                return nil, &types.BadRequestError{Message: "StartTimeFilter is required"}
1✔
3101
        }
1✔
3102

3103
        if listRequest.StartTimeFilter.EarliestTime == nil {
2✔
3104
                return nil, &types.BadRequestError{Message: "EarliestTime in StartTimeFilter is required"}
×
3105
        }
×
3106

3107
        if listRequest.StartTimeFilter.LatestTime == nil {
2✔
3108
                return nil, &types.BadRequestError{Message: "LatestTime in StartTimeFilter is required"}
×
3109
        }
×
3110

3111
        if listRequest.StartTimeFilter.GetEarliestTime() > listRequest.StartTimeFilter.GetLatestTime() {
3✔
3112
                return nil, &types.BadRequestError{Message: "EarliestTime in StartTimeFilter should not be larger than LatestTime"}
1✔
3113
        }
1✔
3114

3115
        if listRequest.GetMaximumPageSize() <= 0 {
2✔
3116
                listRequest.MaximumPageSize = int32(wh.config.VisibilityMaxPageSize(listRequest.GetDomain()))
1✔
3117
        }
1✔
3118

3119
        if wh.isListRequestPageSizeTooLarge(listRequest.GetMaximumPageSize(), listRequest.GetDomain()) {
1✔
3120
                return nil, &types.BadRequestError{
×
3121
                        Message: fmt.Sprintf("Pagesize is larger than allow %d", wh.config.ESIndexMaxResultWindow())}
×
3122
        }
×
3123

3124
        domain := listRequest.GetDomain()
1✔
3125
        domainID, err := wh.GetDomainCache().GetDomainID(domain)
1✔
3126
        if err != nil {
1✔
3127
                return nil, err
×
3128
        }
×
3129

3130
        baseReq := persistence.ListWorkflowExecutionsRequest{
1✔
3131
                DomainUUID:    domainID,
1✔
3132
                Domain:        domain,
1✔
3133
                PageSize:      int(listRequest.GetMaximumPageSize()),
1✔
3134
                NextPageToken: listRequest.NextPageToken,
1✔
3135
                EarliestTime:  listRequest.StartTimeFilter.GetEarliestTime(),
1✔
3136
                LatestTime:    listRequest.StartTimeFilter.GetLatestTime(),
1✔
3137
        }
1✔
3138

1✔
3139
        var persistenceResp *persistence.ListWorkflowExecutionsResponse
1✔
3140

1✔
3141
        persistenceResp, err = wh.GetVisibilityManager().ListAllWorkflowExecutions(
1✔
3142
                ctx,
1✔
3143
                &persistence.ListAllWorkflowExecutionsRequest{
1✔
3144
                        ListWorkflowExecutionsRequest: baseReq,
1✔
3145
                        PartialMatch:                  listRequest.PartialMatch,
1✔
3146
                        WorkflowSearchValue:           listRequest.WorkflowSearchValue,
1✔
3147
                },
1✔
3148
        )
1✔
3149
        if err != nil {
1✔
3150
                return nil, err
×
3151
        }
×
3152

3153
        wh.GetLogger().Debug("List all workflows",
1✔
3154
                tag.WorkflowDomainName(listRequest.GetDomain()), tag.WorkflowListWorkflowFilterByType)
1✔
3155

1✔
3156
        resp = &types.ListAllWorkflowExecutionsResponse{}
1✔
3157
        resp.Executions = persistenceResp.Executions
1✔
3158
        resp.NextPageToken = persistenceResp.NextPageToken
1✔
3159
        return resp, nil
1✔
3160
}
3161

3162
// RestartWorkflowExecution - retrieves info for an existing workflow then restarts it
3163
func (wh *WorkflowHandler) RestartWorkflowExecution(ctx context.Context, request *types.RestartWorkflowExecutionRequest) (resp *types.RestartWorkflowExecutionResponse, retError error) {
2✔
3164
        if wh.isShuttingDown() {
2✔
3165
                return nil, validate.ErrShuttingDown
×
3166
        }
×
3167

3168
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
2✔
3169
                return nil, err
×
3170
        }
×
3171

3172
        if request == nil {
2✔
3173
                return nil, validate.ErrRequestNotSet
×
3174
        }
×
3175

3176
        domainName := request.GetDomain()
2✔
3177
        wfExecution := request.GetWorkflowExecution()
2✔
3178

2✔
3179
        if request.GetDomain() == "" {
2✔
3180
                return nil, validate.ErrDomainNotSet
×
3181
        }
×
3182

3183
        if err := validate.CheckExecution(wfExecution); err != nil {
2✔
3184
                return nil, err
×
3185
        }
×
3186

3187
        domainID, err := wh.GetDomainCache().GetDomainID(domainName)
2✔
3188
        if err != nil {
2✔
3189
                return nil, err
×
3190
        }
×
3191

3192
        isolationGroup := wh.getIsolationGroup(ctx, domainName)
2✔
3193
        if !wh.isIsolationGroupHealthy(ctx, domainName, isolationGroup) {
3✔
3194
                return nil, &types.BadRequestError{fmt.Sprintf("Domain %s is drained from isolation group %s.", domainName, isolationGroup)}
1✔
3195
        }
1✔
3196

3197
        history, err := wh.GetWorkflowExecutionHistory(ctx, &types.GetWorkflowExecutionHistoryRequest{
1✔
3198
                Domain: domainName,
1✔
3199
                Execution: &types.WorkflowExecution{
1✔
3200
                        WorkflowID: wfExecution.WorkflowID,
1✔
3201
                        RunID:      wfExecution.RunID,
1✔
3202
                },
1✔
3203
                SkipArchival: true,
1✔
3204
        })
1✔
3205
        if err != nil {
1✔
3206
                return nil, validate.ErrHistoryNotFound
×
3207
        }
×
3208
        startRequest := constructRestartWorkflowRequest(history.History.Events[0].WorkflowExecutionStartedEventAttributes,
1✔
3209
                domainName, request.Identity, wfExecution.WorkflowID)
1✔
3210
        req, err := common.CreateHistoryStartWorkflowRequest(domainID, startRequest, time.Now(), wh.getPartitionConfig(ctx, domainName))
1✔
3211
        if err != nil {
1✔
3212
                return nil, err
×
3213
        }
×
3214
        startResp, err := wh.GetHistoryClient().StartWorkflowExecution(ctx, req)
1✔
3215
        if err != nil {
1✔
3216
                return nil, wh.normalizeVersionedErrors(ctx, err)
×
3217
        }
×
3218
        resp = &types.RestartWorkflowExecutionResponse{
1✔
3219
                RunID: startResp.RunID,
1✔
3220
        }
1✔
3221

1✔
3222
        return resp, nil
1✔
3223
}
3224

3225
// ScanWorkflowExecutions - retrieves info for large amount of workflow executions in a domain without order
3226
func (wh *WorkflowHandler) ScanWorkflowExecutions(
3227
        ctx context.Context,
3228
        listRequest *types.ListWorkflowExecutionsRequest,
3229
) (resp *types.ListWorkflowExecutionsResponse, retError error) {
27✔
3230
        if wh.isShuttingDown() {
27✔
3231
                return nil, validate.ErrShuttingDown
×
3232
        }
×
3233

3234
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
27✔
3235
                return nil, err
×
3236
        }
×
3237

3238
        if listRequest == nil {
27✔
3239
                return nil, validate.ErrRequestNotSet
×
3240
        }
×
3241

3242
        if listRequest.GetDomain() == "" {
27✔
3243
                return nil, validate.ErrDomainNotSet
×
3244
        }
×
3245

3246
        if listRequest.GetPageSize() <= 0 {
27✔
3247
                listRequest.PageSize = int32(wh.config.VisibilityMaxPageSize(listRequest.GetDomain()))
×
3248
        }
×
3249

3250
        if wh.isListRequestPageSizeTooLarge(listRequest.GetPageSize(), listRequest.GetDomain()) {
27✔
3251
                return nil, &types.BadRequestError{
×
3252
                        Message: fmt.Sprintf("Pagesize is larger than allow %d", wh.config.ESIndexMaxResultWindow())}
×
3253
        }
×
3254

3255
        validatedQuery, err := wh.visibilityQueryValidator.ValidateQuery(listRequest.GetQuery())
27✔
3256
        if err != nil {
28✔
3257
                return nil, err
1✔
3258
        }
1✔
3259

3260
        domain := listRequest.GetDomain()
26✔
3261
        domainID, err := wh.GetDomainCache().GetDomainID(domain)
26✔
3262
        if err != nil {
26✔
3263
                return nil, err
×
3264
        }
×
3265

3266
        req := &persistence.ListWorkflowExecutionsByQueryRequest{
26✔
3267
                DomainUUID:    domainID,
26✔
3268
                Domain:        domain,
26✔
3269
                PageSize:      int(listRequest.GetPageSize()),
26✔
3270
                NextPageToken: listRequest.NextPageToken,
26✔
3271
                Query:         validatedQuery,
26✔
3272
        }
26✔
3273
        persistenceResp, err := wh.GetVisibilityManager().ScanWorkflowExecutions(ctx, req)
26✔
3274
        if err != nil {
26✔
3275
                return nil, err
×
3276
        }
×
3277

3278
        resp = &types.ListWorkflowExecutionsResponse{}
26✔
3279
        resp.Executions = persistenceResp.Executions
26✔
3280
        resp.NextPageToken = persistenceResp.NextPageToken
26✔
3281
        return resp, nil
26✔
3282
}
3283

3284
// CountWorkflowExecutions - count number of workflow executions in a domain
3285
func (wh *WorkflowHandler) CountWorkflowExecutions(
3286
        ctx context.Context,
3287
        countRequest *types.CountWorkflowExecutionsRequest,
3288
) (resp *types.CountWorkflowExecutionsResponse, retError error) {
14✔
3289
        if wh.isShuttingDown() {
14✔
3290
                return nil, validate.ErrShuttingDown
×
3291
        }
×
3292

3293
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
14✔
3294
                return nil, err
×
3295
        }
×
3296

3297
        if countRequest == nil {
14✔
3298
                return nil, validate.ErrRequestNotSet
×
3299
        }
×
3300

3301
        if countRequest.GetDomain() == "" {
14✔
3302
                return nil, validate.ErrDomainNotSet
×
3303
        }
×
3304

3305
        validatedQuery, err := wh.visibilityQueryValidator.ValidateQuery(countRequest.GetQuery())
14✔
3306
        if err != nil {
15✔
3307
                return nil, err
1✔
3308
        }
1✔
3309

3310
        domain := countRequest.GetDomain()
13✔
3311
        domainID, err := wh.GetDomainCache().GetDomainID(domain)
13✔
3312
        if err != nil {
13✔
3313
                return nil, err
×
3314
        }
×
3315

3316
        req := &persistence.CountWorkflowExecutionsRequest{
13✔
3317
                DomainUUID: domainID,
13✔
3318
                Domain:     domain,
13✔
3319
                Query:      validatedQuery,
13✔
3320
        }
13✔
3321
        persistenceResp, err := wh.GetVisibilityManager().CountWorkflowExecutions(ctx, req)
13✔
3322
        if err != nil {
13✔
3323
                return nil, err
×
3324
        }
×
3325

3326
        resp = &types.CountWorkflowExecutionsResponse{
13✔
3327
                Count: persistenceResp.Count,
13✔
3328
        }
13✔
3329
        return resp, nil
13✔
3330
}
3331

3332
// GetSearchAttributes return valid indexed keys
3333
func (wh *WorkflowHandler) GetSearchAttributes(ctx context.Context) (resp *types.GetSearchAttributesResponse, retError error) {
1✔
3334
        if wh.isShuttingDown() {
1✔
3335
                return nil, validate.ErrShuttingDown
×
3336
        }
×
3337

3338
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
1✔
3339
                return nil, err
×
3340
        }
×
3341

3342
        keys := wh.config.ValidSearchAttributes()
1✔
3343
        resp = &types.GetSearchAttributesResponse{
1✔
3344
                Keys: wh.convertIndexedKeyToThrift(keys),
1✔
3345
        }
1✔
3346
        return resp, nil
1✔
3347
}
3348

3349
// ResetStickyTaskList reset the volatile information in mutable state of a given workflow.
3350
func (wh *WorkflowHandler) ResetStickyTaskList(
3351
        ctx context.Context,
3352
        resetRequest *types.ResetStickyTaskListRequest,
3353
) (resp *types.ResetStickyTaskListResponse, retError error) {
3✔
3354
        if wh.isShuttingDown() {
3✔
3355
                return nil, validate.ErrShuttingDown
×
3356
        }
×
3357

3358
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
3✔
3359
                return nil, err
×
3360
        }
×
3361

3362
        if resetRequest == nil {
3✔
3363
                return nil, validate.ErrRequestNotSet
×
3364
        }
×
3365

3366
        domainName := resetRequest.GetDomain()
3✔
3367
        wfExecution := resetRequest.GetExecution()
3✔
3368

3✔
3369
        if domainName == "" {
3✔
3370
                return nil, validate.ErrDomainNotSet
×
3371
        }
×
3372

3373
        if err := validate.CheckExecution(wfExecution); err != nil {
3✔
3374
                return nil, err
×
3375
        }
×
3376

3377
        domainID, err := wh.GetDomainCache().GetDomainID(resetRequest.GetDomain())
3✔
3378
        if err != nil {
3✔
3379
                return nil, err
×
3380
        }
×
3381

3382
        _, err = wh.GetHistoryClient().ResetStickyTaskList(ctx, &types.HistoryResetStickyTaskListRequest{
3✔
3383
                DomainUUID: domainID,
3✔
3384
                Execution:  resetRequest.Execution,
3✔
3385
        })
3✔
3386
        if err != nil {
3✔
3387
                return nil, wh.normalizeVersionedErrors(ctx, err)
×
3388
        }
×
3389
        return &types.ResetStickyTaskListResponse{}, nil
3✔
3390
}
3391

3392
// QueryWorkflow returns query result for a specified workflow execution
3393
func (wh *WorkflowHandler) QueryWorkflow(
3394
        ctx context.Context,
3395
        queryRequest *types.QueryWorkflowRequest,
3396
) (resp *types.QueryWorkflowResponse, retError error) {
57✔
3397
        if wh.isShuttingDown() {
58✔
3398
                return nil, validate.ErrShuttingDown
1✔
3399
        }
1✔
3400

3401
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
57✔
3402
                return nil, err
1✔
3403
        }
1✔
3404

3405
        if queryRequest == nil {
56✔
3406
                return nil, validate.ErrRequestNotSet
1✔
3407
        }
1✔
3408

3409
        domainName := queryRequest.GetDomain()
54✔
3410
        wfExecution := queryRequest.GetExecution()
54✔
3411

54✔
3412
        if domainName == "" {
55✔
3413
                return nil, validate.ErrDomainNotSet
1✔
3414
        }
1✔
3415

3416
        if err := validate.CheckExecution(wfExecution); err != nil {
54✔
3417
                return nil, err
1✔
3418
        }
1✔
3419

3420
        if wh.config.DisallowQuery(domainName) {
53✔
3421
                return nil, validate.ErrQueryDisallowedForDomain
1✔
3422
        }
1✔
3423

3424
        if queryRequest.Query == nil {
52✔
3425
                return nil, validate.ErrQueryNotSet
1✔
3426
        }
1✔
3427

3428
        if queryRequest.Query.GetQueryType() == "" {
51✔
3429
                return nil, validate.ErrQueryTypeNotSet
1✔
3430
        }
1✔
3431

3432
        domainID, err := wh.GetDomainCache().GetDomainID(domainName)
49✔
3433
        if err != nil {
50✔
3434
                return nil, err
1✔
3435
        }
1✔
3436

3437
        sizeLimitError := wh.config.BlobSizeLimitError(domainName)
48✔
3438
        sizeLimitWarn := wh.config.BlobSizeLimitWarn(domainName)
48✔
3439

48✔
3440
        scope := getMetricsScopeWithDomain(metrics.FrontendQueryWorkflowScope, queryRequest, wh.GetMetricsClient()).Tagged(metrics.GetContextTags(ctx)...)
48✔
3441
        if err := common.CheckEventBlobSizeLimit(
48✔
3442
                len(queryRequest.GetQuery().GetQueryArgs()),
48✔
3443
                sizeLimitWarn,
48✔
3444
                sizeLimitError,
48✔
3445
                domainID,
48✔
3446
                queryRequest.GetExecution().GetWorkflowID(),
48✔
3447
                queryRequest.GetExecution().GetRunID(),
48✔
3448
                scope,
48✔
3449
                wh.GetThrottledLogger(),
48✔
3450
                tag.BlobSizeViolationOperation("QueryWorkflow")); err != nil {
49✔
3451
                return nil, err
1✔
3452
        }
1✔
3453

3454
        req := &types.HistoryQueryWorkflowRequest{
47✔
3455
                DomainUUID: domainID,
47✔
3456
                Request:    queryRequest,
47✔
3457
        }
47✔
3458
        hResponse, err := wh.GetHistoryClient().QueryWorkflow(ctx, req)
47✔
3459
        if err != nil {
60✔
3460
                return nil, err
13✔
3461
        }
13✔
3462
        return hResponse.GetResponse(), nil
34✔
3463
}
3464

3465
// DescribeWorkflowExecution returns information about the specified workflow execution.
3466
func (wh *WorkflowHandler) DescribeWorkflowExecution(
3467
        ctx context.Context,
3468
        request *types.DescribeWorkflowExecutionRequest,
3469
) (resp *types.DescribeWorkflowExecutionResponse, retError error) {
101✔
3470
        if wh.isShuttingDown() {
102✔
3471
                return nil, validate.ErrShuttingDown
1✔
3472
        }
1✔
3473

3474
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
101✔
3475
                return nil, err
1✔
3476
        }
1✔
3477

3478
        if request == nil {
100✔
3479
                return nil, validate.ErrRequestNotSet
1✔
3480
        }
1✔
3481

3482
        domainName := request.GetDomain()
98✔
3483
        wfExecution := request.GetExecution()
98✔
3484
        if domainName == "" {
99✔
3485
                return nil, validate.ErrDomainNotSet
1✔
3486
        }
1✔
3487

3488
        if err := validate.CheckExecution(wfExecution); err != nil {
98✔
3489
                return nil, err
1✔
3490
        }
1✔
3491

3492
        domainID, err := wh.GetDomainCache().GetDomainID(request.GetDomain())
96✔
3493
        if err != nil {
97✔
3494
                return nil, err
1✔
3495
        }
1✔
3496

3497
        response, err := wh.GetHistoryClient().DescribeWorkflowExecution(ctx, &types.HistoryDescribeWorkflowExecutionRequest{
95✔
3498
                DomainUUID: domainID,
95✔
3499
                Request:    request,
95✔
3500
        })
95✔
3501

95✔
3502
        if err != nil {
96✔
3503
                return nil, err
1✔
3504
        }
1✔
3505

3506
        return response, nil
94✔
3507
}
3508

3509
// DescribeTaskList returns information about the target tasklist, right now this API returns the
3510
// pollers which polled this tasklist in last few minutes. If includeTaskListStatus field is true,
3511
// it will also return status of tasklist's ackManager (readLevel, ackLevel, backlogCountHint and taskIDBlock).
3512
func (wh *WorkflowHandler) DescribeTaskList(
3513
        ctx context.Context,
3514
        request *types.DescribeTaskListRequest,
3515
) (resp *types.DescribeTaskListResponse, retError error) {
18✔
3516
        if wh.isShuttingDown() {
18✔
3517
                return nil, validate.ErrShuttingDown
×
3518
        }
×
3519

3520
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
18✔
3521
                return nil, err
×
3522
        }
×
3523

3524
        if request == nil {
18✔
3525
                return nil, validate.ErrRequestNotSet
×
3526
        }
×
3527

3528
        if request.GetDomain() == "" {
18✔
3529
                return nil, validate.ErrDomainNotSet
×
3530
        }
×
3531

3532
        domainID, err := wh.GetDomainCache().GetDomainID(request.GetDomain())
18✔
3533
        if err != nil {
18✔
3534
                return nil, err
×
3535
        }
×
3536

3537
        scope := getMetricsScopeWithDomain(metrics.FrontendDescribeTaskListScope, request, wh.GetMetricsClient()).Tagged(metrics.GetContextTags(ctx)...)
18✔
3538
        if err := wh.validateTaskList(request.TaskList, scope, request.GetDomain()); err != nil {
18✔
3539
                return nil, err
×
3540
        }
×
3541

3542
        if request.TaskListType == nil {
18✔
3543
                return nil, validate.ErrTaskListTypeNotSet
×
3544
        }
×
3545

3546
        response, err := wh.GetMatchingClient().DescribeTaskList(ctx, &types.MatchingDescribeTaskListRequest{
18✔
3547
                DomainUUID:  domainID,
18✔
3548
                DescRequest: request,
18✔
3549
        })
18✔
3550
        if err != nil {
18✔
3551
                return nil, err
×
3552
        }
×
3553

3554
        return response, nil
18✔
3555
}
3556

3557
// ListTaskListPartitions returns all the partition and host for a taskList
3558
func (wh *WorkflowHandler) ListTaskListPartitions(
3559
        ctx context.Context,
3560
        request *types.ListTaskListPartitionsRequest,
3561
) (resp *types.ListTaskListPartitionsResponse, retError error) {
×
3562
        if wh.isShuttingDown() {
×
3563
                return nil, validate.ErrShuttingDown
×
3564
        }
×
3565

3566
        if request == nil {
×
3567
                return nil, validate.ErrRequestNotSet
×
3568
        }
×
3569

3570
        if request.GetDomain() == "" {
×
3571
                return nil, validate.ErrDomainNotSet
×
3572
        }
×
3573

3574
        scope := getMetricsScopeWithDomain(metrics.FrontendListTaskListPartitionsScope, request, wh.GetMetricsClient()).Tagged(metrics.GetContextTags(ctx)...)
×
3575
        if err := wh.validateTaskList(request.TaskList, scope, request.GetDomain()); err != nil {
×
3576
                return nil, err
×
3577
        }
×
3578

3579
        resp, err := wh.GetMatchingClient().ListTaskListPartitions(ctx, &types.MatchingListTaskListPartitionsRequest{
×
3580
                Domain:   request.Domain,
×
3581
                TaskList: request.TaskList,
×
3582
        })
×
3583
        return resp, err
×
3584
}
3585

3586
// GetTaskListsByDomain returns all the partition and host for a taskList
3587
func (wh *WorkflowHandler) GetTaskListsByDomain(
3588
        ctx context.Context,
3589
        request *types.GetTaskListsByDomainRequest,
3590
) (resp *types.GetTaskListsByDomainResponse, retError error) {
×
3591
        if wh.isShuttingDown() {
×
3592
                return nil, validate.ErrShuttingDown
×
3593
        }
×
3594

3595
        if request == nil {
×
3596
                return nil, validate.ErrRequestNotSet
×
3597
        }
×
3598

3599
        if request.GetDomain() == "" {
×
3600
                return nil, validate.ErrDomainNotSet
×
3601
        }
×
3602

3603
        resp, err := wh.GetMatchingClient().GetTaskListsByDomain(ctx, &types.GetTaskListsByDomainRequest{
×
3604
                Domain: request.Domain,
×
3605
        })
×
3606
        return resp, err
×
3607
}
3608

3609
// RefreshWorkflowTasks re-generates the workflow tasks
3610
func (wh *WorkflowHandler) RefreshWorkflowTasks(
3611
        ctx context.Context,
3612
        request *types.RefreshWorkflowTasksRequest,
3613
) (err error) {
×
3614
        if request == nil {
×
3615
                return validate.ErrRequestNotSet
×
3616
        }
×
3617
        if err := validate.CheckExecution(request.Execution); err != nil {
×
3618
                return err
×
3619
        }
×
3620
        domainEntry, err := wh.GetDomainCache().GetDomain(request.GetDomain())
×
3621
        if err != nil {
×
3622
                return err
×
3623
        }
×
3624

3625
        err = wh.GetHistoryClient().RefreshWorkflowTasks(ctx, &types.HistoryRefreshWorkflowTasksRequest{
×
3626
                DomainUIID: domainEntry.GetInfo().ID,
×
3627
                Request:    request,
×
3628
        })
×
3629
        if err != nil {
×
3630
                return err
×
3631
        }
×
3632
        return nil
×
3633
}
3634

3635
func (wh *WorkflowHandler) getRawHistory(
3636
        ctx context.Context,
3637
        scope metrics.Scope,
3638
        domainID string,
3639
        domainName string,
3640
        execution types.WorkflowExecution,
3641
        firstEventID int64,
3642
        nextEventID int64,
3643
        pageSize int32,
3644
        nextPageToken []byte,
3645
        transientDecision *types.TransientDecisionInfo,
3646
        branchToken []byte,
3647
) ([]*types.DataBlob, []byte, error) {
2✔
3648
        rawHistory := []*types.DataBlob{}
2✔
3649
        shardID := common.WorkflowIDToHistoryShard(execution.WorkflowID, wh.config.NumHistoryShards)
2✔
3650

2✔
3651
        resp, err := wh.GetHistoryManager().ReadRawHistoryBranch(ctx, &persistence.ReadHistoryBranchRequest{
2✔
3652
                BranchToken:   branchToken,
2✔
3653
                MinEventID:    firstEventID,
2✔
3654
                MaxEventID:    nextEventID,
2✔
3655
                PageSize:      int(pageSize),
2✔
3656
                NextPageToken: nextPageToken,
2✔
3657
                ShardID:       common.IntPtr(shardID),
2✔
3658
                DomainName:    domainName,
2✔
3659
        })
2✔
3660
        if err != nil {
2✔
3661
                return nil, nil, err
×
3662
        }
×
3663

3664
        var encoding *types.EncodingType
2✔
3665
        for _, data := range resp.HistoryEventBlobs {
4✔
3666
                switch data.Encoding {
2✔
3667
                case common.EncodingTypeJSON:
×
3668
                        encoding = types.EncodingTypeJSON.Ptr()
×
3669
                case common.EncodingTypeThriftRW:
2✔
3670
                        encoding = types.EncodingTypeThriftRW.Ptr()
2✔
3671
                default:
×
3672
                        panic(fmt.Sprintf("Invalid encoding type for raw history, encoding type: %s", data.Encoding))
×
3673
                }
3674
                rawHistory = append(rawHistory, &types.DataBlob{
2✔
3675
                        EncodingType: encoding,
2✔
3676
                        Data:         data.Data,
2✔
3677
                })
2✔
3678
        }
3679

3680
        if len(resp.NextPageToken) == 0 && transientDecision != nil {
4✔
3681
                if err := wh.validateTransientDecisionEvents(nextEventID, transientDecision); err != nil {
2✔
3682
                        scope.IncCounter(metrics.CadenceErrIncompleteHistoryCounter)
×
3683
                        wh.GetLogger().Error("getHistory error",
×
3684
                                tag.WorkflowDomainID(domainID),
×
3685
                                tag.WorkflowID(execution.GetWorkflowID()),
×
3686
                                tag.WorkflowRunID(execution.GetRunID()),
×
3687
                                tag.Error(err))
×
3688
                }
×
3689
                blob, err := wh.GetPayloadSerializer().SerializeBatchEvents(
2✔
3690
                        []*types.HistoryEvent{transientDecision.ScheduledEvent, transientDecision.StartedEvent}, common.EncodingTypeThriftRW)
2✔
3691
                if err != nil {
2✔
3692
                        return nil, nil, err
×
3693
                }
×
3694
                rawHistory = append(rawHistory, &types.DataBlob{
2✔
3695
                        EncodingType: types.EncodingTypeThriftRW.Ptr(),
2✔
3696
                        Data:         blob.Data,
2✔
3697
                })
2✔
3698
        }
3699

3700
        return rawHistory, resp.NextPageToken, nil
2✔
3701
}
3702

3703
func (wh *WorkflowHandler) getHistory(
3704
        ctx context.Context,
3705
        scope metrics.Scope,
3706
        domainID string,
3707
        domainName string,
3708
        execution types.WorkflowExecution,
3709
        firstEventID, nextEventID int64,
3710
        pageSize int32,
3711
        nextPageToken []byte,
3712
        transientDecision *types.TransientDecisionInfo,
3713
        branchToken []byte,
3714
) (*types.History, []byte, error) {
1,619✔
3715

1,619✔
3716
        var size int
1,619✔
3717

1,619✔
3718
        isFirstPage := len(nextPageToken) == 0
1,619✔
3719
        shardID := common.WorkflowIDToHistoryShard(execution.WorkflowID, wh.config.NumHistoryShards)
1,619✔
3720
        var err error
1,619✔
3721
        historyEvents, size, nextPageToken, err := persistenceutils.ReadFullPageV2Events(ctx, wh.GetHistoryManager(), &persistence.ReadHistoryBranchRequest{
1,619✔
3722
                BranchToken:   branchToken,
1,619✔
3723
                MinEventID:    firstEventID,
1,619✔
3724
                MaxEventID:    nextEventID,
1,619✔
3725
                PageSize:      int(pageSize),
1,619✔
3726
                NextPageToken: nextPageToken,
1,619✔
3727
                ShardID:       common.IntPtr(shardID),
1,619✔
3728
                DomainName:    domainName,
1,619✔
3729
        })
1,619✔
3730

1,619✔
3731
        if err != nil {
1,619✔
3732
                return nil, nil, err
×
3733
        }
×
3734

3735
        scope.RecordTimer(metrics.HistorySize, time.Duration(size))
1,619✔
3736

1,619✔
3737
        isLastPage := len(nextPageToken) == 0
1,619✔
3738
        if err := verifyHistoryIsComplete(
1,619✔
3739
                historyEvents,
1,619✔
3740
                firstEventID,
1,619✔
3741
                nextEventID-1,
1,619✔
3742
                isFirstPage,
1,619✔
3743
                isLastPage,
1,619✔
3744
                int(pageSize)); err != nil {
1,619✔
3745
                scope.IncCounter(metrics.CadenceErrIncompleteHistoryCounter)
×
3746
                wh.GetLogger().Error("getHistory: incomplete history",
×
3747
                        tag.WorkflowDomainID(domainID),
×
3748
                        tag.WorkflowID(execution.GetWorkflowID()),
×
3749
                        tag.WorkflowRunID(execution.GetRunID()),
×
3750
                        tag.Error(err))
×
3751
                return nil, nil, err
×
3752
        }
×
3753

3754
        if len(nextPageToken) == 0 && transientDecision != nil {
1,790✔
3755
                if err := wh.validateTransientDecisionEvents(nextEventID, transientDecision); err != nil {
171✔
3756
                        scope.IncCounter(metrics.CadenceErrIncompleteHistoryCounter)
×
3757
                        wh.GetLogger().Error("getHistory error",
×
3758
                                tag.WorkflowDomainID(domainID),
×
3759
                                tag.WorkflowID(execution.GetWorkflowID()),
×
3760
                                tag.WorkflowRunID(execution.GetRunID()),
×
3761
                                tag.Error(err))
×
3762
                }
×
3763
                // Append the transient decision events once we are done enumerating everything from the events table
3764
                historyEvents = append(historyEvents, transientDecision.ScheduledEvent, transientDecision.StartedEvent)
171✔
3765
        }
3766

3767
        executionHistory := &types.History{}
1,619✔
3768
        executionHistory.Events = historyEvents
1,619✔
3769
        return executionHistory, nextPageToken, nil
1,619✔
3770
}
3771

3772
func (wh *WorkflowHandler) validateTransientDecisionEvents(
3773
        expectedNextEventID int64,
3774
        decision *types.TransientDecisionInfo,
3775
) error {
173✔
3776

173✔
3777
        if decision.ScheduledEvent.ID == expectedNextEventID &&
173✔
3778
                decision.StartedEvent.ID == expectedNextEventID+1 {
346✔
3779
                return nil
173✔
3780
        }
173✔
3781

3782
        return fmt.Errorf(
×
3783
                "invalid transient decision: "+
×
3784
                        "expectedScheduledEventID=%v expectedStartedEventID=%v but have scheduledEventID=%v startedEventID=%v",
×
3785
                expectedNextEventID,
×
3786
                expectedNextEventID+1,
×
3787
                decision.ScheduledEvent.ID,
×
3788
                decision.StartedEvent.ID)
×
3789
}
3790

3791
func (wh *WorkflowHandler) validateTaskList(t *types.TaskList, scope metrics.Scope, domain string) error {
2,769✔
3792
        if t == nil || t.GetName() == "" {
2,770✔
3793
                return validate.ErrTaskListNotSet
1✔
3794
        }
1✔
3795

3796
        if !common.IsValidIDLength(
2,768✔
3797
                t.GetName(),
2,768✔
3798
                scope,
2,768✔
3799
                wh.config.MaxIDLengthWarnLimit(),
2,768✔
3800
                wh.config.TaskListNameMaxLength(domain),
2,768✔
3801
                metrics.CadenceErrTaskListNameExceededWarnLimit,
2,768✔
3802
                domain,
2,768✔
3803
                wh.GetLogger(),
2,768✔
3804
                tag.IDTypeTaskListName) {
2,768✔
3805
                return validate.ErrTaskListTooLong
×
3806
        }
×
3807
        return nil
2,768✔
3808
}
3809

3810
func (wh *WorkflowHandler) createPollForDecisionTaskResponse(
3811
        ctx context.Context,
3812
        scope metrics.Scope,
3813
        domainID string,
3814
        matchingResp *types.MatchingPollForDecisionTaskResponse,
3815
        branchToken []byte,
3816
) (*types.PollForDecisionTaskResponse, error) {
1,239✔
3817

1,239✔
3818
        if matchingResp.WorkflowExecution == nil {
1,293✔
3819
                // this will happen if there is no decision task to be send to worker / caller
54✔
3820
                return &types.PollForDecisionTaskResponse{}, nil
54✔
3821
        }
54✔
3822

3823
        var history *types.History
1,185✔
3824
        var continuation []byte
1,185✔
3825
        var err error
1,185✔
3826

1,185✔
3827
        if matchingResp.GetStickyExecutionEnabled() && matchingResp.Query != nil {
1,191✔
3828
                // meaning sticky query, we should not return any events to worker
6✔
3829
                // since query task only check the current status
6✔
3830
                history = &types.History{
6✔
3831
                        Events: []*types.HistoryEvent{},
6✔
3832
                }
6✔
3833
        } else {
1,185✔
3834
                // here we have 3 cases:
1,179✔
3835
                // 1. sticky && non query task
1,179✔
3836
                // 2. non sticky &&  non query task
1,179✔
3837
                // 3. non sticky && query task
1,179✔
3838
                // for 1, partial history have to be send back
1,179✔
3839
                // for 2 and 3, full history have to be send back
1,179✔
3840

1,179✔
3841
                var persistenceToken []byte
1,179✔
3842

1,179✔
3843
                firstEventID := common.FirstEventID
1,179✔
3844
                nextEventID := matchingResp.GetNextEventID()
1,179✔
3845
                if matchingResp.GetStickyExecutionEnabled() {
1,275✔
3846
                        firstEventID = matchingResp.GetPreviousStartedEventID() + 1
96✔
3847
                }
96✔
3848
                domainName, dErr := wh.GetDomainCache().GetDomainName(domainID)
1,179✔
3849
                if dErr != nil {
1,179✔
3850
                        return nil, dErr
×
3851
                }
×
3852
                scope = scope.Tagged(metrics.DomainTag(domainName))
1,179✔
3853
                history, persistenceToken, err = wh.getHistory(
1,179✔
3854
                        ctx,
1,179✔
3855
                        scope,
1,179✔
3856
                        domainID,
1,179✔
3857
                        domainName,
1,179✔
3858
                        *matchingResp.WorkflowExecution,
1,179✔
3859
                        firstEventID,
1,179✔
3860
                        nextEventID,
1,179✔
3861
                        int32(wh.config.HistoryMaxPageSize(domainName)),
1,179✔
3862
                        nil,
1,179✔
3863
                        matchingResp.DecisionInfo,
1,179✔
3864
                        branchToken,
1,179✔
3865
                )
1,179✔
3866
                if err != nil {
1,179✔
3867
                        return nil, err
×
3868
                }
×
3869

3870
                if len(persistenceToken) != 0 {
1,179✔
3871
                        continuation, err = serializeHistoryToken(&getHistoryContinuationToken{
×
3872
                                RunID:             matchingResp.WorkflowExecution.GetRunID(),
×
3873
                                FirstEventID:      firstEventID,
×
3874
                                NextEventID:       nextEventID,
×
3875
                                PersistenceToken:  persistenceToken,
×
3876
                                TransientDecision: matchingResp.DecisionInfo,
×
3877
                                BranchToken:       branchToken,
×
3878
                        })
×
3879
                        if err != nil {
×
3880
                                return nil, err
×
3881
                        }
×
3882
                }
3883
        }
3884

3885
        resp := &types.PollForDecisionTaskResponse{
1,185✔
3886
                TaskToken:                 matchingResp.TaskToken,
1,185✔
3887
                WorkflowExecution:         matchingResp.WorkflowExecution,
1,185✔
3888
                WorkflowType:              matchingResp.WorkflowType,
1,185✔
3889
                PreviousStartedEventID:    matchingResp.PreviousStartedEventID,
1,185✔
3890
                StartedEventID:            matchingResp.StartedEventID, // this field is not set for query tasks as there's no decision task started event
1,185✔
3891
                Query:                     matchingResp.Query,
1,185✔
3892
                BacklogCountHint:          matchingResp.BacklogCountHint,
1,185✔
3893
                Attempt:                   matchingResp.Attempt,
1,185✔
3894
                History:                   history,
1,185✔
3895
                NextPageToken:             continuation,
1,185✔
3896
                WorkflowExecutionTaskList: matchingResp.WorkflowExecutionTaskList,
1,185✔
3897
                ScheduledTimestamp:        matchingResp.ScheduledTimestamp,
1,185✔
3898
                StartedTimestamp:          matchingResp.StartedTimestamp,
1,185✔
3899
                Queries:                   matchingResp.Queries,
1,185✔
3900
                NextEventID:               matchingResp.NextEventID,
1,185✔
3901
                TotalHistoryBytes:         matchingResp.TotalHistoryBytes,
1,185✔
3902
        }
1,185✔
3903

1,185✔
3904
        return resp, nil
1,185✔
3905
}
3906

3907
func verifyHistoryIsComplete(
3908
        events []*types.HistoryEvent,
3909
        expectedFirstEventID int64,
3910
        expectedLastEventID int64,
3911
        isFirstPage bool,
3912
        isLastPage bool,
3913
        pageSize int,
3914
) error {
1,638✔
3915

1,638✔
3916
        nEvents := len(events)
1,638✔
3917
        if nEvents == 0 {
1,651✔
3918
                if isLastPage {
26✔
3919
                        // we seem to be returning a non-nil pageToken on the lastPage which
13✔
3920
                        // in turn cases the client to call getHistory again - only to find
13✔
3921
                        // there are no more events to consume - bail out if this is the case here
13✔
3922
                        return nil
13✔
3923
                }
13✔
3924
                return fmt.Errorf("invalid history: contains zero events")
×
3925
        }
3926

3927
        firstEventID := events[0].ID
1,625✔
3928
        lastEventID := events[nEvents-1].ID
1,625✔
3929

1,625✔
3930
        if !isFirstPage { // atleast one page of history has been read previously
1,661✔
3931
                if firstEventID <= expectedFirstEventID {
36✔
3932
                        // not first page and no events have been read in the previous pages - not possible
×
3933
                        return &types.InternalServiceError{
×
3934
                                Message: fmt.Sprintf(
×
3935
                                        "invalid history: expected first eventID to be > %v but got %v", expectedFirstEventID, firstEventID),
×
3936
                        }
×
3937
                }
×
3938
                expectedFirstEventID = firstEventID
36✔
3939
        }
3940

3941
        if !isLastPage {
1,674✔
3942
                // estimate lastEventID based on pageSize. This is a lower bound
49✔
3943
                // since the persistence layer counts "batch of events" as a single page
49✔
3944
                expectedLastEventID = expectedFirstEventID + int64(pageSize) - 1
49✔
3945
        }
49✔
3946

3947
        nExpectedEvents := expectedLastEventID - expectedFirstEventID + 1
1,625✔
3948

1,625✔
3949
        if firstEventID == expectedFirstEventID &&
1,625✔
3950
                ((isLastPage && lastEventID == expectedLastEventID && int64(nEvents) == nExpectedEvents) ||
1,625✔
3951
                        (!isLastPage && lastEventID >= expectedLastEventID && int64(nEvents) >= nExpectedEvents)) {
3,238✔
3952
                return nil
1,613✔
3953
        }
1,613✔
3954

3955
        return &types.InternalServiceError{
12✔
3956
                Message: fmt.Sprintf(
12✔
3957
                        "incomplete history: "+
12✔
3958
                                "expected events [%v-%v] but got events [%v-%v] of length %v:"+
12✔
3959
                                "isFirstPage=%v,isLastPage=%v,pageSize=%v",
12✔
3960
                        expectedFirstEventID,
12✔
3961
                        expectedLastEventID,
12✔
3962
                        firstEventID,
12✔
3963
                        lastEventID,
12✔
3964
                        nEvents,
12✔
3965
                        isFirstPage,
12✔
3966
                        isLastPage,
12✔
3967
                        pageSize),
12✔
3968
        }
12✔
3969
}
3970

3971
func deserializeHistoryToken(bytes []byte) (*getHistoryContinuationToken, error) {
44✔
3972
        token := &getHistoryContinuationToken{}
44✔
3973
        err := json.Unmarshal(bytes, token)
44✔
3974
        return token, err
44✔
3975
}
44✔
3976

3977
func serializeHistoryToken(token *getHistoryContinuationToken) ([]byte, error) {
441✔
3978
        if token == nil {
840✔
3979
                return nil, nil
399✔
3980
        }
399✔
3981

3982
        bytes, err := json.Marshal(token)
42✔
3983
        return bytes, err
42✔
3984
}
3985

3986
func isFailoverRequest(updateRequest *types.UpdateDomainRequest) bool {
9✔
3987
        return updateRequest.ActiveClusterName != nil
9✔
3988
}
9✔
3989

3990
func isGraceFailoverRequest(updateRequest *types.UpdateDomainRequest) bool {
9✔
3991
        return updateRequest.FailoverTimeoutInSeconds != nil
9✔
3992
}
9✔
3993

3994
func (wh *WorkflowHandler) checkOngoingFailover(
3995
        ctx context.Context,
3996
        domainName *string,
3997
) error {
1✔
3998

1✔
3999
        enabledClusters := wh.GetClusterMetadata().GetEnabledClusterInfo()
1✔
4000
        respChan := make(chan *types.DescribeDomainResponse, len(enabledClusters))
1✔
4001

1✔
4002
        g := &errgroup.Group{}
1✔
4003
        for clusterName := range enabledClusters {
3✔
4004
                frontendClient := wh.GetRemoteFrontendClient(clusterName)
2✔
4005
                g.Go(func() (e error) {
4✔
4006
                        defer func() { log.CapturePanic(recover(), wh.GetLogger(), &e) }()
4✔
4007

4008
                        resp, _ := frontendClient.DescribeDomain(ctx, &types.DescribeDomainRequest{Name: domainName})
2✔
4009
                        respChan <- resp
2✔
4010
                        return nil
2✔
4011
                })
4012
        }
4013
        g.Wait()
1✔
4014
        close(respChan)
1✔
4015

1✔
4016
        var failoverVersion *int64
1✔
4017
        for resp := range respChan {
3✔
4018
                if resp == nil {
2✔
4019
                        return &types.InternalServiceError{
×
4020
                                Message: "Failed to verify failover version from all clusters",
×
4021
                        }
×
4022
                }
×
4023
                if failoverVersion == nil {
3✔
4024
                        failoverVersion = &resp.FailoverVersion
1✔
4025
                }
1✔
4026
                if *failoverVersion != resp.GetFailoverVersion() {
2✔
4027
                        return &types.BadRequestError{
×
4028
                                Message: "Concurrent failover is not allow.",
×
4029
                        }
×
4030
                }
×
4031
        }
4032
        return nil
1✔
4033
}
4034

4035
func (wh *WorkflowHandler) historyArchived(ctx context.Context, request *types.GetWorkflowExecutionHistoryRequest, domainID string) bool {
451✔
4036
        if request.GetExecution() == nil || request.GetExecution().GetRunID() == "" {
489✔
4037
                return false
38✔
4038
        }
38✔
4039
        getMutableStateRequest := &types.GetMutableStateRequest{
413✔
4040
                DomainUUID: domainID,
413✔
4041
                Execution:  request.Execution,
413✔
4042
        }
413✔
4043
        _, err := wh.GetHistoryClient().GetMutableState(ctx, getMutableStateRequest)
413✔
4044
        if err == nil {
801✔
4045
                return false
388✔
4046
        }
388✔
4047
        switch err.(type) {
25✔
4048
        case *types.EntityNotExistsError:
24✔
4049
                // the only case in which history is assumed to be archived is if getting mutable state returns entity not found error
24✔
4050
                return true
24✔
4051
        }
4052
        return false
1✔
4053
}
4054

4055
func (wh *WorkflowHandler) getArchivedHistory(
4056
        ctx context.Context,
4057
        request *types.GetWorkflowExecutionHistoryRequest,
4058
        domainID string,
4059
) (*types.GetWorkflowExecutionHistoryResponse, error) {
27✔
4060
        entry, err := wh.GetDomainCache().GetDomainByID(domainID)
27✔
4061
        if err != nil {
28✔
4062
                return nil, err
1✔
4063
        }
1✔
4064

4065
        URIString := entry.GetConfig().HistoryArchivalURI
26✔
4066
        if URIString == "" {
27✔
4067
                // if URI is empty, it means the domain has never enabled for archival.
1✔
4068
                // the error is not "workflow has passed retention period", because
1✔
4069
                // we have no way to tell if the requested workflow exists or not.
1✔
4070
                return nil, validate.ErrHistoryNotFound
1✔
4071
        }
1✔
4072

4073
        URI, err := archiver.NewURI(URIString)
25✔
4074
        if err != nil {
26✔
4075
                return nil, err
1✔
4076
        }
1✔
4077

4078
        historyArchiver, err := wh.GetArchiverProvider().GetHistoryArchiver(URI.Scheme(), service.Frontend)
24✔
4079
        if err != nil {
24✔
4080
                return nil, err
×
4081
        }
×
4082

4083
        resp, err := historyArchiver.Get(ctx, URI, &archiver.GetHistoryRequest{
24✔
4084
                DomainID:      domainID,
24✔
4085
                WorkflowID:    request.GetExecution().GetWorkflowID(),
24✔
4086
                RunID:         request.GetExecution().GetRunID(),
24✔
4087
                NextPageToken: request.GetNextPageToken(),
24✔
4088
                PageSize:      int(request.GetMaximumPageSize()),
24✔
4089
        })
24✔
4090
        if err != nil {
26✔
4091
                return nil, err
2✔
4092
        }
2✔
4093

4094
        history := &types.History{}
22✔
4095
        for _, batch := range resp.HistoryBatches {
279✔
4096
                history.Events = append(history.Events, batch.Events...)
257✔
4097
        }
257✔
4098
        return &types.GetWorkflowExecutionHistoryResponse{
22✔
4099
                History:       history,
22✔
4100
                NextPageToken: resp.NextPageToken,
22✔
4101
                Archived:      true,
22✔
4102
        }, nil
22✔
4103
}
4104

4105
func (wh *WorkflowHandler) convertIndexedKeyToThrift(keys map[string]interface{}) map[string]types.IndexedValueType {
3✔
4106
        converted := make(map[string]types.IndexedValueType)
3✔
4107
        for k, v := range keys {
51✔
4108
                converted[k] = common.ConvertIndexedValueTypeToInternalType(v, wh.GetLogger())
48✔
4109
        }
48✔
4110
        return converted
2✔
4111
}
4112

4113
func (wh *WorkflowHandler) isListRequestPageSizeTooLarge(pageSize int32, domain string) bool {
305✔
4114
        return common.IsAdvancedVisibilityReadingEnabled(wh.config.EnableReadVisibilityFromES(domain), wh.config.IsAdvancedVisConfigExist) &&
305✔
4115
                pageSize > int32(wh.config.ESIndexMaxResultWindow())
305✔
4116
}
305✔
4117

4118
// GetClusterInfo return information about cadence deployment
4119
func (wh *WorkflowHandler) GetClusterInfo(
4120
        ctx context.Context,
4121
) (resp *types.ClusterInfo, err error) {
×
4122
        return &types.ClusterInfo{
×
4123
                SupportedClientVersions: &types.SupportedClientVersions{
×
4124
                        GoSdk:   client.SupportedGoSDKVersion,
×
4125
                        JavaSdk: client.SupportedJavaSDKVersion,
×
4126
                },
×
4127
        }, nil
×
4128
}
×
4129

4130
func checkFailOverPermission(config *config.Config, domainName string) error {
2✔
4131
        if config.Lockdown(domainName) {
3✔
4132
                return validate.ErrDomainInLockdown
1✔
4133
        }
1✔
4134
        return nil
1✔
4135
}
4136

4137
// domainWrapper holds a bare domain name and exposes it through GetDomain.
type domainWrapper struct {
	// domain is the wrapped domain name.
	domain string
}

// GetDomain returns the wrapped domain name.
func (w domainWrapper) GetDomain() string {
	name := w.domain
	return name
}
1,810✔
4144

4145
func (hs HealthStatus) String() string {
3✔
4146
        switch hs {
3✔
4147
        case HealthStatusOK:
1✔
4148
                return "OK"
1✔
4149
        case HealthStatusWarmingUp:
1✔
4150
                return "WarmingUp"
1✔
4151
        case HealthStatusShuttingDown:
1✔
4152
                return "ShuttingDown"
1✔
4153
        default:
×
4154
                return "unknown"
×
4155
        }
4156
}
4157

4158
func getDomainWfIDRunIDTags(
4159
        domainName string,
4160
        wf *types.WorkflowExecution,
4161
) []tag.Tag {
1,509✔
4162
        tags := []tag.Tag{tag.WorkflowDomainName(domainName)}
1,509✔
4163
        if wf == nil {
3,018✔
4164
                return tags
1,509✔
4165
        }
1,509✔
4166
        return append(
×
4167
                tags,
×
4168
                tag.WorkflowID(wf.GetWorkflowID()),
×
4169
                tag.WorkflowRunID(wf.GetRunID()),
×
4170
        )
×
4171
}
4172

4173
// checkRequiredDomainDataKVs verifies that every key of requiredDomainDataKeys
// is present in domainData. Only key presence is checked; values on either
// side are ignored. It returns an error naming the first missing key found
// (map iteration order is unspecified), or nil when nothing is missing.
func checkRequiredDomainDataKVs(requiredDomainDataKeys map[string]interface{}, domainData map[string]string) error {
	for required := range requiredDomainDataKeys {
		if _, present := domainData[required]; present {
			continue
		}
		return fmt.Errorf("domain data error, missing required key %v . All required keys: %v", required, requiredDomainDataKeys)
	}
	return nil
}
4183

4184
// Some error types are introduced later that some clients might not support
4185
// To make them backward compatible, we continue returning the legacy error types
4186
// for older clients
4187
func (wh *WorkflowHandler) normalizeVersionedErrors(ctx context.Context, err error) error {
74✔
4188
        switch err.(type) {
74✔
4189
        case *types.WorkflowExecutionAlreadyCompletedError:
17✔
4190
                call := yarpc.CallFromContext(ctx)
17✔
4191
                clientFeatureVersion := call.Header(common.FeatureVersionHeaderName)
17✔
4192
                clientImpl := call.Header(common.ClientImplHeaderName)
17✔
4193
                featureFlags := client.GetFeatureFlagsFromHeader(call)
17✔
4194

17✔
4195
                vErr := wh.versionChecker.SupportsWorkflowAlreadyCompletedError(clientImpl, clientFeatureVersion, featureFlags)
17✔
4196
                if vErr == nil {
20✔
4197
                        return err
3✔
4198
                }
3✔
4199
                return &types.EntityNotExistsError{Message: "Workflow execution already completed."}
14✔
4200
        default:
57✔
4201
                return err
57✔
4202
        }
4203
}
4204
func constructRestartWorkflowRequest(w *types.WorkflowExecutionStartedEventAttributes, domain string, identity string, workflowID string) *types.StartWorkflowExecutionRequest {
1✔
4205

1✔
4206
        startRequest := &types.StartWorkflowExecutionRequest{
1✔
4207
                RequestID:  uuid.New().String(),
1✔
4208
                Domain:     domain,
1✔
4209
                WorkflowID: workflowID,
1✔
4210
                WorkflowType: &types.WorkflowType{
1✔
4211
                        Name: w.WorkflowType.Name,
1✔
4212
                },
1✔
4213
                TaskList: &types.TaskList{
1✔
4214
                        Name: w.TaskList.Name,
1✔
4215
                },
1✔
4216
                Input:                               w.Input,
1✔
4217
                ExecutionStartToCloseTimeoutSeconds: w.ExecutionStartToCloseTimeoutSeconds,
1✔
4218
                TaskStartToCloseTimeoutSeconds:      w.TaskStartToCloseTimeoutSeconds,
1✔
4219
                Identity:                            identity,
1✔
4220
                WorkflowIDReusePolicy:               types.WorkflowIDReusePolicyTerminateIfRunning.Ptr(),
1✔
4221
        }
1✔
4222
        startRequest.CronSchedule = w.CronSchedule
1✔
4223
        startRequest.RetryPolicy = w.RetryPolicy
1✔
4224
        startRequest.DelayStartSeconds = w.FirstDecisionTaskBackoffSeconds
1✔
4225
        startRequest.Header = w.Header
1✔
4226
        startRequest.Memo = w.Memo
1✔
4227
        startRequest.SearchAttributes = w.SearchAttributes
1✔
4228

1✔
4229
        return startRequest
1✔
4230
}
1✔
4231

4232
func getMetricsScopeWithDomain(
4233
        scope int,
4234
        d domainGetter,
4235
        metricsClient metrics.Client,
4236
) metrics.Scope {
5,910✔
4237
        var metricsScope metrics.Scope
5,910✔
4238
        if d != nil {
11,820✔
4239
                metricsScope = metricsClient.Scope(scope).Tagged(metrics.DomainTag(d.GetDomain()))
5,910✔
4240
        } else {
5,910✔
4241
                metricsScope = metricsClient.Scope(scope).Tagged(metrics.DomainUnknownTag())
×
4242
        }
×
4243
        return metricsScope
5,910✔
4244
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc