• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

uber / cadence / 018df986-3ddf-4a88-a536-c3c428403c8e

01 Mar 2024 10:18AM UTC coverage: 62.889% (-0.01%) from 62.9%
018df986-3ddf-4a88-a536-c3c428403c8e

Pull #5695

buildkite

web-flow
Merge branch 'master' into fix_local_ci
Pull Request #5695: Fix the local integration test docker-compose file

92954 of 147806 relevant lines covered (62.89%)

2335.04 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

62.11
/service/frontend/api/handler.go
1
// Copyright (c) 2017-2020 Uber Technologies, Inc.
2
//
3
// Permission is hereby granted, free of charge, to any person obtaining a copy
4
// of this software and associated documentation files (the "Software"), to deal
5
// in the Software without restriction, including without limitation the rights
6
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
7
// copies of the Software, and to permit persons to whom the Software is
8
// furnished to do so, subject to the following conditions:
9
//
10
// The above copyright notice and this permission notice shall be included in
11
// all copies or substantial portions of the Software.
12
//
13
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
14
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
18
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
19
// THE SOFTWARE.
20

21
package api
22

23
import (
24
        "context"
25
        "encoding/json"
26
        "fmt"
27
        "sync/atomic"
28
        "time"
29

30
        "github.com/google/uuid"
31
        "go.uber.org/yarpc"
32
        "golang.org/x/sync/errgroup"
33

34
        "github.com/uber/cadence/.gen/go/shared"
35
        "github.com/uber/cadence/.gen/go/sqlblobs"
36
        "github.com/uber/cadence/common"
37
        "github.com/uber/cadence/common/archiver"
38
        "github.com/uber/cadence/common/backoff"
39
        "github.com/uber/cadence/common/cache"
40
        "github.com/uber/cadence/common/client"
41
        "github.com/uber/cadence/common/domain"
42
        "github.com/uber/cadence/common/elasticsearch/validator"
43
        "github.com/uber/cadence/common/log"
44
        "github.com/uber/cadence/common/log/tag"
45
        "github.com/uber/cadence/common/metrics"
46
        "github.com/uber/cadence/common/partition"
47
        "github.com/uber/cadence/common/persistence"
48
        persistenceutils "github.com/uber/cadence/common/persistence/persistence-utils"
49
        "github.com/uber/cadence/common/resource"
50
        "github.com/uber/cadence/common/service"
51
        "github.com/uber/cadence/common/types"
52
        "github.com/uber/cadence/service/frontend/config"
53
        "github.com/uber/cadence/service/frontend/validate"
54
)
55

56
const (
57
        // HealthStatusOK is used when this node is healthy and rpc requests are allowed
58
        HealthStatusOK HealthStatus = iota + 1
59
        // HealthStatusWarmingUp is used when the rpc handler is warming up
60
        HealthStatusWarmingUp
61
        // HealthStatusShuttingDown is used when the rpc handler is shutting down
62
        HealthStatusShuttingDown
63
)
64

65
var _ Handler = (*WorkflowHandler)(nil)
66

67
type (
68
        // WorkflowHandler - Thrift handler interface for workflow service
69
        WorkflowHandler struct {
70
                resource.Resource
71

72
                shuttingDown              int32
73
                healthStatus              int32
74
                tokenSerializer           common.TaskTokenSerializer
75
                config                    *config.Config
76
                versionChecker            client.VersionChecker
77
                domainHandler             domain.Handler
78
                visibilityQueryValidator  *validator.VisibilityQueryValidator
79
                searchAttributesValidator *validator.SearchAttributesValidator
80
                throttleRetry             *backoff.ThrottleRetry
81
                producerManager           ProducerManager
82
        }
83

84
        getHistoryContinuationToken struct {
85
                RunID             string
86
                FirstEventID      int64
87
                NextEventID       int64
88
                IsWorkflowRunning bool
89
                PersistenceToken  []byte
90
                TransientDecision *types.TransientDecisionInfo
91
                BranchToken       []byte
92
        }
93

94
        domainGetter interface {
95
                GetDomain() string
96
        }
97

98
        // HealthStatus is an enum that refers to the rpc handler health status
99
        HealthStatus int32
100
)
101

102
var (
103
        frontendServiceRetryPolicy = common.CreateFrontendServiceRetryPolicy()
104
)
105

106
// NewWorkflowHandler creates a thrift handler for the cadence service
107
func NewWorkflowHandler(
108
        resource resource.Resource,
109
        config *config.Config,
110
        versionChecker client.VersionChecker,
111
        domainHandler domain.Handler,
112
) *WorkflowHandler {
84✔
113
        return &WorkflowHandler{
84✔
114
                Resource:        resource,
84✔
115
                config:          config,
84✔
116
                healthStatus:    int32(HealthStatusWarmingUp),
84✔
117
                tokenSerializer: common.NewJSONTaskTokenSerializer(),
84✔
118
                versionChecker:  versionChecker,
84✔
119
                domainHandler:   domainHandler,
84✔
120
                visibilityQueryValidator: validator.NewQueryValidator(
84✔
121
                        config.ValidSearchAttributes,
84✔
122
                        config.EnableQueryAttributeValidation,
84✔
123
                ),
84✔
124
                searchAttributesValidator: validator.NewSearchAttributesValidator(
84✔
125
                        resource.GetLogger(),
84✔
126
                        config.EnableQueryAttributeValidation,
84✔
127
                        config.ValidSearchAttributes,
84✔
128
                        config.SearchAttributesNumberOfKeysLimit,
84✔
129
                        config.SearchAttributesSizeOfValueLimit,
84✔
130
                        config.SearchAttributesTotalSizeLimit,
84✔
131
                ),
84✔
132
                throttleRetry: backoff.NewThrottleRetry(
84✔
133
                        backoff.WithRetryPolicy(frontendServiceRetryPolicy),
84✔
134
                        backoff.WithRetryableError(common.IsServiceTransientError),
84✔
135
                ),
84✔
136
                producerManager: NewProducerManager(
84✔
137
                        resource.GetDomainCache(),
84✔
138
                        resource.GetAsyncWorkflowQueueProvider(),
84✔
139
                        resource.GetLogger(),
84✔
140
                        resource.GetMetricsClient(),
84✔
141
                ),
84✔
142
        }
84✔
143
}
84✔
144

145
// Start starts the handler
146
func (wh *WorkflowHandler) Start() {
15✔
147
        // TODO: Get warmup duration from config. Even better, run proactive checks such as probing downstream connections.
15✔
148
        const warmUpDuration = 30 * time.Second
15✔
149

15✔
150
        warmupTimer := time.NewTimer(warmUpDuration)
15✔
151
        go func() {
30✔
152
                <-warmupTimer.C
15✔
153
                wh.GetLogger().Warn("Service warmup duration has elapsed.")
15✔
154
                if atomic.CompareAndSwapInt32(&wh.healthStatus, int32(HealthStatusWarmingUp), int32(HealthStatusOK)) {
27✔
155
                        wh.GetLogger().Warn("Warmup time has elapsed. Service is healthy.")
12✔
156
                } else {
12✔
157
                        status := HealthStatus(atomic.LoadInt32(&wh.healthStatus))
×
158
                        wh.GetLogger().Warn(fmt.Sprintf("Warmup time has elapsed. Service status is: %v", status.String()))
×
159
                }
×
160
        }()
161
}
162

163
// Stop stops the handler
164
func (wh *WorkflowHandler) Stop() {
15✔
165
        atomic.StoreInt32(&wh.shuttingDown, 1)
15✔
166
}
15✔
167

168
// UpdateHealthStatus sets the health status for this rpc handler.
169
// This health status will be used within the rpc health check handler
170
func (wh *WorkflowHandler) UpdateHealthStatus(status HealthStatus) {
16✔
171
        atomic.StoreInt32(&wh.healthStatus, int32(status))
16✔
172
}
16✔
173

174
func (wh *WorkflowHandler) isShuttingDown() bool {
6,463✔
175
        return atomic.LoadInt32(&wh.shuttingDown) != 0
6,463✔
176
}
6,463✔
177

178
// Health is for health check
179
func (wh *WorkflowHandler) Health(ctx context.Context) (*types.HealthStatus, error) {
2✔
180
        status := HealthStatus(atomic.LoadInt32(&wh.healthStatus))
2✔
181
        msg := status.String()
2✔
182

2✔
183
        if status != HealthStatusOK {
3✔
184
                wh.GetLogger().Warn(fmt.Sprintf("Service status is: %v", msg))
1✔
185
        }
1✔
186

187
        return &types.HealthStatus{
2✔
188
                Ok:  status == HealthStatusOK,
2✔
189
                Msg: msg,
2✔
190
        }, nil
2✔
191
}
192

193
// RegisterDomain creates a new domain which can be used as a container for all resources.  Domain is a top level
194
// entity within Cadence, used as a container for all resources like workflow executions, tasklists, etc.  Domain
195
// acts as a sandbox and provides isolation for all resources within the domain.  All resources belongs to exactly one
196
// domain.
197
func (wh *WorkflowHandler) RegisterDomain(ctx context.Context, registerRequest *types.RegisterDomainRequest) (retError error) {
45✔
198
        if wh.isShuttingDown() {
45✔
199
                return validate.ErrShuttingDown
×
200
        }
×
201

202
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
45✔
203
                return err
×
204
        }
×
205

206
        if registerRequest == nil {
45✔
207
                return validate.ErrRequestNotSet
×
208
        }
×
209

210
        if registerRequest.GetWorkflowExecutionRetentionPeriodInDays() > int32(wh.config.DomainConfig.MaxRetentionDays()) {
45✔
211
                return validate.ErrInvalidRetention
×
212
        }
×
213

214
        if err := validate.CheckPermission(wh.config, registerRequest.SecurityToken); err != nil {
45✔
215
                return err
×
216
        }
×
217

218
        if err := checkRequiredDomainDataKVs(wh.config.DomainConfig.RequiredDomainDataKeys(), registerRequest.GetData()); err != nil {
46✔
219
                return err
1✔
220
        }
1✔
221

222
        if registerRequest.GetName() == "" {
44✔
223
                return validate.ErrDomainNotSet
×
224
        }
×
225

226
        return wh.domainHandler.RegisterDomain(ctx, registerRequest)
44✔
227
}
228

229
// ListDomains returns the information and configuration for a registered domain.
230
func (wh *WorkflowHandler) ListDomains(
231
        ctx context.Context,
232
        listRequest *types.ListDomainsRequest,
233
) (response *types.ListDomainsResponse, retError error) {
2✔
234
        if wh.isShuttingDown() {
2✔
235
                return nil, validate.ErrShuttingDown
×
236
        }
×
237

238
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
2✔
239
                return nil, err
×
240
        }
×
241

242
        if listRequest == nil {
3✔
243
                return nil, validate.ErrRequestNotSet
1✔
244
        }
1✔
245

246
        return wh.domainHandler.ListDomains(ctx, listRequest)
1✔
247
}
248

249
// DescribeDomain returns the information and configuration for a registered domain.
250
func (wh *WorkflowHandler) DescribeDomain(
251
        ctx context.Context,
252
        describeRequest *types.DescribeDomainRequest,
253
) (response *types.DescribeDomainResponse, retError error) {
134✔
254
        if wh.isShuttingDown() {
134✔
255
                return nil, validate.ErrShuttingDown
×
256
        }
×
257

258
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
134✔
259
                return nil, err
×
260
        }
×
261

262
        if describeRequest == nil {
134✔
263
                return nil, validate.ErrRequestNotSet
×
264
        }
×
265

266
        if describeRequest.GetName() == "" && describeRequest.GetUUID() == "" {
134✔
267
                return nil, validate.ErrDomainNotSet
×
268
        }
×
269

270
        resp, err := wh.domainHandler.DescribeDomain(ctx, describeRequest)
134✔
271
        if err != nil {
134✔
272
                return nil, err
×
273
        }
×
274

275
        if resp.GetFailoverInfo() != nil && resp.GetFailoverInfo().GetFailoverExpireTimestamp() > 0 {
134✔
276
                // fetch ongoing failover info from history service
×
277
                failoverResp, err := wh.GetHistoryClient().GetFailoverInfo(ctx, &types.GetFailoverInfoRequest{
×
278
                        DomainID: resp.GetDomainInfo().UUID,
×
279
                })
×
280
                if err != nil {
×
281
                        // despite the error from history, return describe domain response
×
282
                        wh.GetLogger().Error(
×
283
                                fmt.Sprintf("Failed to get failover info for domain %s", resp.DomainInfo.GetName()),
×
284
                                tag.Error(err),
×
285
                        )
×
286
                        return resp, nil
×
287
                }
×
288
                resp.FailoverInfo.CompletedShardCount = failoverResp.GetCompletedShardCount()
×
289
                resp.FailoverInfo.PendingShards = failoverResp.GetPendingShards()
×
290
        }
291
        return resp, nil
134✔
292
}
293

294
// UpdateDomain is used to update the information and configuration for a registered domain.
295
func (wh *WorkflowHandler) UpdateDomain(
296
        ctx context.Context,
297
        updateRequest *types.UpdateDomainRequest,
298
) (resp *types.UpdateDomainResponse, retError error) {
9✔
299
        domainName := ""
9✔
300
        if updateRequest != nil {
18✔
301
                domainName = updateRequest.GetName()
9✔
302
        }
9✔
303

304
        logger := wh.GetLogger().WithTags(
9✔
305
                tag.WorkflowDomainName(domainName),
9✔
306
                tag.OperationName("DomainUpdate"))
9✔
307

9✔
308
        if updateRequest == nil {
9✔
309
                logger.Error("Nil domain update request.",
×
310
                        tag.Error(validate.ErrRequestNotSet))
×
311
                return nil, validate.ErrRequestNotSet
×
312
        }
×
313

314
        isFailover := isFailoverRequest(updateRequest)
9✔
315
        isGraceFailover := isGraceFailoverRequest(updateRequest)
9✔
316
        logger.Info(fmt.Sprintf(
9✔
317
                "Domain Update requested. isFailover: %v, isGraceFailover: %v, Request: %#v.",
9✔
318
                isFailover,
9✔
319
                isGraceFailover,
9✔
320
                updateRequest))
9✔
321

9✔
322
        if wh.isShuttingDown() {
9✔
323
                logger.Error("Won't apply the domain update since workflowHandler is shutting down.",
×
324
                        tag.Error(validate.ErrShuttingDown))
×
325
                return nil, validate.ErrShuttingDown
×
326
        }
×
327

328
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
9✔
329
                logger.Error("Won't apply the domain update since client version is not supported.",
×
330
                        tag.Error(err))
×
331
                return nil, err
×
332
        }
×
333

334
        // don't require permission for failover request
335
        if isFailover {
11✔
336
                // reject the failover if the cluster is in lockdown
2✔
337
                if err := checkFailOverPermission(wh.config, updateRequest.GetName()); err != nil {
3✔
338
                        logger.Error("Domain failover request rejected since domain is in lockdown.",
1✔
339
                                tag.Error(err))
1✔
340
                        return nil, err
1✔
341
                }
1✔
342
        } else {
7✔
343
                if err := validate.CheckPermission(wh.config, updateRequest.SecurityToken); err != nil {
7✔
344
                        logger.Error("Domain update request rejected due to failing permissions.",
×
345
                                tag.Error(err))
×
346
                        return nil, err
×
347
                }
×
348
        }
349

350
        if isGraceFailover {
9✔
351
                if err := wh.checkOngoingFailover(
1✔
352
                        ctx,
1✔
353
                        &updateRequest.Name,
1✔
354
                ); err != nil {
1✔
355
                        logger.Error("Graceful domain failover request failed. Not able to check ongoing failovers.",
×
356
                                tag.Error(err))
×
357
                        return nil, err
×
358
                }
×
359
        }
360

361
        if updateRequest.GetName() == "" {
8✔
362
                logger.Error("Domain not set on request.",
×
363
                        tag.Error(validate.ErrDomainNotSet))
×
364
                return nil, validate.ErrDomainNotSet
×
365
        }
×
366
        // TODO: call remote clusters to verify domain data
367
        resp, err := wh.domainHandler.UpdateDomain(ctx, updateRequest)
8✔
368
        if err != nil {
10✔
369
                logger.Error("Domain update operation failed.",
2✔
370
                        tag.Error(err))
2✔
371
                return nil, err
2✔
372
        }
2✔
373
        logger.Info("Domain update operation succeeded.")
6✔
374
        return resp, nil
6✔
375
}
376

377
// DeprecateDomain us used to update status of a registered domain to DEPRECATED. Once the domain is deprecated
378
// it cannot be used to start new workflow executions.  Existing workflow executions will continue to run on
379
// deprecated domains.
380
func (wh *WorkflowHandler) DeprecateDomain(ctx context.Context, deprecateRequest *types.DeprecateDomainRequest) (retError error) {
×
381
        if wh.isShuttingDown() {
×
382
                return validate.ErrShuttingDown
×
383
        }
×
384

385
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
×
386
                return err
×
387
        }
×
388

389
        if deprecateRequest == nil {
×
390
                return validate.ErrRequestNotSet
×
391
        }
×
392

393
        if err := validate.CheckPermission(wh.config, deprecateRequest.SecurityToken); err != nil {
×
394
                return err
×
395
        }
×
396

397
        if deprecateRequest.GetName() == "" {
×
398
                return validate.ErrDomainNotSet
×
399
        }
×
400

401
        return wh.domainHandler.DeprecateDomain(ctx, deprecateRequest)
×
402
}
403

404
// PollForActivityTask - Poll for an activity task.
405
func (wh *WorkflowHandler) PollForActivityTask(
406
        ctx context.Context,
407
        pollRequest *types.PollForActivityTaskRequest,
408
) (resp *types.PollForActivityTaskResponse, retError error) {
708✔
409
        callTime := time.Now()
708✔
410

708✔
411
        if wh.isShuttingDown() {
708✔
412
                return nil, validate.ErrShuttingDown
×
413
        }
×
414

415
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
708✔
416
                return nil, err
×
417
        }
×
418

419
        if pollRequest == nil {
708✔
420
                return nil, validate.ErrRequestNotSet
×
421
        }
×
422

423
        domainName := pollRequest.GetDomain()
708✔
424
        if domainName == "" {
708✔
425
                return nil, validate.ErrDomainNotSet
×
426
        }
×
427

428
        scope := getMetricsScopeWithDomain(metrics.FrontendPollForActivityTaskScope, pollRequest, wh.GetMetricsClient()).Tagged(metrics.GetContextTags(ctx)...)
708✔
429
        wh.GetLogger().Debug("Received PollForActivityTask")
708✔
430
        if err := common.ValidateLongPollContextTimeout(
708✔
431
                ctx,
708✔
432
                "PollForActivityTask",
708✔
433
                wh.GetThrottledLogger(),
708✔
434
        ); err != nil {
710✔
435
                return nil, err
2✔
436
        }
2✔
437

438
        idLengthWarnLimit := wh.config.MaxIDLengthWarnLimit()
706✔
439
        if !common.IsValidIDLength(
706✔
440
                domainName,
706✔
441
                scope,
706✔
442
                idLengthWarnLimit,
706✔
443
                wh.config.DomainNameMaxLength(domainName),
706✔
444
                metrics.CadenceErrDomainNameExceededWarnLimit,
706✔
445
                domainName,
706✔
446
                wh.GetLogger(),
706✔
447
                tag.IDTypeDomainName) {
706✔
448
                return nil, validate.ErrDomainTooLong
×
449
        }
×
450

451
        if err := wh.validateTaskList(pollRequest.TaskList, scope, domainName); err != nil {
706✔
452
                return nil, err
×
453
        }
×
454

455
        if !common.IsValidIDLength(
706✔
456
                pollRequest.GetIdentity(),
706✔
457
                scope,
706✔
458
                idLengthWarnLimit,
706✔
459
                wh.config.IdentityMaxLength(domainName),
706✔
460
                metrics.CadenceErrIdentityExceededWarnLimit,
706✔
461
                domainName,
706✔
462
                wh.GetLogger(),
706✔
463
                tag.IDTypeIdentity) {
706✔
464
                return nil, validate.ErrIdentityTooLong
×
465
        }
×
466

467
        domainID, err := wh.GetDomainCache().GetDomainID(domainName)
706✔
468
        if err != nil {
973✔
469
                return nil, err
267✔
470
        }
267✔
471

472
        isolationGroup := wh.getIsolationGroup(ctx, domainName)
439✔
473
        if !wh.waitUntilIsolationGroupHealthy(ctx, domainName, isolationGroup) {
440✔
474
                return &types.PollForActivityTaskResponse{}, nil
1✔
475
        }
1✔
476
        // it is possible that we wait for a very long time and the remaining time is not long enough for a long poll
477
        // in this case, return an empty response
478
        if err := common.ValidateLongPollContextTimeout(
438✔
479
                ctx,
438✔
480
                "PollForActivityTask",
438✔
481
                wh.GetThrottledLogger(),
438✔
482
        ); err != nil {
438✔
483
                return &types.PollForActivityTaskResponse{}, nil
×
484
        }
×
485
        pollerID := uuid.New().String()
438✔
486
        op := func() error {
876✔
487
                resp, err = wh.GetMatchingClient().PollForActivityTask(ctx, &types.MatchingPollForActivityTaskRequest{
438✔
488
                        DomainUUID:     domainID,
438✔
489
                        PollerID:       pollerID,
438✔
490
                        PollRequest:    pollRequest,
438✔
491
                        IsolationGroup: isolationGroup,
438✔
492
                })
438✔
493
                return err
438✔
494
        }
438✔
495

496
        err = wh.throttleRetry.Do(ctx, op)
438✔
497
        if err != nil {
504✔
498
                err = wh.cancelOutstandingPoll(ctx, err, domainID, persistence.TaskListTypeActivity, pollRequest.TaskList, pollerID)
66✔
499
                if err != nil {
66✔
500
                        // For all other errors log an error and return it back to client.
×
501
                        ctxTimeout := "not-set"
×
502
                        ctxDeadline, ok := ctx.Deadline()
×
503
                        if ok {
×
504
                                ctxTimeout = ctxDeadline.Sub(callTime).String()
×
505
                        }
×
506
                        wh.GetLogger().Error("PollForActivityTask failed.",
×
507
                                tag.WorkflowTaskListName(pollRequest.GetTaskList().GetName()),
×
508
                                tag.Value(ctxTimeout),
×
509
                                tag.Error(err))
×
510
                        return nil, err
×
511
                }
512
        }
513
        return resp, nil
438✔
514
}
515

516
// PollForDecisionTask - Poll for a decision task.
517
func (wh *WorkflowHandler) PollForDecisionTask(
518
        ctx context.Context,
519
        pollRequest *types.PollForDecisionTaskRequest,
520
) (resp *types.PollForDecisionTaskResponse, retError error) {
1,491✔
521
        callTime := time.Now()
1,491✔
522

1,491✔
523
        if wh.isShuttingDown() {
1,491✔
524
                return nil, validate.ErrShuttingDown
×
525
        }
×
526

527
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
1,491✔
528
                return nil, err
×
529
        }
×
530

531
        if pollRequest == nil {
1,491✔
532
                return nil, validate.ErrRequestNotSet
×
533
        }
×
534

535
        domainName := pollRequest.GetDomain()
1,491✔
536
        tags := getDomainWfIDRunIDTags(domainName, nil)
1,491✔
537

1,491✔
538
        if domainName == "" {
1,491✔
539
                return nil, validate.ErrDomainNotSet
×
540
        }
×
541

542
        scope := getMetricsScopeWithDomain(metrics.FrontendPollForDecisionTaskScope, pollRequest, wh.GetMetricsClient()).Tagged(metrics.GetContextTags(ctx)...)
1,491✔
543
        wh.GetLogger().Debug("Received PollForDecisionTask")
1,491✔
544
        if err := common.ValidateLongPollContextTimeout(
1,491✔
545
                ctx,
1,491✔
546
                "PollForDecisionTask",
1,491✔
547
                wh.GetThrottledLogger(),
1,491✔
548
        ); err != nil {
1,493✔
549
                return nil, err
2✔
550
        }
2✔
551

552
        idLengthWarnLimit := wh.config.MaxIDLengthWarnLimit()
1,489✔
553
        if !common.IsValidIDLength(
1,489✔
554
                domainName,
1,489✔
555
                scope,
1,489✔
556
                idLengthWarnLimit,
1,489✔
557
                wh.config.DomainNameMaxLength(domainName),
1,489✔
558
                metrics.CadenceErrDomainNameExceededWarnLimit,
1,489✔
559
                domainName,
1,489✔
560
                wh.GetLogger(),
1,489✔
561
                tag.IDTypeDomainName) {
1,489✔
562
                return nil, validate.ErrDomainTooLong
×
563
        }
×
564

565
        if !common.IsValidIDLength(
1,489✔
566
                pollRequest.GetIdentity(),
1,489✔
567
                scope,
1,489✔
568
                idLengthWarnLimit,
1,489✔
569
                wh.config.IdentityMaxLength(domainName),
1,489✔
570
                metrics.CadenceErrIdentityExceededWarnLimit,
1,489✔
571
                domainName,
1,489✔
572
                wh.GetLogger(),
1,489✔
573
                tag.IDTypeIdentity) {
1,489✔
574
                return nil, validate.ErrIdentityTooLong
×
575
        }
×
576

577
        if err := wh.validateTaskList(pollRequest.TaskList, scope, domainName); err != nil {
1,489✔
578
                return nil, err
×
579
        }
×
580

581
        domainEntry, err := wh.GetDomainCache().GetDomain(domainName)
1,489✔
582
        if err != nil {
1,758✔
583
                return nil, err
269✔
584
        }
269✔
585
        domainID := domainEntry.GetInfo().ID
1,220✔
586

1,220✔
587
        wh.GetLogger().Debug("Poll for decision.", tag.WorkflowDomainName(domainName), tag.WorkflowDomainID(domainID))
1,220✔
588
        if err := wh.checkBadBinary(domainEntry, pollRequest.GetBinaryChecksum()); err != nil {
1,220✔
589
                return nil, err
×
590
        }
×
591

592
        isolationGroup := wh.getIsolationGroup(ctx, domainName)
1,220✔
593
        if !wh.waitUntilIsolationGroupHealthy(ctx, domainName, isolationGroup) {
1,221✔
594
                return &types.PollForDecisionTaskResponse{}, nil
1✔
595
        }
1✔
596
        // it is possible that we wait for a very long time and the remaining time is not long enough for a long poll
597
        // in this case, return an empty response
598
        if err := common.ValidateLongPollContextTimeout(
1,219✔
599
                ctx,
1,219✔
600
                "PollForDecisionTask",
1,219✔
601
                wh.GetThrottledLogger(),
1,219✔
602
        ); err != nil {
1,219✔
603
                return &types.PollForDecisionTaskResponse{}, nil
×
604
        }
×
605

606
        pollerID := uuid.New().String()
1,219✔
607
        var matchingResp *types.MatchingPollForDecisionTaskResponse
1,219✔
608
        op := func() error {
2,438✔
609
                matchingResp, err = wh.GetMatchingClient().PollForDecisionTask(ctx, &types.MatchingPollForDecisionTaskRequest{
1,219✔
610
                        DomainUUID:     domainID,
1,219✔
611
                        PollerID:       pollerID,
1,219✔
612
                        PollRequest:    pollRequest,
1,219✔
613
                        IsolationGroup: isolationGroup,
1,219✔
614
                })
1,219✔
615
                return err
1,219✔
616
        }
1,219✔
617

618
        err = wh.throttleRetry.Do(ctx, op)
1,219✔
619
        if err != nil {
1,285✔
620
                err = wh.cancelOutstandingPoll(ctx, err, domainID, persistence.TaskListTypeDecision, pollRequest.TaskList, pollerID)
66✔
621
                if err != nil {
66✔
622
                        // For all other errors log an error and return it back to client.
×
623
                        ctxTimeout := "not-set"
×
624
                        ctxDeadline, ok := ctx.Deadline()
×
625
                        if ok {
×
626
                                ctxTimeout = ctxDeadline.Sub(callTime).String()
×
627
                        }
×
628
                        wh.GetLogger().Error("PollForDecisionTask failed.",
×
629
                                tag.WorkflowTaskListName(pollRequest.GetTaskList().GetName()),
×
630
                                tag.Value(ctxTimeout),
×
631
                                tag.Error(err))
×
632
                        return nil, err
×
633
                }
634

635
                // Must be cancellation error.  Does'nt matter what we return here.  Client already went away.
636
                return nil, nil
66✔
637
        }
638

639
        tags = append(tags, []tag.Tag{tag.WorkflowID(
1,153✔
640
                matchingResp.GetWorkflowExecution().GetWorkflowID()),
1,153✔
641
                tag.WorkflowRunID(matchingResp.GetWorkflowExecution().GetRunID())}...)
1,153✔
642
        resp, err = wh.createPollForDecisionTaskResponse(ctx, scope, domainID, matchingResp, matchingResp.GetBranchToken())
1,153✔
643
        if err != nil {
1,153✔
644
                return nil, err
×
645
        }
×
646
        return resp, nil
1,153✔
647
}
648

649
func (wh *WorkflowHandler) getIsolationGroup(ctx context.Context, domainName string) string {
2,868✔
650
        return partition.IsolationGroupFromContext(ctx)
2,868✔
651
}
2,868✔
652

653
func (wh *WorkflowHandler) getPartitionConfig(ctx context.Context, domainName string) map[string]string {
478✔
654
        return partition.ConfigFromContext(ctx)
478✔
655
}
478✔
656

657
func (wh *WorkflowHandler) isIsolationGroupHealthy(ctx context.Context, domainName, isolationGroup string) bool {
1,209✔
658
        if wh.GetIsolationGroupState() != nil && wh.config.EnableTasklistIsolation(domainName) {
1,211✔
659
                isDrained, err := wh.GetIsolationGroupState().IsDrained(ctx, domainName, isolationGroup)
2✔
660
                if err != nil {
2✔
661
                        wh.GetLogger().Error("Failed to check if an isolation group is drained, assume it's healthy", tag.Error(err))
×
662
                        return true
×
663
                }
×
664
                return !isDrained
2✔
665
        }
666
        return true
1,207✔
667
}
668

669
func (wh *WorkflowHandler) waitUntilIsolationGroupHealthy(ctx context.Context, domainName, isolationGroup string) bool {
1,659✔
670
        if wh.GetIsolationGroupState() != nil && wh.config.EnableTasklistIsolation(domainName) {
1,661✔
671
                ticker := time.NewTicker(time.Second * 30)
2✔
672
                defer ticker.Stop()
2✔
673
                childCtx, cancel := common.CreateChildContext(ctx, 0.05)
2✔
674
                defer cancel()
2✔
675
                for {
4✔
676
                        isDrained, err := wh.GetIsolationGroupState().IsDrained(childCtx, domainName, isolationGroup)
2✔
677
                        if err != nil {
2✔
678
                                wh.GetLogger().Error("Failed to check if an isolation group is drained, assume it's healthy", tag.Error(err))
×
679
                                return true
×
680
                        }
×
681
                        if !isDrained {
2✔
682
                                break
×
683
                        }
684
                        select {
2✔
685
                        case <-childCtx.Done():
2✔
686
                                return false
2✔
687
                        case <-ticker.C:
×
688
                        }
689
                }
690
        }
691
        return true
1,657✔
692
}
693

694
func (wh *WorkflowHandler) checkBadBinary(domainEntry *cache.DomainCacheEntry, binaryChecksum string) error {
1,220✔
695
        if domainEntry.GetConfig().BadBinaries.Binaries != nil {
2,439✔
696
                badBinaries := domainEntry.GetConfig().BadBinaries.Binaries
1,219✔
697
                _, ok := badBinaries[binaryChecksum]
1,219✔
698
                if ok {
1,219✔
699
                        wh.GetMetricsClient().IncCounter(metrics.FrontendPollForDecisionTaskScope, metrics.CadenceErrBadBinaryCounter)
×
700
                        return &types.BadRequestError{
×
701
                                Message: fmt.Sprintf("binary %v already marked as bad deployment", binaryChecksum),
×
702
                        }
×
703
                }
×
704
        }
705
        return nil
1,220✔
706
}
707

708
func (wh *WorkflowHandler) cancelOutstandingPoll(ctx context.Context, err error, domainID string, taskListType int32,
709
        taskList *types.TaskList, pollerID string) error {
132✔
710
        // First check if this err is due to context cancellation.  This means client connection to frontend is closed.
132✔
711
        if ctx.Err() == context.Canceled {
264✔
712
                // Our rpc stack does not propagates context cancellation to the other service.  Lets make an explicit
132✔
713
                // call to matching to notify this poller is gone to prevent any tasks being dispatched to zombie pollers.
132✔
714
                err = wh.GetMatchingClient().CancelOutstandingPoll(context.Background(), &types.CancelOutstandingPollRequest{
132✔
715
                        DomainUUID:   domainID,
132✔
716
                        TaskListType: common.Int32Ptr(taskListType),
132✔
717
                        TaskList:     taskList,
132✔
718
                        PollerID:     pollerID,
132✔
719
                })
132✔
720
                // We can not do much if this call fails.  Just log the error and move on
132✔
721
                if err != nil {
132✔
722
                        wh.GetLogger().Warn("Failed to cancel outstanding poller.",
×
723
                                tag.WorkflowTaskListName(taskList.GetName()), tag.Error(err))
×
724
                }
×
725

726
                // Clear error as we don't want to report context cancellation error to count against our SLA
727
                return nil
132✔
728
        }
729

730
        return err
×
731
}
732

733
// RecordActivityTaskHeartbeat - Record Activity Task Heart beat.
734
func (wh *WorkflowHandler) RecordActivityTaskHeartbeat(
735
        ctx context.Context,
736
        heartbeatRequest *types.RecordActivityTaskHeartbeatRequest,
737
) (resp *types.RecordActivityTaskHeartbeatResponse, retError error) {
384✔
738
        if wh.isShuttingDown() {
384✔
739
                return nil, validate.ErrShuttingDown
×
740
        }
×
741

742
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
384✔
743
                return nil, err
×
744
        }
×
745

746
        if heartbeatRequest == nil {
385✔
747
                return nil, validate.ErrRequestNotSet
1✔
748
        }
1✔
749

750
        wh.GetLogger().Debug("Received RecordActivityTaskHeartbeat")
383✔
751
        if heartbeatRequest.TaskToken == nil {
384✔
752
                return nil, validate.ErrTaskTokenNotSet
1✔
753
        }
1✔
754
        taskToken, err := wh.tokenSerializer.Deserialize(heartbeatRequest.TaskToken)
382✔
755
        if err != nil {
382✔
756
                return nil, err
×
757
        }
×
758
        if taskToken.DomainID == "" {
382✔
759
                return nil, validate.ErrDomainNotSet
×
760
        }
×
761

762
        domainName, err := wh.GetDomainCache().GetDomainName(taskToken.DomainID)
382✔
763
        if err != nil {
382✔
764
                return nil, err
×
765
        }
×
766

767
        dw := domainWrapper{
382✔
768
                domain: domainName,
382✔
769
        }
382✔
770
        scope := getMetricsScopeWithDomain(metrics.FrontendRecordActivityTaskHeartbeatScope, dw, wh.GetMetricsClient()).Tagged(metrics.GetContextTags(ctx)...)
382✔
771
        sizeLimitError := wh.config.BlobSizeLimitError(domainName)
382✔
772
        sizeLimitWarn := wh.config.BlobSizeLimitWarn(domainName)
382✔
773

382✔
774
        if err := common.CheckEventBlobSizeLimit(
382✔
775
                len(heartbeatRequest.Details),
382✔
776
                sizeLimitWarn,
382✔
777
                sizeLimitError,
382✔
778
                taskToken.DomainID,
382✔
779
                taskToken.WorkflowID,
382✔
780
                taskToken.RunID,
382✔
781
                scope,
382✔
782
                wh.GetThrottledLogger(),
382✔
783
                tag.BlobSizeViolationOperation("RecordActivityTaskHeartbeat"),
382✔
784
        ); err != nil {
382✔
785
                // heartbeat details exceed size limit, we would fail the activity immediately with explicit error reason
×
786
                failRequest := &types.RespondActivityTaskFailedRequest{
×
787
                        TaskToken: heartbeatRequest.TaskToken,
×
788
                        Reason:    common.StringPtr(common.FailureReasonHeartbeatExceedsLimit),
×
789
                        Details:   heartbeatRequest.Details[0:sizeLimitError],
×
790
                        Identity:  heartbeatRequest.Identity,
×
791
                }
×
792
                err = wh.GetHistoryClient().RespondActivityTaskFailed(ctx, &types.HistoryRespondActivityTaskFailedRequest{
×
793
                        DomainUUID:    taskToken.DomainID,
×
794
                        FailedRequest: failRequest,
×
795
                })
×
796
                if err != nil {
×
797
                        return nil, wh.normalizeVersionedErrors(ctx, err)
×
798
                }
×
799
                resp = &types.RecordActivityTaskHeartbeatResponse{CancelRequested: true}
×
800
        } else {
382✔
801
                resp, err = wh.GetHistoryClient().RecordActivityTaskHeartbeat(ctx, &types.HistoryRecordActivityTaskHeartbeatRequest{
382✔
802
                        DomainUUID:       taskToken.DomainID,
382✔
803
                        HeartbeatRequest: heartbeatRequest,
382✔
804
                })
382✔
805
                if err != nil {
382✔
806
                        return nil, wh.normalizeVersionedErrors(ctx, err)
×
807
                }
×
808
        }
809

810
        return resp, nil
382✔
811
}
812

813
// RecordActivityTaskHeartbeatByID - Record Activity Task Heart beat.
814
func (wh *WorkflowHandler) RecordActivityTaskHeartbeatByID(
815
        ctx context.Context,
816
        heartbeatRequest *types.RecordActivityTaskHeartbeatByIDRequest,
817
) (resp *types.RecordActivityTaskHeartbeatResponse, retError error) {
3✔
818
        if wh.isShuttingDown() {
3✔
819
                return nil, validate.ErrShuttingDown
×
820
        }
×
821

822
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
3✔
823
                return nil, err
×
824
        }
×
825

826
        if heartbeatRequest == nil {
4✔
827
                return nil, validate.ErrRequestNotSet
1✔
828
        }
1✔
829

830
        domainName := heartbeatRequest.GetDomain()
2✔
831
        if domainName == "" {
3✔
832
                return nil, validate.ErrDomainNotSet
1✔
833
        }
1✔
834

835
        wh.GetLogger().Debug("Received RecordActivityTaskHeartbeatByID")
1✔
836
        domainID, err := wh.GetDomainCache().GetDomainID(domainName)
1✔
837
        if err != nil {
1✔
838
                return nil, err
×
839
        }
×
840
        workflowID := heartbeatRequest.GetWorkflowID()
1✔
841
        runID := heartbeatRequest.GetRunID() // runID is optional so can be empty
1✔
842
        activityID := heartbeatRequest.GetActivityID()
1✔
843

1✔
844
        if domainID == "" {
1✔
845
                return nil, validate.ErrDomainNotSet
×
846
        }
×
847
        if workflowID == "" {
1✔
848
                return nil, validate.ErrWorkflowIDNotSet
×
849
        }
×
850
        if activityID == "" {
1✔
851
                return nil, validate.ErrActivityIDNotSet
×
852
        }
×
853

854
        taskToken := &common.TaskToken{
1✔
855
                DomainID:   domainID,
1✔
856
                RunID:      runID,
1✔
857
                WorkflowID: workflowID,
1✔
858
                ScheduleID: common.EmptyEventID,
1✔
859
                ActivityID: activityID,
1✔
860
        }
1✔
861
        token, err := wh.tokenSerializer.Serialize(taskToken)
1✔
862
        if err != nil {
1✔
863
                return nil, err
×
864
        }
×
865

866
        scope := getMetricsScopeWithDomain(metrics.FrontendRecordActivityTaskHeartbeatByIDScope, heartbeatRequest, wh.GetMetricsClient()).Tagged(metrics.GetContextTags(ctx)...)
1✔
867
        sizeLimitError := wh.config.BlobSizeLimitError(domainName)
1✔
868
        sizeLimitWarn := wh.config.BlobSizeLimitWarn(domainName)
1✔
869

1✔
870
        if err := common.CheckEventBlobSizeLimit(
1✔
871
                len(heartbeatRequest.Details),
1✔
872
                sizeLimitWarn,
1✔
873
                sizeLimitError,
1✔
874
                taskToken.DomainID,
1✔
875
                taskToken.WorkflowID,
1✔
876
                taskToken.RunID,
1✔
877
                scope,
1✔
878
                wh.GetThrottledLogger(),
1✔
879
                tag.BlobSizeViolationOperation("RecordActivityTaskHeartbeatByID"),
1✔
880
        ); err != nil {
1✔
881
                // heartbeat details exceed size limit, we would fail the activity immediately with explicit error reason
×
882
                failRequest := &types.RespondActivityTaskFailedRequest{
×
883
                        TaskToken: token,
×
884
                        Reason:    common.StringPtr(common.FailureReasonHeartbeatExceedsLimit),
×
885
                        Details:   heartbeatRequest.Details[0:sizeLimitError],
×
886
                        Identity:  heartbeatRequest.Identity,
×
887
                }
×
888
                err = wh.GetHistoryClient().RespondActivityTaskFailed(ctx, &types.HistoryRespondActivityTaskFailedRequest{
×
889
                        DomainUUID:    taskToken.DomainID,
×
890
                        FailedRequest: failRequest,
×
891
                })
×
892
                if err != nil {
×
893
                        return nil, wh.normalizeVersionedErrors(ctx, err)
×
894
                }
×
895
                resp = &types.RecordActivityTaskHeartbeatResponse{CancelRequested: true}
×
896
        } else {
1✔
897
                req := &types.RecordActivityTaskHeartbeatRequest{
1✔
898
                        TaskToken: token,
1✔
899
                        Details:   heartbeatRequest.Details,
1✔
900
                        Identity:  heartbeatRequest.Identity,
1✔
901
                }
1✔
902

1✔
903
                resp, err = wh.GetHistoryClient().RecordActivityTaskHeartbeat(ctx, &types.HistoryRecordActivityTaskHeartbeatRequest{
1✔
904
                        DomainUUID:       taskToken.DomainID,
1✔
905
                        HeartbeatRequest: req,
1✔
906
                })
1✔
907
                if err != nil {
1✔
908
                        return nil, wh.normalizeVersionedErrors(ctx, err)
×
909
                }
×
910
        }
911

912
        return resp, nil
1✔
913
}
914

915
// RespondActivityTaskCompleted - response to an activity task
916
func (wh *WorkflowHandler) RespondActivityTaskCompleted(
917
        ctx context.Context,
918
        completeRequest *types.RespondActivityTaskCompletedRequest,
919
) (retError error) {
247✔
920
        if wh.isShuttingDown() {
247✔
921
                return validate.ErrShuttingDown
×
922
        }
×
923

924
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
247✔
925
                return err
×
926
        }
×
927

928
        if completeRequest == nil {
247✔
929
                return validate.ErrRequestNotSet
×
930
        }
×
931

932
        if completeRequest.TaskToken == nil {
247✔
933
                return validate.ErrTaskTokenNotSet
×
934
        }
×
935
        taskToken, err := wh.tokenSerializer.Deserialize(completeRequest.TaskToken)
247✔
936
        if err != nil {
247✔
937
                return err
×
938
        }
×
939
        if taskToken.DomainID == "" {
247✔
940
                return validate.ErrDomainNotSet
×
941
        }
×
942

943
        domainName, err := wh.GetDomainCache().GetDomainName(taskToken.DomainID)
247✔
944
        if err != nil {
247✔
945
                return err
×
946
        }
×
947

948
        dw := domainWrapper{
247✔
949
                domain: domainName,
247✔
950
        }
247✔
951
        scope := getMetricsScopeWithDomain(metrics.FrontendRespondActivityTaskCompletedScope, dw, wh.GetMetricsClient()).Tagged(metrics.GetContextTags(ctx)...)
247✔
952
        if !common.IsValidIDLength(
247✔
953
                completeRequest.GetIdentity(),
247✔
954
                scope,
247✔
955
                wh.config.MaxIDLengthWarnLimit(),
247✔
956
                wh.config.IdentityMaxLength(domainName),
247✔
957
                metrics.CadenceErrIdentityExceededWarnLimit,
247✔
958
                domainName,
247✔
959
                wh.GetLogger(),
247✔
960
                tag.IDTypeIdentity) {
247✔
961
                return validate.ErrIdentityTooLong
×
962
        }
×
963

964
        sizeLimitError := wh.config.BlobSizeLimitError(domainName)
247✔
965
        sizeLimitWarn := wh.config.BlobSizeLimitWarn(domainName)
247✔
966

247✔
967
        if err := common.CheckEventBlobSizeLimit(
247✔
968
                len(completeRequest.Result),
247✔
969
                sizeLimitWarn,
247✔
970
                sizeLimitError,
247✔
971
                taskToken.DomainID,
247✔
972
                taskToken.WorkflowID,
247✔
973
                taskToken.RunID,
247✔
974
                scope,
247✔
975
                wh.GetThrottledLogger(),
247✔
976
                tag.BlobSizeViolationOperation("RespondActivityTaskCompleted"),
247✔
977
        ); err != nil {
247✔
978
                // result exceeds blob size limit, we would record it as failure
×
979
                failRequest := &types.RespondActivityTaskFailedRequest{
×
980
                        TaskToken: completeRequest.TaskToken,
×
981
                        Reason:    common.StringPtr(common.FailureReasonCompleteResultExceedsLimit),
×
982
                        Details:   completeRequest.Result[0:sizeLimitError],
×
983
                        Identity:  completeRequest.Identity,
×
984
                }
×
985
                err = wh.GetHistoryClient().RespondActivityTaskFailed(ctx, &types.HistoryRespondActivityTaskFailedRequest{
×
986
                        DomainUUID:    taskToken.DomainID,
×
987
                        FailedRequest: failRequest,
×
988
                })
×
989
                if err != nil {
×
990
                        return wh.normalizeVersionedErrors(ctx, err)
×
991
                }
×
992
        } else {
247✔
993
                err = wh.GetHistoryClient().RespondActivityTaskCompleted(ctx, &types.HistoryRespondActivityTaskCompletedRequest{
247✔
994
                        DomainUUID:      taskToken.DomainID,
247✔
995
                        CompleteRequest: completeRequest,
247✔
996
                })
247✔
997
                if err != nil {
292✔
998
                        return wh.normalizeVersionedErrors(ctx, err)
45✔
999
                }
45✔
1000
        }
1001

1002
        return nil
202✔
1003
}
1004

1005
// RespondActivityTaskCompletedByID - response to an activity task
1006
func (wh *WorkflowHandler) RespondActivityTaskCompletedByID(
1007
        ctx context.Context,
1008
        completeRequest *types.RespondActivityTaskCompletedByIDRequest,
1009
) (retError error) {
76✔
1010
        if wh.isShuttingDown() {
76✔
1011
                return validate.ErrShuttingDown
×
1012
        }
×
1013

1014
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
76✔
1015
                return err
×
1016
        }
×
1017

1018
        if completeRequest == nil {
76✔
1019
                return validate.ErrRequestNotSet
×
1020
        }
×
1021

1022
        domainName := completeRequest.GetDomain()
76✔
1023
        if domainName == "" {
76✔
1024
                return validate.ErrDomainNotSet
×
1025
        }
×
1026
        domainID, err := wh.GetDomainCache().GetDomainID(domainName)
76✔
1027
        if err != nil {
76✔
1028
                return err
×
1029
        }
×
1030
        workflowID := completeRequest.GetWorkflowID()
76✔
1031
        runID := completeRequest.GetRunID() // runID is optional so can be empty
76✔
1032
        activityID := completeRequest.GetActivityID()
76✔
1033

76✔
1034
        if domainID == "" {
76✔
1035
                return validate.ErrDomainNotSet
×
1036
        }
×
1037
        if workflowID == "" {
76✔
1038
                return validate.ErrWorkflowIDNotSet
×
1039
        }
×
1040
        if activityID == "" {
76✔
1041
                return validate.ErrActivityIDNotSet
×
1042
        }
×
1043

1044
        scope := getMetricsScopeWithDomain(metrics.FrontendRespondActivityTaskCompletedByIDScope, completeRequest, wh.GetMetricsClient()).Tagged(metrics.GetContextTags(ctx)...)
76✔
1045
        if !common.IsValidIDLength(
76✔
1046
                completeRequest.GetIdentity(),
76✔
1047
                scope,
76✔
1048
                wh.config.MaxIDLengthWarnLimit(),
76✔
1049
                wh.config.IdentityMaxLength(domainName),
76✔
1050
                metrics.CadenceErrIdentityExceededWarnLimit,
76✔
1051
                domainName,
76✔
1052
                wh.GetLogger(),
76✔
1053
                tag.IDTypeIdentity) {
76✔
1054
                return validate.ErrIdentityTooLong
×
1055
        }
×
1056

1057
        taskToken := &common.TaskToken{
76✔
1058
                DomainID:   domainID,
76✔
1059
                RunID:      runID,
76✔
1060
                WorkflowID: workflowID,
76✔
1061
                ScheduleID: common.EmptyEventID,
76✔
1062
                ActivityID: activityID,
76✔
1063
        }
76✔
1064
        token, err := wh.tokenSerializer.Serialize(taskToken)
76✔
1065
        if err != nil {
76✔
1066
                return err
×
1067
        }
×
1068

1069
        sizeLimitError := wh.config.BlobSizeLimitError(domainName)
76✔
1070
        sizeLimitWarn := wh.config.BlobSizeLimitWarn(domainName)
76✔
1071

76✔
1072
        if err := common.CheckEventBlobSizeLimit(
76✔
1073
                len(completeRequest.Result),
76✔
1074
                sizeLimitWarn,
76✔
1075
                sizeLimitError,
76✔
1076
                taskToken.DomainID,
76✔
1077
                taskToken.WorkflowID,
76✔
1078
                taskToken.RunID,
76✔
1079
                scope,
76✔
1080
                wh.GetThrottledLogger(),
76✔
1081
                tag.BlobSizeViolationOperation("RespondActivityTaskCompletedByID"),
76✔
1082
        ); err != nil {
76✔
1083
                // result exceeds blob size limit, we would record it as failure
×
1084
                failRequest := &types.RespondActivityTaskFailedRequest{
×
1085
                        TaskToken: token,
×
1086
                        Reason:    common.StringPtr(common.FailureReasonCompleteResultExceedsLimit),
×
1087
                        Details:   completeRequest.Result[0:sizeLimitError],
×
1088
                        Identity:  completeRequest.Identity,
×
1089
                }
×
1090
                err = wh.GetHistoryClient().RespondActivityTaskFailed(ctx, &types.HistoryRespondActivityTaskFailedRequest{
×
1091
                        DomainUUID:    taskToken.DomainID,
×
1092
                        FailedRequest: failRequest,
×
1093
                })
×
1094
                if err != nil {
×
1095
                        return wh.normalizeVersionedErrors(ctx, err)
×
1096
                }
×
1097
        } else {
76✔
1098
                req := &types.RespondActivityTaskCompletedRequest{
76✔
1099
                        TaskToken: token,
76✔
1100
                        Result:    completeRequest.Result,
76✔
1101
                        Identity:  completeRequest.Identity,
76✔
1102
                }
76✔
1103

76✔
1104
                err = wh.GetHistoryClient().RespondActivityTaskCompleted(ctx, &types.HistoryRespondActivityTaskCompletedRequest{
76✔
1105
                        DomainUUID:      taskToken.DomainID,
76✔
1106
                        CompleteRequest: req,
76✔
1107
                })
76✔
1108
                if err != nil {
76✔
1109
                        return wh.normalizeVersionedErrors(ctx, err)
×
1110
                }
×
1111
        }
1112

1113
        return nil
76✔
1114
}
1115

1116
// RespondActivityTaskFailed - response to an activity task failure
1117
func (wh *WorkflowHandler) RespondActivityTaskFailed(
1118
        ctx context.Context,
1119
        failedRequest *types.RespondActivityTaskFailedRequest,
1120
) (retError error) {
12✔
1121
        if wh.isShuttingDown() {
12✔
1122
                return validate.ErrShuttingDown
×
1123
        }
×
1124

1125
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
12✔
1126
                return err
×
1127
        }
×
1128

1129
        if failedRequest == nil {
12✔
1130
                return validate.ErrRequestNotSet
×
1131
        }
×
1132

1133
        if failedRequest.TaskToken == nil {
12✔
1134
                return validate.ErrTaskTokenNotSet
×
1135
        }
×
1136
        taskToken, err := wh.tokenSerializer.Deserialize(failedRequest.TaskToken)
12✔
1137
        if err != nil {
12✔
1138
                return err
×
1139
        }
×
1140
        if taskToken.DomainID == "" {
12✔
1141
                return validate.ErrDomainNotSet
×
1142
        }
×
1143

1144
        domainName, err := wh.GetDomainCache().GetDomainName(taskToken.DomainID)
12✔
1145
        if err != nil {
12✔
1146
                return err
×
1147
        }
×
1148

1149
        dw := domainWrapper{
12✔
1150
                domain: domainName,
12✔
1151
        }
12✔
1152
        scope := getMetricsScopeWithDomain(metrics.FrontendRespondActivityTaskFailedScope, dw, wh.GetMetricsClient()).Tagged(metrics.GetContextTags(ctx)...)
12✔
1153
        if !common.IsValidIDLength(
12✔
1154
                failedRequest.GetIdentity(),
12✔
1155
                scope,
12✔
1156
                wh.config.MaxIDLengthWarnLimit(),
12✔
1157
                wh.config.IdentityMaxLength(domainName),
12✔
1158
                metrics.CadenceErrIdentityExceededWarnLimit,
12✔
1159
                domainName,
12✔
1160
                wh.GetLogger(),
12✔
1161
                tag.IDTypeIdentity) {
12✔
1162
                return validate.ErrIdentityTooLong
×
1163
        }
×
1164

1165
        sizeLimitError := wh.config.BlobSizeLimitError(domainName)
12✔
1166
        sizeLimitWarn := wh.config.BlobSizeLimitWarn(domainName)
12✔
1167

12✔
1168
        if err := common.CheckEventBlobSizeLimit(
12✔
1169
                len(failedRequest.Details),
12✔
1170
                sizeLimitWarn,
12✔
1171
                sizeLimitError,
12✔
1172
                taskToken.DomainID,
12✔
1173
                taskToken.WorkflowID,
12✔
1174
                taskToken.RunID,
12✔
1175
                scope,
12✔
1176
                wh.GetThrottledLogger(),
12✔
1177
                tag.BlobSizeViolationOperation("RespondActivityTaskFailed"),
12✔
1178
        ); err != nil {
12✔
1179
                // details exceeds blob size limit, we would truncate the details and put a specific error reason
×
1180
                failedRequest.Reason = common.StringPtr(common.FailureReasonFailureDetailsExceedsLimit)
×
1181
                failedRequest.Details = failedRequest.Details[0:sizeLimitError]
×
1182
        }
×
1183

1184
        err = wh.GetHistoryClient().RespondActivityTaskFailed(ctx, &types.HistoryRespondActivityTaskFailedRequest{
12✔
1185
                DomainUUID:    taskToken.DomainID,
12✔
1186
                FailedRequest: failedRequest,
12✔
1187
        })
12✔
1188
        if err != nil {
12✔
1189
                return wh.normalizeVersionedErrors(ctx, err)
×
1190
        }
×
1191
        return nil
12✔
1192
}
1193

1194
// RespondActivityTaskFailedByID - response to an activity task failure
1195
func (wh *WorkflowHandler) RespondActivityTaskFailedByID(
1196
        ctx context.Context,
1197
        failedRequest *types.RespondActivityTaskFailedByIDRequest,
1198
) (retError error) {
×
1199
        if wh.isShuttingDown() {
×
1200
                return validate.ErrShuttingDown
×
1201
        }
×
1202

1203
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
×
1204
                return err
×
1205
        }
×
1206

1207
        if failedRequest == nil {
×
1208
                return validate.ErrRequestNotSet
×
1209
        }
×
1210

1211
        domainName := failedRequest.GetDomain()
×
1212

×
1213
        if domainName == "" {
×
1214
                return validate.ErrDomainNotSet
×
1215
        }
×
1216
        domainID, err := wh.GetDomainCache().GetDomainID(domainName)
×
1217
        if err != nil {
×
1218
                return err
×
1219
        }
×
1220
        workflowID := failedRequest.GetWorkflowID()
×
1221
        runID := failedRequest.GetRunID() // runID is optional so can be empty
×
1222
        activityID := failedRequest.GetActivityID()
×
1223

×
1224
        if domainID == "" {
×
1225
                return validate.ErrDomainNotSet
×
1226
        }
×
1227
        if workflowID == "" {
×
1228
                return validate.ErrWorkflowIDNotSet
×
1229
        }
×
1230
        if activityID == "" {
×
1231
                return validate.ErrActivityIDNotSet
×
1232
        }
×
1233

1234
        scope := getMetricsScopeWithDomain(metrics.FrontendRespondActivityTaskFailedByIDScope, failedRequest, wh.GetMetricsClient()).Tagged(metrics.GetContextTags(ctx)...)
×
1235
        if !common.IsValidIDLength(
×
1236
                failedRequest.GetIdentity(),
×
1237
                scope,
×
1238
                wh.config.MaxIDLengthWarnLimit(),
×
1239
                wh.config.IdentityMaxLength(failedRequest.GetDomain()),
×
1240
                metrics.CadenceErrIdentityExceededWarnLimit,
×
1241
                domainName,
×
1242
                wh.GetLogger(),
×
1243
                tag.IDTypeIdentity) {
×
1244
                return validate.ErrIdentityTooLong
×
1245
        }
×
1246

1247
        taskToken := &common.TaskToken{
×
1248
                DomainID:   domainID,
×
1249
                RunID:      runID,
×
1250
                WorkflowID: workflowID,
×
1251
                ScheduleID: common.EmptyEventID,
×
1252
                ActivityID: activityID,
×
1253
        }
×
1254
        token, err := wh.tokenSerializer.Serialize(taskToken)
×
1255
        if err != nil {
×
1256
                return err
×
1257
        }
×
1258

1259
        sizeLimitError := wh.config.BlobSizeLimitError(domainName)
×
1260
        sizeLimitWarn := wh.config.BlobSizeLimitWarn(domainName)
×
1261

×
1262
        if err := common.CheckEventBlobSizeLimit(
×
1263
                len(failedRequest.Details),
×
1264
                sizeLimitWarn,
×
1265
                sizeLimitError,
×
1266
                taskToken.DomainID,
×
1267
                taskToken.WorkflowID,
×
1268
                taskToken.RunID,
×
1269
                scope,
×
1270
                wh.GetThrottledLogger(),
×
1271
                tag.BlobSizeViolationOperation("RespondActivityTaskFailedByID"),
×
1272
        ); err != nil {
×
1273
                // details exceeds blob size limit, we would truncate the details and put a specific error reason
×
1274
                failedRequest.Reason = common.StringPtr(common.FailureReasonFailureDetailsExceedsLimit)
×
1275
                failedRequest.Details = failedRequest.Details[0:sizeLimitError]
×
1276
        }
×
1277

1278
        req := &types.RespondActivityTaskFailedRequest{
×
1279
                TaskToken: token,
×
1280
                Reason:    failedRequest.Reason,
×
1281
                Details:   failedRequest.Details,
×
1282
                Identity:  failedRequest.Identity,
×
1283
        }
×
1284

×
1285
        err = wh.GetHistoryClient().RespondActivityTaskFailed(ctx, &types.HistoryRespondActivityTaskFailedRequest{
×
1286
                DomainUUID:    taskToken.DomainID,
×
1287
                FailedRequest: req,
×
1288
        })
×
1289
        if err != nil {
×
1290
                return wh.normalizeVersionedErrors(ctx, err)
×
1291
        }
×
1292
        return nil
×
1293
}
1294

1295
// RespondActivityTaskCanceled - called to cancel an activity task
1296
func (wh *WorkflowHandler) RespondActivityTaskCanceled(
1297
        ctx context.Context,
1298
        cancelRequest *types.RespondActivityTaskCanceledRequest,
1299
) (retError error) {
×
1300
        if wh.isShuttingDown() {
×
1301
                return validate.ErrShuttingDown
×
1302
        }
×
1303

1304
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
×
1305
                return err
×
1306
        }
×
1307

1308
        if cancelRequest == nil {
×
1309
                return validate.ErrRequestNotSet
×
1310
        }
×
1311

1312
        if cancelRequest.TaskToken == nil {
×
1313
                return validate.ErrTaskTokenNotSet
×
1314
        }
×
1315

1316
        taskToken, err := wh.tokenSerializer.Deserialize(cancelRequest.TaskToken)
×
1317
        if err != nil {
×
1318
                return err
×
1319
        }
×
1320

1321
        if taskToken.DomainID == "" {
×
1322
                return validate.ErrDomainNotSet
×
1323
        }
×
1324

1325
        domainName, err := wh.GetDomainCache().GetDomainName(taskToken.DomainID)
×
1326
        if err != nil {
×
1327
                return err
×
1328
        }
×
1329

1330
        dw := domainWrapper{
×
1331
                domain: domainName,
×
1332
        }
×
1333
        scope := getMetricsScopeWithDomain(metrics.FrontendRespondActivityTaskCanceledScope, dw, wh.GetMetricsClient()).Tagged(metrics.GetContextTags(ctx)...)
×
1334
        if !common.IsValidIDLength(
×
1335
                cancelRequest.GetIdentity(),
×
1336
                scope,
×
1337
                wh.config.MaxIDLengthWarnLimit(),
×
1338
                wh.config.IdentityMaxLength(domainName),
×
1339
                metrics.CadenceErrIdentityExceededWarnLimit,
×
1340
                domainName,
×
1341
                wh.GetLogger(),
×
1342
                tag.IDTypeIdentity) {
×
1343
                return validate.ErrIdentityTooLong
×
1344
        }
×
1345

1346
        sizeLimitError := wh.config.BlobSizeLimitError(domainName)
×
1347
        sizeLimitWarn := wh.config.BlobSizeLimitWarn(domainName)
×
1348

×
1349
        if err := common.CheckEventBlobSizeLimit(
×
1350
                len(cancelRequest.Details),
×
1351
                sizeLimitWarn,
×
1352
                sizeLimitError,
×
1353
                taskToken.DomainID,
×
1354
                taskToken.WorkflowID,
×
1355
                taskToken.RunID,
×
1356
                scope,
×
1357
                wh.GetThrottledLogger(),
×
1358
                tag.BlobSizeViolationOperation("RespondActivityTaskCanceled"),
×
1359
        ); err != nil {
×
1360
                // details exceeds blob size limit, we would record it as failure
×
1361
                failRequest := &types.RespondActivityTaskFailedRequest{
×
1362
                        TaskToken: cancelRequest.TaskToken,
×
1363
                        Reason:    common.StringPtr(common.FailureReasonCancelDetailsExceedsLimit),
×
1364
                        Details:   cancelRequest.Details[0:sizeLimitError],
×
1365
                        Identity:  cancelRequest.Identity,
×
1366
                }
×
1367
                err = wh.GetHistoryClient().RespondActivityTaskFailed(ctx, &types.HistoryRespondActivityTaskFailedRequest{
×
1368
                        DomainUUID:    taskToken.DomainID,
×
1369
                        FailedRequest: failRequest,
×
1370
                })
×
1371
                if err != nil {
×
1372
                        return wh.normalizeVersionedErrors(ctx, err)
×
1373
                }
×
1374
        } else {
×
1375
                err = wh.GetHistoryClient().RespondActivityTaskCanceled(ctx, &types.HistoryRespondActivityTaskCanceledRequest{
×
1376
                        DomainUUID:    taskToken.DomainID,
×
1377
                        CancelRequest: cancelRequest,
×
1378
                })
×
1379
                if err != nil {
×
1380
                        return wh.normalizeVersionedErrors(ctx, err)
×
1381
                }
×
1382
        }
1383

1384
        return nil
×
1385
}
1386

1387
// RespondActivityTaskCanceledByID - called to cancel an activity task
1388
func (wh *WorkflowHandler) RespondActivityTaskCanceledByID(
1389
        ctx context.Context,
1390
        cancelRequest *types.RespondActivityTaskCanceledByIDRequest,
1391
) (retError error) {
×
1392
        if wh.isShuttingDown() {
×
1393
                return validate.ErrShuttingDown
×
1394
        }
×
1395

1396
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
×
1397
                return err
×
1398
        }
×
1399

1400
        if cancelRequest == nil {
×
1401
                return validate.ErrRequestNotSet
×
1402
        }
×
1403

1404
        domainName := cancelRequest.GetDomain()
×
1405
        if domainName == "" {
×
1406
                return validate.ErrDomainNotSet
×
1407
        }
×
1408
        domainID, err := wh.GetDomainCache().GetDomainID(domainName)
×
1409
        if err != nil {
×
1410
                return err
×
1411
        }
×
1412
        workflowID := cancelRequest.GetWorkflowID()
×
1413
        runID := cancelRequest.GetRunID() // runID is optional so can be empty
×
1414
        activityID := cancelRequest.GetActivityID()
×
1415

×
1416
        if domainID == "" {
×
1417
                return validate.ErrDomainNotSet
×
1418
        }
×
1419
        if workflowID == "" {
×
1420
                return validate.ErrWorkflowIDNotSet
×
1421
        }
×
1422
        if activityID == "" {
×
1423
                return validate.ErrActivityIDNotSet
×
1424
        }
×
1425

1426
        scope := getMetricsScopeWithDomain(metrics.FrontendRespondActivityTaskCanceledByIDScope, cancelRequest, wh.GetMetricsClient()).Tagged(metrics.GetContextTags(ctx)...)
×
1427
        if !common.IsValidIDLength(
×
1428
                cancelRequest.GetIdentity(),
×
1429
                scope,
×
1430
                wh.config.MaxIDLengthWarnLimit(),
×
1431
                wh.config.IdentityMaxLength(cancelRequest.GetDomain()),
×
1432
                metrics.CadenceErrIdentityExceededWarnLimit,
×
1433
                domainName,
×
1434
                wh.GetLogger(),
×
1435
                tag.IDTypeIdentity) {
×
1436
                return validate.ErrIdentityTooLong
×
1437
        }
×
1438

1439
        taskToken := &common.TaskToken{
×
1440
                DomainID:   domainID,
×
1441
                RunID:      runID,
×
1442
                WorkflowID: workflowID,
×
1443
                ScheduleID: common.EmptyEventID,
×
1444
                ActivityID: activityID,
×
1445
        }
×
1446
        token, err := wh.tokenSerializer.Serialize(taskToken)
×
1447
        if err != nil {
×
1448
                return err
×
1449
        }
×
1450

1451
        sizeLimitError := wh.config.BlobSizeLimitError(domainName)
×
1452
        sizeLimitWarn := wh.config.BlobSizeLimitWarn(domainName)
×
1453

×
1454
        if err := common.CheckEventBlobSizeLimit(
×
1455
                len(cancelRequest.Details),
×
1456
                sizeLimitWarn,
×
1457
                sizeLimitError,
×
1458
                taskToken.DomainID,
×
1459
                taskToken.WorkflowID,
×
1460
                taskToken.RunID,
×
1461
                scope,
×
1462
                wh.GetThrottledLogger(),
×
1463
                tag.BlobSizeViolationOperation("RespondActivityTaskCanceledByID"),
×
1464
        ); err != nil {
×
1465
                // details exceeds blob size limit, we would record it as failure
×
1466
                failRequest := &types.RespondActivityTaskFailedRequest{
×
1467
                        TaskToken: token,
×
1468
                        Reason:    common.StringPtr(common.FailureReasonCancelDetailsExceedsLimit),
×
1469
                        Details:   cancelRequest.Details[0:sizeLimitError],
×
1470
                        Identity:  cancelRequest.Identity,
×
1471
                }
×
1472
                err = wh.GetHistoryClient().RespondActivityTaskFailed(ctx, &types.HistoryRespondActivityTaskFailedRequest{
×
1473
                        DomainUUID:    taskToken.DomainID,
×
1474
                        FailedRequest: failRequest,
×
1475
                })
×
1476
                if err != nil {
×
1477
                        return wh.normalizeVersionedErrors(ctx, err)
×
1478
                }
×
1479
        } else {
×
1480
                req := &types.RespondActivityTaskCanceledRequest{
×
1481
                        TaskToken: token,
×
1482
                        Details:   cancelRequest.Details,
×
1483
                        Identity:  cancelRequest.Identity,
×
1484
                }
×
1485

×
1486
                err = wh.GetHistoryClient().RespondActivityTaskCanceled(ctx, &types.HistoryRespondActivityTaskCanceledRequest{
×
1487
                        DomainUUID:    taskToken.DomainID,
×
1488
                        CancelRequest: req,
×
1489
                })
×
1490
                if err != nil {
×
1491
                        return wh.normalizeVersionedErrors(ctx, err)
×
1492
                }
×
1493
        }
1494

1495
        return nil
×
1496
}
1497

1498
// RespondDecisionTaskCompleted - response to a decision task
1499
func (wh *WorkflowHandler) RespondDecisionTaskCompleted(
1500
        ctx context.Context,
1501
        completeRequest *types.RespondDecisionTaskCompletedRequest,
1502
) (resp *types.RespondDecisionTaskCompletedResponse, retError error) {
930✔
1503
        if wh.isShuttingDown() {
930✔
1504
                return nil, validate.ErrShuttingDown
×
1505
        }
×
1506

1507
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
930✔
1508
                return nil, err
×
1509
        }
×
1510

1511
        if completeRequest == nil {
930✔
1512
                return nil, validate.ErrRequestNotSet
×
1513
        }
×
1514

1515
        if completeRequest.TaskToken == nil {
930✔
1516
                return nil, validate.ErrTaskTokenNotSet
×
1517
        }
×
1518
        taskToken, err := wh.tokenSerializer.Deserialize(completeRequest.TaskToken)
930✔
1519
        if err != nil {
930✔
1520
                return nil, err
×
1521
        }
×
1522
        if taskToken.DomainID == "" {
930✔
1523
                return nil, validate.ErrDomainNotSet
×
1524
        }
×
1525

1526
        domainName, err := wh.GetDomainCache().GetDomainName(taskToken.DomainID)
930✔
1527
        if err != nil {
930✔
1528
                return nil, err
×
1529
        }
×
1530

1531
        dw := domainWrapper{
930✔
1532
                domain: domainName,
930✔
1533
        }
930✔
1534
        scope := getMetricsScopeWithDomain(metrics.FrontendRespondDecisionTaskCompletedScope, dw, wh.GetMetricsClient()).Tagged(metrics.GetContextTags(ctx)...)
930✔
1535
        if !common.IsValidIDLength(
930✔
1536
                completeRequest.GetIdentity(),
930✔
1537
                scope,
930✔
1538
                wh.config.MaxIDLengthWarnLimit(),
930✔
1539
                wh.config.IdentityMaxLength(domainName),
930✔
1540
                metrics.CadenceErrIdentityExceededWarnLimit,
930✔
1541
                domainName,
930✔
1542
                wh.GetLogger(),
930✔
1543
                tag.IDTypeIdentity) {
930✔
1544
                return nil, validate.ErrIdentityTooLong
×
1545
        }
×
1546

1547
        if err := common.CheckDecisionResultLimit(
930✔
1548
                len(completeRequest.Decisions),
930✔
1549
                wh.config.DecisionResultCountLimit(domainName),
930✔
1550
                scope); err != nil {
930✔
1551
                return nil, err
×
1552
        }
×
1553

1554
        histResp, err := wh.GetHistoryClient().RespondDecisionTaskCompleted(ctx, &types.HistoryRespondDecisionTaskCompletedRequest{
930✔
1555
                DomainUUID:      taskToken.DomainID,
930✔
1556
                CompleteRequest: completeRequest},
930✔
1557
        )
930✔
1558
        if err != nil {
939✔
1559
                return nil, wh.normalizeVersionedErrors(ctx, err)
9✔
1560
        }
9✔
1561

1562
        completedResp := &types.RespondDecisionTaskCompletedResponse{}
921✔
1563
        completedResp.ActivitiesToDispatchLocally = histResp.ActivitiesToDispatchLocally
921✔
1564
        if completeRequest.GetReturnNewDecisionTask() && histResp != nil && histResp.StartedResponse != nil {
981✔
1565
                taskToken := &common.TaskToken{
60✔
1566
                        DomainID:        taskToken.DomainID,
60✔
1567
                        WorkflowID:      taskToken.WorkflowID,
60✔
1568
                        RunID:           taskToken.RunID,
60✔
1569
                        ScheduleID:      histResp.StartedResponse.GetScheduledEventID(),
60✔
1570
                        ScheduleAttempt: histResp.StartedResponse.GetAttempt(),
60✔
1571
                }
60✔
1572
                token, _ := wh.tokenSerializer.Serialize(taskToken)
60✔
1573
                workflowExecution := &types.WorkflowExecution{
60✔
1574
                        WorkflowID: taskToken.WorkflowID,
60✔
1575
                        RunID:      taskToken.RunID,
60✔
1576
                }
60✔
1577
                matchingResp := common.CreateMatchingPollForDecisionTaskResponse(histResp.StartedResponse, workflowExecution, token)
60✔
1578

60✔
1579
                newDecisionTask, err := wh.createPollForDecisionTaskResponse(ctx, scope, taskToken.DomainID, matchingResp, matchingResp.GetBranchToken())
60✔
1580
                if err != nil {
60✔
1581
                        return nil, err
×
1582
                }
×
1583
                completedResp.DecisionTask = newDecisionTask
60✔
1584
        }
1585

1586
        return completedResp, nil
921✔
1587
}
1588

1589
// RespondDecisionTaskFailed - failed response to a decision task
1590
func (wh *WorkflowHandler) RespondDecisionTaskFailed(
1591
        ctx context.Context,
1592
        failedRequest *types.RespondDecisionTaskFailedRequest,
1593
) (retError error) {
159✔
1594
        if wh.isShuttingDown() {
159✔
1595
                return validate.ErrShuttingDown
×
1596
        }
×
1597

1598
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
159✔
1599
                return err
×
1600
        }
×
1601

1602
        if failedRequest == nil {
159✔
1603
                return validate.ErrRequestNotSet
×
1604
        }
×
1605

1606
        if failedRequest.TaskToken == nil {
159✔
1607
                return validate.ErrTaskTokenNotSet
×
1608
        }
×
1609
        taskToken, err := wh.tokenSerializer.Deserialize(failedRequest.TaskToken)
159✔
1610
        if err != nil {
159✔
1611
                return err
×
1612
        }
×
1613
        if taskToken.DomainID == "" {
159✔
1614
                return validate.ErrDomainNotSet
×
1615
        }
×
1616

1617
        domainName, err := wh.GetDomainCache().GetDomainName(taskToken.DomainID)
159✔
1618
        if err != nil {
159✔
1619
                return err
×
1620
        }
×
1621

1622
        dw := domainWrapper{
159✔
1623
                domain: domainName,
159✔
1624
        }
159✔
1625
        scope := getMetricsScopeWithDomain(metrics.FrontendRespondDecisionTaskFailedScope, dw, wh.GetMetricsClient()).Tagged(metrics.GetContextTags(ctx)...)
159✔
1626
        if !common.IsValidIDLength(
159✔
1627
                failedRequest.GetIdentity(),
159✔
1628
                scope,
159✔
1629
                wh.config.MaxIDLengthWarnLimit(),
159✔
1630
                wh.config.IdentityMaxLength(domainName),
159✔
1631
                metrics.CadenceErrIdentityExceededWarnLimit,
159✔
1632
                domainName,
159✔
1633
                wh.GetLogger(),
159✔
1634
                tag.IDTypeIdentity) {
159✔
1635
                return validate.ErrIdentityTooLong
×
1636
        }
×
1637

1638
        sizeLimitError := wh.config.BlobSizeLimitError(domainName)
159✔
1639
        sizeLimitWarn := wh.config.BlobSizeLimitWarn(domainName)
159✔
1640

159✔
1641
        if err := common.CheckEventBlobSizeLimit(
159✔
1642
                len(failedRequest.Details),
159✔
1643
                sizeLimitWarn,
159✔
1644
                sizeLimitError,
159✔
1645
                taskToken.DomainID,
159✔
1646
                taskToken.WorkflowID,
159✔
1647
                taskToken.RunID,
159✔
1648
                scope,
159✔
1649
                wh.GetThrottledLogger(),
159✔
1650
                tag.BlobSizeViolationOperation("RespondDecisionTaskFailed"),
159✔
1651
        ); err != nil {
159✔
1652
                // details exceed, we would just truncate the size for decision task failed as the details is not used anywhere by client code
×
1653
                failedRequest.Details = failedRequest.Details[0:sizeLimitError]
×
1654
        }
×
1655

1656
        err = wh.GetHistoryClient().RespondDecisionTaskFailed(ctx, &types.HistoryRespondDecisionTaskFailedRequest{
159✔
1657
                DomainUUID:    taskToken.DomainID,
159✔
1658
                FailedRequest: failedRequest,
159✔
1659
        })
159✔
1660
        if err != nil {
159✔
1661
                return wh.normalizeVersionedErrors(ctx, err)
×
1662
        }
×
1663
        return nil
159✔
1664
}
1665

1666
// RespondQueryTaskCompleted - response to a query task
1667
func (wh *WorkflowHandler) RespondQueryTaskCompleted(
1668
        ctx context.Context,
1669
        completeRequest *types.RespondQueryTaskCompletedRequest,
1670
) (retError error) {
30✔
1671
        if wh.isShuttingDown() {
30✔
1672
                return validate.ErrShuttingDown
×
1673
        }
×
1674

1675
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
30✔
1676
                return err
×
1677
        }
×
1678

1679
        if completeRequest == nil {
30✔
1680
                return validate.ErrRequestNotSet
×
1681
        }
×
1682

1683
        if completeRequest.TaskToken == nil {
30✔
1684
                return validate.ErrTaskTokenNotSet
×
1685
        }
×
1686
        queryTaskToken, err := wh.tokenSerializer.DeserializeQueryTaskToken(completeRequest.TaskToken)
30✔
1687
        if err != nil {
30✔
1688
                return err
×
1689
        }
×
1690
        if queryTaskToken.DomainID == "" || queryTaskToken.TaskList == "" || queryTaskToken.TaskID == "" {
30✔
1691
                return validate.ErrInvalidTaskToken
×
1692
        }
×
1693

1694
        domainName, err := wh.GetDomainCache().GetDomainName(queryTaskToken.DomainID)
30✔
1695
        if err != nil {
30✔
1696
                return err
×
1697
        }
×
1698

1699
        dw := domainWrapper{
30✔
1700
                domain: domainName,
30✔
1701
        }
30✔
1702
        sizeLimitError := wh.config.BlobSizeLimitError(domainName)
30✔
1703
        sizeLimitWarn := wh.config.BlobSizeLimitWarn(domainName)
30✔
1704

30✔
1705
        scope := getMetricsScopeWithDomain(metrics.FrontendRespondQueryTaskCompletedScope, dw, wh.GetMetricsClient()).Tagged(metrics.GetContextTags(ctx)...)
30✔
1706
        if err := common.CheckEventBlobSizeLimit(
30✔
1707
                len(completeRequest.GetQueryResult()),
30✔
1708
                sizeLimitWarn,
30✔
1709
                sizeLimitError,
30✔
1710
                queryTaskToken.DomainID,
30✔
1711
                "",
30✔
1712
                "",
30✔
1713
                scope,
30✔
1714
                wh.GetThrottledLogger(),
30✔
1715
                tag.BlobSizeViolationOperation("RespondQueryTaskCompleted"),
30✔
1716
        ); err != nil {
30✔
1717
                completeRequest = &types.RespondQueryTaskCompletedRequest{
×
1718
                        TaskToken:     completeRequest.TaskToken,
×
1719
                        CompletedType: types.QueryTaskCompletedTypeFailed.Ptr(),
×
1720
                        QueryResult:   nil,
×
1721
                        ErrorMessage:  err.Error(),
×
1722
                }
×
1723
        }
×
1724

1725
        call := yarpc.CallFromContext(ctx)
30✔
1726

30✔
1727
        completeRequest.WorkerVersionInfo = &types.WorkerVersionInfo{
30✔
1728
                Impl:           call.Header(common.ClientImplHeaderName),
30✔
1729
                FeatureVersion: call.Header(common.FeatureVersionHeaderName),
30✔
1730
        }
30✔
1731
        matchingRequest := &types.MatchingRespondQueryTaskCompletedRequest{
30✔
1732
                DomainUUID:       queryTaskToken.DomainID,
30✔
1733
                TaskList:         &types.TaskList{Name: queryTaskToken.TaskList},
30✔
1734
                TaskID:           queryTaskToken.TaskID,
30✔
1735
                CompletedRequest: completeRequest,
30✔
1736
        }
30✔
1737

30✔
1738
        err = wh.GetMatchingClient().RespondQueryTaskCompleted(ctx, matchingRequest)
30✔
1739
        if err != nil {
30✔
1740
                return err
×
1741
        }
×
1742
        return nil
30✔
1743
}
1744

1745
func (wh *WorkflowHandler) StartWorkflowExecutionAsync(
1746
        ctx context.Context,
1747
        startRequest *types.StartWorkflowExecutionAsyncRequest,
1748
) (resp *types.StartWorkflowExecutionAsyncResponse, retError error) {
3✔
1749
        if wh.isShuttingDown() {
3✔
1750
                return nil, validate.ErrShuttingDown
×
1751
        }
×
1752
        scope := getMetricsScopeWithDomain(metrics.FrontendStartWorkflowExecutionAsyncScope, startRequest, wh.GetMetricsClient()).Tagged(metrics.GetContextTags(ctx)...)
3✔
1753
        // validate request before pushing to queue
3✔
1754
        err := wh.validateStartWorkflowExecutionRequest(ctx, startRequest.StartWorkflowExecutionRequest, scope)
3✔
1755
        if err != nil {
3✔
1756
                return nil, err
×
1757
        }
×
1758

1759
        producer, err := wh.producerManager.GetProducerByDomain(startRequest.GetDomain())
3✔
1760
        if err != nil {
4✔
1761
                return nil, err
1✔
1762
        }
1✔
1763
        // serialize the message to be sent to the queue
1764
        payload, err := json.Marshal(startRequest)
2✔
1765
        if err != nil {
2✔
1766
                return nil, err
×
1767
        }
×
1768
        // propagate the headers from the context to the message
1769
        clientHeaders := common.GetClientHeaders(ctx)
2✔
1770
        header := &shared.Header{
2✔
1771
                Fields: map[string][]byte{},
2✔
1772
        }
2✔
1773
        for k, v := range clientHeaders {
12✔
1774
                header.Fields[k] = []byte(v)
10✔
1775
        }
10✔
1776
        messageType := sqlblobs.AsyncRequestTypeStartWorkflowExecutionAsyncRequest
2✔
1777
        message := &sqlblobs.AsyncRequestMessage{
2✔
1778
                PartitionKey: common.StringPtr(startRequest.GetWorkflowID()),
2✔
1779
                Type:         &messageType,
2✔
1780
                Header:       header,
2✔
1781
                Encoding:     common.StringPtr(string(common.EncodingTypeJSON)),
2✔
1782
                Payload:      payload,
2✔
1783
        }
2✔
1784
        err = producer.Publish(ctx, message)
2✔
1785
        if err != nil {
3✔
1786
                return nil, err
1✔
1787
        }
1✔
1788
        return &types.StartWorkflowExecutionAsyncResponse{}, nil
1✔
1789
}
1790

1791
// StartWorkflowExecution - Creates a new workflow execution
1792
func (wh *WorkflowHandler) StartWorkflowExecution(
1793
        ctx context.Context,
1794
        startRequest *types.StartWorkflowExecutionRequest,
1795
) (resp *types.StartWorkflowExecutionResponse, retError error) {
455✔
1796
        if wh.isShuttingDown() {
455✔
1797
                return nil, validate.ErrShuttingDown
×
1798
        }
×
1799
        scope := getMetricsScopeWithDomain(metrics.FrontendStartWorkflowExecutionScope, startRequest, wh.GetMetricsClient()).Tagged(metrics.GetContextTags(ctx)...)
455✔
1800
        err := wh.validateStartWorkflowExecutionRequest(ctx, startRequest, scope)
455✔
1801
        if err != nil {
466✔
1802
                return nil, err
11✔
1803
        }
11✔
1804
        domainName := startRequest.GetDomain()
444✔
1805
        domainID, err := wh.GetDomainCache().GetDomainID(domainName)
444✔
1806
        if err != nil {
444✔
1807
                return nil, err
×
1808
        }
×
1809
        wh.GetLogger().Debug("Start workflow execution request domainID", tag.WorkflowDomainID(domainID))
444✔
1810
        historyRequest, err := common.CreateHistoryStartWorkflowRequest(
444✔
1811
                domainID, startRequest, time.Now(), wh.getPartitionConfig(ctx, domainName))
444✔
1812
        if err != nil {
444✔
1813
                return nil, err
×
1814
        }
×
1815

1816
        resp, err = wh.GetHistoryClient().StartWorkflowExecution(ctx, historyRequest)
444✔
1817
        if err != nil {
462✔
1818
                return nil, err
18✔
1819
        }
18✔
1820
        return resp, nil
426✔
1821
}
1822

1823
func (wh *WorkflowHandler) validateStartWorkflowExecutionRequest(ctx context.Context, startRequest *types.StartWorkflowExecutionRequest, scope metrics.Scope) error {
458✔
1824
        if startRequest == nil {
459✔
1825
                return validate.ErrRequestNotSet
1✔
1826
        }
1✔
1827
        domainName := startRequest.GetDomain()
457✔
1828
        if domainName == "" {
458✔
1829
                return validate.ErrDomainNotSet
1✔
1830
        }
1✔
1831
        if startRequest.GetWorkflowID() == "" {
457✔
1832
                return validate.ErrWorkflowIDNotSet
1✔
1833
        }
1✔
1834
        if _, err := uuid.Parse(startRequest.RequestID); err != nil {
457✔
1835
                return &types.BadRequestError{Message: fmt.Sprintf("requestId %q is not a valid UUID", startRequest.RequestID)}
2✔
1836
        }
2✔
1837
        if startRequest.WorkflowType == nil || startRequest.WorkflowType.GetName() == "" {
454✔
1838
                return validate.ErrWorkflowTypeNotSet
1✔
1839
        }
1✔
1840
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
452✔
1841
                return err
×
1842
        }
×
1843
        idLengthWarnLimit := wh.config.MaxIDLengthWarnLimit()
452✔
1844
        if !common.IsValidIDLength(
452✔
1845
                domainName,
452✔
1846
                scope,
452✔
1847
                idLengthWarnLimit,
452✔
1848
                wh.config.DomainNameMaxLength(domainName),
452✔
1849
                metrics.CadenceErrDomainNameExceededWarnLimit,
452✔
1850
                domainName,
452✔
1851
                wh.GetLogger(),
452✔
1852
                tag.IDTypeDomainName) {
452✔
1853
                return validate.ErrDomainTooLong
×
1854
        }
×
1855
        if !common.IsValidIDLength(
452✔
1856
                startRequest.GetWorkflowID(),
452✔
1857
                scope,
452✔
1858
                idLengthWarnLimit,
452✔
1859
                wh.config.WorkflowIDMaxLength(domainName),
452✔
1860
                metrics.CadenceErrWorkflowIDExceededWarnLimit,
452✔
1861
                domainName,
452✔
1862
                wh.GetLogger(),
452✔
1863
                tag.IDTypeWorkflowID) {
452✔
1864
                return validate.ErrWorkflowIDTooLong
×
1865
        }
×
1866
        if err := common.ValidateRetryPolicy(startRequest.RetryPolicy); err != nil {
452✔
1867
                return err
×
1868
        }
×
1869
        wh.GetLogger().Debug(
452✔
1870
                "Received StartWorkflowExecution. WorkflowID",
452✔
1871
                tag.WorkflowID(startRequest.GetWorkflowID()))
452✔
1872
        if !common.IsValidIDLength(
452✔
1873
                startRequest.WorkflowType.GetName(),
452✔
1874
                scope,
452✔
1875
                idLengthWarnLimit,
452✔
1876
                wh.config.WorkflowTypeMaxLength(domainName),
452✔
1877
                metrics.CadenceErrWorkflowTypeExceededWarnLimit,
452✔
1878
                domainName,
452✔
1879
                wh.GetLogger(),
452✔
1880
                tag.IDTypeWorkflowType) {
452✔
1881
                return validate.ErrWorkflowTypeTooLong
×
1882
        }
×
1883
        if err := wh.validateTaskList(startRequest.TaskList, scope, domainName); err != nil {
453✔
1884
                return err
1✔
1885
        }
1✔
1886
        if startRequest.GetExecutionStartToCloseTimeoutSeconds() <= 0 {
452✔
1887
                return validate.ErrInvalidExecutionStartToCloseTimeoutSeconds
1✔
1888
        }
1✔
1889
        if startRequest.GetTaskStartToCloseTimeoutSeconds() <= 0 {
451✔
1890
                return validate.ErrInvalidTaskStartToCloseTimeoutSeconds
1✔
1891
        }
1✔
1892
        if startRequest.GetDelayStartSeconds() < 0 {
450✔
1893
                return validate.ErrInvalidDelayStartSeconds
1✔
1894
        }
1✔
1895
        if startRequest.GetJitterStartSeconds() < 0 {
448✔
1896
                return validate.ErrInvalidJitterStartSeconds
×
1897
        }
×
1898
        jitter := startRequest.GetJitterStartSeconds()
448✔
1899
        cron := startRequest.GetCronSchedule()
448✔
1900
        if cron != "" {
466✔
1901
                if _, err := backoff.ValidateSchedule(startRequest.GetCronSchedule()); err != nil {
18✔
1902
                        return err
×
1903
                }
×
1904
        }
1905
        if jitter > 0 && cron != "" {
448✔
1906
                // Calculate the cron duration and ensure that jitter is not greater than the cron duration,
×
1907
                // because that would be confusing to users.
×
1908

×
1909
                // Request using start/end time zero value, which will get us an exact answer (i.e. its not in the
×
1910
                // middle of a minute)
×
1911
                backoffSeconds, err := backoff.GetBackoffForNextScheduleInSeconds(cron, time.Time{}, time.Time{}, jitter)
×
1912
                if err != nil {
×
1913
                        return err
×
1914
                }
×
1915
                if jitter > backoffSeconds {
×
1916
                        return validate.ErrInvalidJitterStartSeconds2
×
1917
                }
×
1918
        }
1919
        if !common.IsValidIDLength(
448✔
1920
                startRequest.GetRequestID(),
448✔
1921
                scope,
448✔
1922
                idLengthWarnLimit,
448✔
1923
                wh.config.RequestIDMaxLength(domainName),
448✔
1924
                metrics.CadenceErrRequestIDExceededWarnLimit,
448✔
1925
                domainName,
448✔
1926
                wh.GetLogger(),
448✔
1927
                tag.IDTypeRequestID) {
448✔
1928
                return validate.ErrRequestIDTooLong
×
1929
        }
×
1930
        if err := wh.searchAttributesValidator.ValidateSearchAttributes(startRequest.SearchAttributes, domainName); err != nil {
448✔
1931
                return err
×
1932
        }
×
1933
        wh.GetLogger().Debug("Start workflow execution request domain", tag.WorkflowDomainName(domainName))
448✔
1934
        domainID, err := wh.GetDomainCache().GetDomainID(domainName)
448✔
1935
        if err != nil {
448✔
1936
                return err
×
1937
        }
×
1938
        sizeLimitError := wh.config.BlobSizeLimitError(domainName)
448✔
1939
        sizeLimitWarn := wh.config.BlobSizeLimitWarn(domainName)
448✔
1940
        actualSize := len(startRequest.Input)
448✔
1941
        if startRequest.Memo != nil {
460✔
1942
                actualSize += common.GetSizeOfMapStringToByteArray(startRequest.Memo.GetFields())
12✔
1943
        }
12✔
1944
        if err := common.CheckEventBlobSizeLimit(
448✔
1945
                actualSize,
448✔
1946
                sizeLimitWarn,
448✔
1947
                sizeLimitError,
448✔
1948
                domainID,
448✔
1949
                startRequest.GetWorkflowID(),
448✔
1950
                "",
448✔
1951
                scope,
448✔
1952
                wh.GetThrottledLogger(),
448✔
1953
                tag.BlobSizeViolationOperation("StartWorkflowExecution"),
448✔
1954
        ); err != nil {
448✔
1955
                return err
×
1956
        }
×
1957
        isolationGroup := wh.getIsolationGroup(ctx, domainName)
448✔
1958
        if !wh.isIsolationGroupHealthy(ctx, domainName, isolationGroup) {
449✔
1959
                return &types.BadRequestError{fmt.Sprintf("Domain %s is drained from isolation group %s.", domainName, isolationGroup)}
1✔
1960
        }
1✔
1961
        return nil
447✔
1962
}
1963

1964
// GetWorkflowExecutionHistory - retrieves the history of workflow execution
1965
func (wh *WorkflowHandler) GetWorkflowExecutionHistory(
1966
        ctx context.Context,
1967
        getRequest *types.GetWorkflowExecutionHistoryRequest,
1968
) (resp *types.GetWorkflowExecutionHistoryResponse, retError error) {
460✔
1969
        if wh.isShuttingDown() {
460✔
1970
                return nil, validate.ErrShuttingDown
×
1971
        }
×
1972

1973
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
460✔
1974
                return nil, err
×
1975
        }
×
1976

1977
        if getRequest == nil {
460✔
1978
                return nil, validate.ErrRequestNotSet
×
1979
        }
×
1980

1981
        domainName := getRequest.GetDomain()
460✔
1982
        wfExecution := getRequest.GetExecution()
460✔
1983

460✔
1984
        if domainName == "" {
460✔
1985
                return nil, validate.ErrDomainNotSet
×
1986
        }
×
1987

1988
        if err := validate.CheckExecution(wfExecution); err != nil {
460✔
1989
                return nil, err
×
1990
        }
×
1991

1992
        domainID, err := wh.GetDomainCache().GetDomainID(domainName)
460✔
1993
        if err != nil {
460✔
1994
                return nil, err
×
1995
        }
×
1996

1997
        if getRequest.GetMaximumPageSize() <= 0 {
788✔
1998
                getRequest.MaximumPageSize = int32(wh.config.HistoryMaxPageSize(getRequest.GetDomain()))
328✔
1999
        }
328✔
2000
        // force limit page size if exceed
2001
        if getRequest.GetMaximumPageSize() > common.GetHistoryMaxPageSize {
460✔
2002
                wh.GetThrottledLogger().Warn("GetHistory page size is larger than threshold",
×
2003
                        tag.WorkflowID(getRequest.Execution.GetWorkflowID()),
×
2004
                        tag.WorkflowRunID(getRequest.Execution.GetRunID()),
×
2005
                        tag.WorkflowDomainID(domainID),
×
2006
                        tag.WorkflowSize(int64(getRequest.GetMaximumPageSize())))
×
2007
                getRequest.MaximumPageSize = common.GetHistoryMaxPageSize
×
2008
        }
×
2009

2010
        scope := getMetricsScopeWithDomain(metrics.FrontendGetWorkflowExecutionHistoryScope, getRequest, wh.GetMetricsClient()).Tagged(metrics.GetContextTags(ctx)...)
460✔
2011
        if !getRequest.GetSkipArchival() {
902✔
2012
                enableArchivalRead := wh.GetArchivalMetadata().GetHistoryConfig().ReadEnabled()
442✔
2013
                historyArchived := wh.historyArchived(ctx, getRequest, domainID)
442✔
2014
                if enableArchivalRead && historyArchived {
466✔
2015
                        return wh.getArchivedHistory(ctx, getRequest, domainID)
24✔
2016
                }
24✔
2017
        }
2018

2019
        // this function return the following 6 things,
2020
        // 1. branch token
2021
        // 2. the workflow run ID
2022
        // 3. the last first event ID (the event ID of the last batch of events in the history)
2023
        // 4. the next event ID
2024
        // 5. whether the workflow is closed
2025
        // 6. error if any
2026
        queryHistory := func(
436✔
2027
                domainUUID string,
436✔
2028
                execution *types.WorkflowExecution,
436✔
2029
                expectedNextEventID int64,
436✔
2030
                currentBranchToken []byte,
436✔
2031
        ) ([]byte, string, int64, int64, bool, error) {
828✔
2032
                response, err := wh.GetHistoryClient().PollMutableState(ctx, &types.PollMutableStateRequest{
392✔
2033
                        DomainUUID:          domainUUID,
392✔
2034
                        Execution:           execution,
392✔
2035
                        ExpectedNextEventID: expectedNextEventID,
392✔
2036
                        CurrentBranchToken:  currentBranchToken,
392✔
2037
                })
392✔
2038

392✔
2039
                if err != nil {
392✔
2040
                        return nil, "", 0, 0, false, err
×
2041
                }
×
2042
                isWorkflowRunning := response.GetWorkflowCloseState() == persistence.WorkflowCloseStatusNone
392✔
2043

392✔
2044
                return response.CurrentBranchToken,
392✔
2045
                        response.Execution.GetRunID(),
392✔
2046
                        response.GetLastFirstEventID(),
392✔
2047
                        response.GetNextEventID(),
392✔
2048
                        isWorkflowRunning,
392✔
2049
                        nil
392✔
2050
        }
2051

2052
        isLongPoll := getRequest.GetWaitForNewEvent()
436✔
2053
        isCloseEventOnly := getRequest.GetHistoryEventFilterType() == types.HistoryEventFilterTypeCloseEvent
436✔
2054
        execution := getRequest.Execution
436✔
2055
        token := &getHistoryContinuationToken{}
436✔
2056

436✔
2057
        var runID string
436✔
2058
        lastFirstEventID := common.FirstEventID
436✔
2059
        var nextEventID int64
436✔
2060
        var isWorkflowRunning bool
436✔
2061

436✔
2062
        // process the token for paging
436✔
2063
        queryNextEventID := common.EndEventID
436✔
2064
        if getRequest.NextPageToken != nil {
480✔
2065
                token, err = deserializeHistoryToken(getRequest.NextPageToken)
44✔
2066
                if err != nil {
44✔
2067
                        return nil, validate.ErrInvalidNextPageToken
×
2068
                }
×
2069
                if execution.RunID != "" && execution.GetRunID() != token.RunID {
44✔
2070
                        return nil, validate.ErrNextPageTokenRunIDMismatch
×
2071
                }
×
2072

2073
                execution.RunID = token.RunID
44✔
2074

44✔
2075
                // we need to update the current next event ID and whether workflow is running
44✔
2076
                if len(token.PersistenceToken) == 0 && isLongPoll && token.IsWorkflowRunning {
44✔
2077
                        logger := wh.GetLogger().WithTags(
×
2078
                                tag.WorkflowDomainName(getRequest.GetDomain()),
×
2079
                                tag.WorkflowID(getRequest.Execution.GetWorkflowID()),
×
2080
                                tag.WorkflowRunID(getRequest.Execution.GetRunID()),
×
2081
                        )
×
2082
                        // TODO: for now we only log the invalid timeout (this is done inside the helper function) in case
×
2083
                        // this change breaks existing customers. Once we are sure no one is calling this API with very short timeout
×
2084
                        // we can return the error.
×
2085
                        _ = common.ValidateLongPollContextTimeout(ctx, "GetWorkflowExecutionHistory", logger)
×
2086

×
2087
                        if !isCloseEventOnly {
×
2088
                                queryNextEventID = token.NextEventID
×
2089
                        }
×
2090
                        token.BranchToken, _, lastFirstEventID, nextEventID, isWorkflowRunning, err =
×
2091
                                queryHistory(domainID, execution, queryNextEventID, token.BranchToken)
×
2092
                        if err != nil {
×
2093
                                return nil, err
×
2094
                        }
×
2095
                        token.FirstEventID = token.NextEventID
×
2096
                        token.NextEventID = nextEventID
×
2097
                        token.IsWorkflowRunning = isWorkflowRunning
×
2098
                }
2099
        } else {
392✔
2100
                if !isCloseEventOnly {
766✔
2101
                        queryNextEventID = common.FirstEventID
374✔
2102
                }
374✔
2103
                token.BranchToken, runID, lastFirstEventID, nextEventID, isWorkflowRunning, err =
392✔
2104
                        queryHistory(domainID, execution, queryNextEventID, nil)
392✔
2105
                if err != nil {
392✔
2106
                        return nil, err
×
2107
                }
×
2108

2109
                execution.RunID = runID
392✔
2110

392✔
2111
                token.RunID = runID
392✔
2112
                token.FirstEventID = common.FirstEventID
392✔
2113
                token.NextEventID = nextEventID
392✔
2114
                token.IsWorkflowRunning = isWorkflowRunning
392✔
2115
                token.PersistenceToken = nil
392✔
2116
        }
2117

2118
        call := yarpc.CallFromContext(ctx)
436✔
2119
        clientFeatureVersion := call.Header(common.FeatureVersionHeaderName)
436✔
2120
        clientImpl := call.Header(common.ClientImplHeaderName)
436✔
2121
        supportsRawHistoryQuery := wh.versionChecker.SupportsRawHistoryQuery(clientImpl, clientFeatureVersion) == nil
436✔
2122
        isRawHistoryEnabled := wh.config.SendRawWorkflowHistory(domainName) && supportsRawHistoryQuery
436✔
2123

436✔
2124
        history := &types.History{}
436✔
2125
        history.Events = []*types.HistoryEvent{}
436✔
2126
        var historyBlob []*types.DataBlob
436✔
2127

436✔
2128
        // helper function to just getHistory
436✔
2129
        getHistory := func(firstEventID, nextEventID int64, nextPageToken []byte) error {
872✔
2130
                if isRawHistoryEnabled {
438✔
2131
                        historyBlob, token.PersistenceToken, err = wh.getRawHistory(
2✔
2132
                                ctx,
2✔
2133
                                scope,
2✔
2134
                                domainID,
2✔
2135
                                domainName,
2✔
2136
                                *execution,
2✔
2137
                                firstEventID,
2✔
2138
                                nextEventID,
2✔
2139
                                getRequest.GetMaximumPageSize(),
2✔
2140
                                nextPageToken,
2✔
2141
                                token.TransientDecision,
2✔
2142
                                token.BranchToken,
2✔
2143
                        )
2✔
2144
                } else {
436✔
2145
                        history, token.PersistenceToken, err = wh.getHistory(
434✔
2146
                                ctx,
434✔
2147
                                scope,
434✔
2148
                                domainID,
434✔
2149
                                domainName,
434✔
2150
                                *execution,
434✔
2151
                                firstEventID,
434✔
2152
                                nextEventID,
434✔
2153
                                getRequest.GetMaximumPageSize(),
434✔
2154
                                nextPageToken,
434✔
2155
                                token.TransientDecision,
434✔
2156
                                token.BranchToken,
434✔
2157
                        )
434✔
2158
                }
434✔
2159
                if err != nil {
436✔
2160
                        return err
×
2161
                }
×
2162
                return nil
436✔
2163
        }
2164

2165
        if isCloseEventOnly {
454✔
2166
                if !isWorkflowRunning {
36✔
2167
                        if err := getHistory(lastFirstEventID, nextEventID, nil); err != nil {
18✔
2168
                                return nil, err
×
2169
                        }
×
2170
                        if isRawHistoryEnabled {
18✔
2171
                                // since getHistory func will not return empty history, so the below is safe
×
2172
                                historyBlob = historyBlob[len(historyBlob)-1:]
×
2173
                        } else {
18✔
2174
                                // since getHistory func will not return empty history, so the below is safe
18✔
2175
                                history.Events = history.Events[len(history.Events)-1:]
18✔
2176
                        }
18✔
2177
                        token = nil
18✔
2178
                } else if isLongPoll {
×
2179
                        // set the persistence token to be nil so next time we will query history for updates
×
2180
                        token.PersistenceToken = nil
×
2181
                } else {
×
2182
                        token = nil
×
2183
                }
×
2184
        } else {
418✔
2185
                // return all events
418✔
2186
                if token.FirstEventID >= token.NextEventID {
418✔
2187
                        // currently there is no new event
×
2188
                        history.Events = []*types.HistoryEvent{}
×
2189
                        if !isWorkflowRunning {
×
2190
                                token = nil
×
2191
                        }
×
2192
                } else {
418✔
2193
                        if err := getHistory(token.FirstEventID, token.NextEventID, token.PersistenceToken); err != nil {
418✔
2194
                                return nil, err
×
2195
                        }
×
2196
                        // here, for long pull on history events, we need to intercept the paging token from cassandra
2197
                        // and do something clever
2198
                        if len(token.PersistenceToken) == 0 && (!token.IsWorkflowRunning || !isLongPoll) {
794✔
2199
                                // meaning, there is no more history to be returned
376✔
2200
                                token = nil
376✔
2201
                        }
376✔
2202
                }
2203
        }
2204

2205
        nextToken, err := serializeHistoryToken(token)
436✔
2206
        if err != nil {
436✔
2207
                return nil, err
×
2208
        }
×
2209
        return &types.GetWorkflowExecutionHistoryResponse{
436✔
2210
                History:       history,
436✔
2211
                RawHistory:    historyBlob,
436✔
2212
                NextPageToken: nextToken,
436✔
2213
                Archived:      false,
436✔
2214
        }, nil
436✔
2215
}
2216

2217
// SignalWorkflowExecution is used to send a signal event to running workflow execution.  This results in
2218
// WorkflowExecutionSignaled event recorded in the history and a decision task being created for the execution.
2219
func (wh *WorkflowHandler) SignalWorkflowExecution(
2220
        ctx context.Context,
2221
        signalRequest *types.SignalWorkflowExecutionRequest,
2222
) (retError error) {
723✔
2223
        if wh.isShuttingDown() {
723✔
2224
                return validate.ErrShuttingDown
×
2225
        }
×
2226

2227
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
723✔
2228
                return err
×
2229
        }
×
2230

2231
        if signalRequest == nil {
723✔
2232
                return validate.ErrRequestNotSet
×
2233
        }
×
2234

2235
        domainName := signalRequest.GetDomain()
723✔
2236
        wfExecution := signalRequest.GetWorkflowExecution()
723✔
2237

723✔
2238
        if domainName == "" {
723✔
2239
                return validate.ErrDomainNotSet
×
2240
        }
×
2241
        if err := validate.CheckExecution(wfExecution); err != nil {
723✔
2242
                return err
×
2243
        }
×
2244

2245
        scope := getMetricsScopeWithDomain(metrics.FrontendSignalWorkflowExecutionScope, signalRequest, wh.GetMetricsClient()).Tagged(metrics.GetContextTags(ctx)...)
723✔
2246
        idLengthWarnLimit := wh.config.MaxIDLengthWarnLimit()
723✔
2247
        if !common.IsValidIDLength(
723✔
2248
                domainName,
723✔
2249
                scope,
723✔
2250
                idLengthWarnLimit,
723✔
2251
                wh.config.DomainNameMaxLength(domainName),
723✔
2252
                metrics.CadenceErrDomainNameExceededWarnLimit,
723✔
2253
                domainName,
723✔
2254
                wh.GetLogger(),
723✔
2255
                tag.IDTypeDomainName) {
723✔
2256
                return validate.ErrDomainTooLong
×
2257
        }
×
2258

2259
        if signalRequest.GetSignalName() == "" {
723✔
2260
                return validate.ErrSignalNameNotSet
×
2261
        }
×
2262

2263
        if !common.IsValidIDLength(
723✔
2264
                signalRequest.GetSignalName(),
723✔
2265
                scope,
723✔
2266
                idLengthWarnLimit,
723✔
2267
                wh.config.SignalNameMaxLength(domainName),
723✔
2268
                metrics.CadenceErrSignalNameExceededWarnLimit,
723✔
2269
                domainName,
723✔
2270
                wh.GetLogger(),
723✔
2271
                tag.IDTypeSignalName) {
723✔
2272
                return validate.ErrSignalNameTooLong
×
2273
        }
×
2274

2275
        if !common.IsValidIDLength(
723✔
2276
                signalRequest.GetRequestID(),
723✔
2277
                scope,
723✔
2278
                idLengthWarnLimit,
723✔
2279
                wh.config.RequestIDMaxLength(domainName),
723✔
2280
                metrics.CadenceErrRequestIDExceededWarnLimit,
723✔
2281
                domainName,
723✔
2282
                wh.GetLogger(),
723✔
2283
                tag.IDTypeRequestID) {
723✔
2284
                return validate.ErrRequestIDTooLong
×
2285
        }
×
2286

2287
        domainID, err := wh.GetDomainCache().GetDomainID(domainName)
723✔
2288
        if err != nil {
723✔
2289
                return err
×
2290
        }
×
2291

2292
        sizeLimitError := wh.config.BlobSizeLimitError(domainName)
723✔
2293
        sizeLimitWarn := wh.config.BlobSizeLimitWarn(domainName)
723✔
2294
        if err := common.CheckEventBlobSizeLimit(
723✔
2295
                len(signalRequest.Input),
723✔
2296
                sizeLimitWarn,
723✔
2297
                sizeLimitError,
723✔
2298
                domainID,
723✔
2299
                signalRequest.GetWorkflowExecution().GetWorkflowID(),
723✔
2300
                signalRequest.GetWorkflowExecution().GetRunID(),
723✔
2301
                scope,
723✔
2302
                wh.GetThrottledLogger(),
723✔
2303
                tag.BlobSizeViolationOperation("SignalWorkflowExecution"),
723✔
2304
        ); err != nil {
723✔
2305
                return err
×
2306
        }
×
2307

2308
        isolationGroup := wh.getIsolationGroup(ctx, domainName)
723✔
2309
        if !wh.isIsolationGroupHealthy(ctx, domainName, isolationGroup) {
723✔
2310
                return &types.BadRequestError{fmt.Sprintf("Domain %s is drained from isolation group %s.", domainName, isolationGroup)}
×
2311
        }
×
2312

2313
        err = wh.GetHistoryClient().SignalWorkflowExecution(ctx, &types.HistorySignalWorkflowExecutionRequest{
723✔
2314
                DomainUUID:    domainID,
723✔
2315
                SignalRequest: signalRequest,
723✔
2316
        })
723✔
2317
        if err != nil {
732✔
2318
                return wh.normalizeVersionedErrors(ctx, err)
9✔
2319
        }
9✔
2320

2321
        return nil
714✔
2322
}
2323

2324
func (wh *WorkflowHandler) SignalWithStartWorkflowExecutionAsync(
2325
        ctx context.Context,
2326
        signalWithStartRequest *types.SignalWithStartWorkflowExecutionAsyncRequest,
2327
) (resp *types.SignalWithStartWorkflowExecutionAsyncResponse, retError error) {
3✔
2328
        if wh.isShuttingDown() {
3✔
2329
                return nil, validate.ErrShuttingDown
×
2330
        }
×
2331
        scope := getMetricsScopeWithDomain(metrics.FrontendSignalWithStartWorkflowExecutionAsyncScope, signalWithStartRequest, wh.GetMetricsClient()).Tagged(metrics.GetContextTags(ctx)...)
3✔
2332
        // validate request before pushing to queue
3✔
2333
        err := wh.validateSignalWithStartWorkflowExecutionRequest(ctx, signalWithStartRequest.SignalWithStartWorkflowExecutionRequest, scope)
3✔
2334
        if err != nil {
3✔
2335
                return nil, err
×
2336
        }
×
2337
        producer, err := wh.producerManager.GetProducerByDomain(signalWithStartRequest.GetDomain())
3✔
2338
        if err != nil {
4✔
2339
                return nil, err
1✔
2340
        }
1✔
2341
        // serialize the message to be sent to the queue
2342
        payload, err := json.Marshal(signalWithStartRequest)
2✔
2343
        if err != nil {
2✔
2344
                return nil, err
×
2345
        }
×
2346
        // propagate the headers from the context to the message
2347
        clientHeaders := common.GetClientHeaders(ctx)
2✔
2348
        header := &shared.Header{
2✔
2349
                Fields: map[string][]byte{},
2✔
2350
        }
2✔
2351
        for k, v := range clientHeaders {
12✔
2352
                header.Fields[k] = []byte(v)
10✔
2353
        }
10✔
2354
        messageType := sqlblobs.AsyncRequestTypeSignalWithStartWorkflowExecutionAsyncRequest
2✔
2355
        message := &sqlblobs.AsyncRequestMessage{
2✔
2356
                PartitionKey: common.StringPtr(signalWithStartRequest.GetWorkflowID()),
2✔
2357
                Type:         &messageType,
2✔
2358
                Header:       header,
2✔
2359
                Encoding:     common.StringPtr(string(common.EncodingTypeJSON)),
2✔
2360
                Payload:      payload,
2✔
2361
        }
2✔
2362
        err = producer.Publish(ctx, message)
2✔
2363
        if err != nil {
3✔
2364
                return nil, err
1✔
2365
        }
1✔
2366
        return &types.SignalWithStartWorkflowExecutionAsyncResponse{}, nil
1✔
2367
}
2368

2369
// SignalWithStartWorkflowExecution is used to ensure sending a signal event to a workflow execution.
2370
// If workflow is running, this results in WorkflowExecutionSignaled event recorded in the history
2371
// and a decision task being created for the execution.
2372
// If workflow is not running or not found, this results in WorkflowExecutionStarted and WorkflowExecutionSignaled
2373
// event recorded in history, and a decision task being created for the execution
2374
func (wh *WorkflowHandler) SignalWithStartWorkflowExecution(
2375
        ctx context.Context,
2376
        signalWithStartRequest *types.SignalWithStartWorkflowExecutionRequest,
2377
) (resp *types.StartWorkflowExecutionResponse, retError error) {
33✔
2378
        if wh.isShuttingDown() {
33✔
2379
                return nil, validate.ErrShuttingDown
×
2380
        }
×
2381

2382
        scope := getMetricsScopeWithDomain(metrics.FrontendSignalWithStartWorkflowExecutionScope, signalWithStartRequest, wh.GetMetricsClient()).Tagged(metrics.GetContextTags(ctx)...)
33✔
2383
        err := wh.validateSignalWithStartWorkflowExecutionRequest(ctx, signalWithStartRequest, scope)
33✔
2384
        if err != nil {
33✔
2385
                return nil, err
×
2386
        }
×
2387

2388
        domainName := signalWithStartRequest.GetDomain()
33✔
2389
        domainID, err := wh.GetDomainCache().GetDomainID(domainName)
33✔
2390
        if err != nil {
33✔
2391
                return nil, err
×
2392
        }
×
2393
        resp, err = wh.GetHistoryClient().SignalWithStartWorkflowExecution(ctx, &types.HistorySignalWithStartWorkflowExecutionRequest{
33✔
2394
                DomainUUID:             domainID,
33✔
2395
                SignalWithStartRequest: signalWithStartRequest,
33✔
2396
                PartitionConfig:        wh.getPartitionConfig(ctx, domainName),
33✔
2397
        })
33✔
2398
        if err != nil {
39✔
2399
                return nil, err
6✔
2400
        }
6✔
2401

2402
        return resp, nil
27✔
2403
}
2404

2405
func (wh *WorkflowHandler) validateSignalWithStartWorkflowExecutionRequest(ctx context.Context, signalWithStartRequest *types.SignalWithStartWorkflowExecutionRequest, scope metrics.Scope) error {
36✔
2406
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
36✔
2407
                return err
×
2408
        }
×
2409

2410
        if signalWithStartRequest == nil {
36✔
2411
                return validate.ErrRequestNotSet
×
2412
        }
×
2413

2414
        domainName := signalWithStartRequest.GetDomain()
36✔
2415
        if domainName == "" {
36✔
2416
                return validate.ErrDomainNotSet
×
2417
        }
×
2418
        if signalWithStartRequest.GetWorkflowID() == "" {
36✔
2419
                return validate.ErrWorkflowIDNotSet
×
2420
        }
×
2421

2422
        idLengthWarnLimit := wh.config.MaxIDLengthWarnLimit()
36✔
2423
        if !common.IsValidIDLength(
36✔
2424
                domainName,
36✔
2425
                scope,
36✔
2426
                idLengthWarnLimit,
36✔
2427
                wh.config.DomainNameMaxLength(domainName),
36✔
2428
                metrics.CadenceErrDomainNameExceededWarnLimit,
36✔
2429
                domainName,
36✔
2430
                wh.GetLogger(),
36✔
2431
                tag.IDTypeDomainName) {
36✔
2432
                return validate.ErrDomainTooLong
×
2433
        }
×
2434

2435
        if !common.IsValidIDLength(
36✔
2436
                signalWithStartRequest.GetWorkflowID(),
36✔
2437
                scope,
36✔
2438
                idLengthWarnLimit,
36✔
2439
                wh.config.WorkflowIDMaxLength(domainName),
36✔
2440
                metrics.CadenceErrWorkflowIDExceededWarnLimit,
36✔
2441
                domainName,
36✔
2442
                wh.GetLogger(),
36✔
2443
                tag.IDTypeWorkflowID) {
36✔
2444
                return validate.ErrWorkflowIDTooLong
×
2445
        }
×
2446

2447
        if signalWithStartRequest.GetSignalName() == "" {
36✔
2448
                return validate.ErrSignalNameNotSet
×
2449
        }
×
2450

2451
        if !common.IsValidIDLength(
36✔
2452
                signalWithStartRequest.GetSignalName(),
36✔
2453
                scope,
36✔
2454
                idLengthWarnLimit,
36✔
2455
                wh.config.SignalNameMaxLength(domainName),
36✔
2456
                metrics.CadenceErrSignalNameExceededWarnLimit,
36✔
2457
                domainName,
36✔
2458
                wh.GetLogger(),
36✔
2459
                tag.IDTypeSignalName) {
36✔
2460
                return validate.ErrSignalNameTooLong
×
2461
        }
×
2462

2463
        if signalWithStartRequest.WorkflowType == nil || signalWithStartRequest.WorkflowType.GetName() == "" {
36✔
2464
                return validate.ErrWorkflowTypeNotSet
×
2465
        }
×
2466

2467
        if !common.IsValidIDLength(
36✔
2468
                signalWithStartRequest.WorkflowType.GetName(),
36✔
2469
                scope,
36✔
2470
                idLengthWarnLimit,
36✔
2471
                wh.config.WorkflowTypeMaxLength(domainName),
36✔
2472
                metrics.CadenceErrWorkflowTypeExceededWarnLimit,
36✔
2473
                domainName,
36✔
2474
                wh.GetLogger(),
36✔
2475
                tag.IDTypeWorkflowType) {
36✔
2476
                return validate.ErrWorkflowTypeTooLong
×
2477
        }
×
2478

2479
        if err := wh.validateTaskList(signalWithStartRequest.TaskList, scope, domainName); err != nil {
36✔
2480
                return err
×
2481
        }
×
2482

2483
        if !common.IsValidIDLength(
36✔
2484
                signalWithStartRequest.GetRequestID(),
36✔
2485
                scope,
36✔
2486
                idLengthWarnLimit,
36✔
2487
                wh.config.RequestIDMaxLength(domainName),
36✔
2488
                metrics.CadenceErrRequestIDExceededWarnLimit,
36✔
2489
                domainName,
36✔
2490
                wh.GetLogger(),
36✔
2491
                tag.IDTypeRequestID) {
36✔
2492
                return validate.ErrRequestIDTooLong
×
2493
        }
×
2494

2495
        if signalWithStartRequest.GetExecutionStartToCloseTimeoutSeconds() <= 0 {
36✔
2496
                return validate.ErrInvalidExecutionStartToCloseTimeoutSeconds
×
2497
        }
×
2498

2499
        if signalWithStartRequest.GetTaskStartToCloseTimeoutSeconds() <= 0 {
36✔
2500
                return validate.ErrInvalidTaskStartToCloseTimeoutSeconds
×
2501
        }
×
2502

2503
        if err := common.ValidateRetryPolicy(signalWithStartRequest.RetryPolicy); err != nil {
36✔
2504
                return err
×
2505
        }
×
2506

2507
        if signalWithStartRequest.GetCronSchedule() != "" {
36✔
2508
                if _, err := backoff.ValidateSchedule(signalWithStartRequest.GetCronSchedule()); err != nil {
×
2509
                        return err
×
2510
                }
×
2511
        }
2512

2513
        if err := wh.searchAttributesValidator.ValidateSearchAttributes(signalWithStartRequest.SearchAttributes, domainName); err != nil {
36✔
2514
                return err
×
2515
        }
×
2516

2517
        domainID, err := wh.GetDomainCache().GetDomainID(domainName)
36✔
2518
        if err != nil {
36✔
2519
                return err
×
2520
        }
×
2521

2522
        sizeLimitError := wh.config.BlobSizeLimitError(domainName)
36✔
2523
        sizeLimitWarn := wh.config.BlobSizeLimitWarn(domainName)
36✔
2524
        if err := common.CheckEventBlobSizeLimit(
36✔
2525
                len(signalWithStartRequest.SignalInput),
36✔
2526
                sizeLimitWarn,
36✔
2527
                sizeLimitError,
36✔
2528
                domainID,
36✔
2529
                signalWithStartRequest.GetWorkflowID(),
36✔
2530
                "",
36✔
2531
                scope,
36✔
2532
                wh.GetThrottledLogger(),
36✔
2533
                tag.BlobSizeViolationOperation("SignalWithStartWorkflowExecution"),
36✔
2534
        ); err != nil {
36✔
2535
                return err
×
2536
        }
×
2537
        actualSize := len(signalWithStartRequest.Input) + common.GetSizeOfMapStringToByteArray(signalWithStartRequest.Memo.GetFields())
36✔
2538
        if err := common.CheckEventBlobSizeLimit(
36✔
2539
                actualSize,
36✔
2540
                sizeLimitWarn,
36✔
2541
                sizeLimitError,
36✔
2542
                domainID,
36✔
2543
                signalWithStartRequest.GetWorkflowID(),
36✔
2544
                "",
36✔
2545
                scope,
36✔
2546
                wh.GetThrottledLogger(),
36✔
2547
                tag.BlobSizeViolationOperation("SignalWithStartWorkflowExecution"),
36✔
2548
        ); err != nil {
36✔
2549
                return err
×
2550
        }
×
2551

2552
        isolationGroup := wh.getIsolationGroup(ctx, domainName)
36✔
2553
        if !wh.isIsolationGroupHealthy(ctx, domainName, isolationGroup) {
36✔
2554
                return &types.BadRequestError{fmt.Sprintf("Domain %s is drained from isolation group %s.", domainName, isolationGroup)}
×
2555
        }
×
2556
        return nil
36✔
2557
}
2558

2559
// TerminateWorkflowExecution terminates an existing workflow execution by recording WorkflowExecutionTerminated event
2560
// in the history and immediately terminating the execution instance.
2561
func (wh *WorkflowHandler) TerminateWorkflowExecution(
2562
        ctx context.Context,
2563
        terminateRequest *types.TerminateWorkflowExecutionRequest,
2564
) (retError error) {
48✔
2565
        if wh.isShuttingDown() {
48✔
2566
                return validate.ErrShuttingDown
×
2567
        }
×
2568

2569
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
48✔
2570
                return err
×
2571
        }
×
2572

2573
        if terminateRequest == nil {
48✔
2574
                return validate.ErrRequestNotSet
×
2575
        }
×
2576

2577
        domainName := terminateRequest.GetDomain()
48✔
2578
        wfExecution := terminateRequest.GetWorkflowExecution()
48✔
2579
        if terminateRequest.GetDomain() == "" {
48✔
2580
                return validate.ErrDomainNotSet
×
2581
        }
×
2582
        if err := validate.CheckExecution(wfExecution); err != nil {
48✔
2583
                return err
×
2584
        }
×
2585

2586
        domainID, err := wh.GetDomainCache().GetDomainID(domainName)
48✔
2587
        if err != nil {
48✔
2588
                return err
×
2589
        }
×
2590

2591
        err = wh.GetHistoryClient().TerminateWorkflowExecution(ctx, &types.HistoryTerminateWorkflowExecutionRequest{
48✔
2592
                DomainUUID:       domainID,
48✔
2593
                TerminateRequest: terminateRequest,
48✔
2594
        })
48✔
2595
        if err != nil {
48✔
2596
                return wh.normalizeVersionedErrors(ctx, err)
×
2597
        }
×
2598

2599
        return nil
48✔
2600
}
2601

2602
// ResetWorkflowExecution reset an existing workflow execution to the nextFirstEventID
2603
// in the history and immediately terminating the current execution instance.
2604
func (wh *WorkflowHandler) ResetWorkflowExecution(
2605
        ctx context.Context,
2606
        resetRequest *types.ResetWorkflowExecutionRequest,
2607
) (resp *types.ResetWorkflowExecutionResponse, retError error) {
15✔
2608
        if wh.isShuttingDown() {
15✔
2609
                return nil, validate.ErrShuttingDown
×
2610
        }
×
2611

2612
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
15✔
2613
                return nil, err
×
2614
        }
×
2615

2616
        if resetRequest == nil {
15✔
2617
                return nil, validate.ErrRequestNotSet
×
2618
        }
×
2619

2620
        domainName := resetRequest.GetDomain()
15✔
2621
        wfExecution := resetRequest.GetWorkflowExecution()
15✔
2622
        if domainName == "" {
15✔
2623
                return nil, validate.ErrDomainNotSet
×
2624
        }
×
2625
        if err := validate.CheckExecution(wfExecution); err != nil {
15✔
2626
                return nil, err
×
2627
        }
×
2628

2629
        domainID, err := wh.GetDomainCache().GetDomainID(resetRequest.GetDomain())
15✔
2630
        if err != nil {
15✔
2631
                return nil, err
×
2632
        }
×
2633

2634
        resp, err = wh.GetHistoryClient().ResetWorkflowExecution(ctx, &types.HistoryResetWorkflowExecutionRequest{
15✔
2635
                DomainUUID:   domainID,
15✔
2636
                ResetRequest: resetRequest,
15✔
2637
        })
15✔
2638
        if err != nil {
15✔
2639
                return nil, err
×
2640
        }
×
2641

2642
        return resp, nil
15✔
2643
}
2644

2645
// RequestCancelWorkflowExecution - requests to cancel a workflow execution
2646
func (wh *WorkflowHandler) RequestCancelWorkflowExecution(
2647
        ctx context.Context,
2648
        cancelRequest *types.RequestCancelWorkflowExecutionRequest,
2649
) (retError error) {
6✔
2650
        if wh.isShuttingDown() {
6✔
2651
                return validate.ErrShuttingDown
×
2652
        }
×
2653

2654
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
6✔
2655
                return err
×
2656
        }
×
2657

2658
        if cancelRequest == nil {
6✔
2659
                return validate.ErrRequestNotSet
×
2660
        }
×
2661

2662
        domainName := cancelRequest.GetDomain()
6✔
2663
        wfExecution := cancelRequest.GetWorkflowExecution()
6✔
2664
        if domainName == "" {
6✔
2665
                return validate.ErrDomainNotSet
×
2666
        }
×
2667
        if err := validate.CheckExecution(wfExecution); err != nil {
6✔
2668
                return err
×
2669
        }
×
2670

2671
        domainID, err := wh.GetDomainCache().GetDomainID(cancelRequest.GetDomain())
6✔
2672
        if err != nil {
6✔
2673
                return err
×
2674
        }
×
2675

2676
        err = wh.GetHistoryClient().RequestCancelWorkflowExecution(ctx, &types.HistoryRequestCancelWorkflowExecutionRequest{
6✔
2677
                DomainUUID:    domainID,
6✔
2678
                CancelRequest: cancelRequest,
6✔
2679
        })
6✔
2680
        if err != nil {
9✔
2681
                return wh.normalizeVersionedErrors(ctx, err)
3✔
2682
        }
3✔
2683

2684
        return nil
3✔
2685
}
2686

2687
// ListOpenWorkflowExecutions - retrieves info for open workflow executions in a domain
2688
func (wh *WorkflowHandler) ListOpenWorkflowExecutions(
2689
        ctx context.Context,
2690
        listRequest *types.ListOpenWorkflowExecutionsRequest,
2691
) (resp *types.ListOpenWorkflowExecutionsResponse, retError error) {
106✔
2692
        if wh.isShuttingDown() {
106✔
2693
                return nil, validate.ErrShuttingDown
×
2694
        }
×
2695

2696
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
106✔
2697
                return nil, err
×
2698
        }
×
2699

2700
        if listRequest == nil {
106✔
2701
                return nil, validate.ErrRequestNotSet
×
2702
        }
×
2703

2704
        if listRequest.GetDomain() == "" {
106✔
2705
                return nil, validate.ErrDomainNotSet
×
2706
        }
×
2707

2708
        if listRequest.StartTimeFilter == nil {
106✔
2709
                return nil, &types.BadRequestError{Message: "StartTimeFilter is required"}
×
2710
        }
×
2711

2712
        if listRequest.StartTimeFilter.EarliestTime == nil {
106✔
2713
                return nil, &types.BadRequestError{Message: "EarliestTime in StartTimeFilter is required"}
×
2714
        }
×
2715

2716
        if listRequest.StartTimeFilter.LatestTime == nil {
106✔
2717
                return nil, &types.BadRequestError{Message: "LatestTime in StartTimeFilter is required"}
×
2718
        }
×
2719

2720
        if listRequest.StartTimeFilter.GetEarliestTime() > listRequest.StartTimeFilter.GetLatestTime() {
106✔
2721
                return nil, &types.BadRequestError{Message: "EarliestTime in StartTimeFilter should not be larger than LatestTime"}
×
2722
        }
×
2723

2724
        if listRequest.ExecutionFilter != nil && listRequest.TypeFilter != nil {
106✔
2725
                return nil, &types.BadRequestError{
×
2726
                        Message: "Only one of ExecutionFilter or TypeFilter is allowed"}
×
2727
        }
×
2728

2729
        if listRequest.GetMaximumPageSize() <= 0 {
167✔
2730
                listRequest.MaximumPageSize = int32(wh.config.VisibilityMaxPageSize(listRequest.GetDomain()))
61✔
2731
        }
61✔
2732

2733
        if wh.isListRequestPageSizeTooLarge(listRequest.GetMaximumPageSize(), listRequest.GetDomain()) {
106✔
2734
                return nil, &types.BadRequestError{
×
2735
                        Message: fmt.Sprintf("Pagesize is larger than allow %d", wh.config.ESIndexMaxResultWindow())}
×
2736
        }
×
2737

2738
        domain := listRequest.GetDomain()
106✔
2739
        domainID, err := wh.GetDomainCache().GetDomainID(domain)
106✔
2740
        if err != nil {
106✔
2741
                return nil, err
×
2742
        }
×
2743

2744
        baseReq := persistence.ListWorkflowExecutionsRequest{
106✔
2745
                DomainUUID:    domainID,
106✔
2746
                Domain:        domain,
106✔
2747
                PageSize:      int(listRequest.GetMaximumPageSize()),
106✔
2748
                NextPageToken: listRequest.NextPageToken,
106✔
2749
                EarliestTime:  listRequest.StartTimeFilter.GetEarliestTime(),
106✔
2750
                LatestTime:    listRequest.StartTimeFilter.GetLatestTime(),
106✔
2751
        }
106✔
2752

106✔
2753
        var persistenceResp *persistence.ListWorkflowExecutionsResponse
106✔
2754
        if listRequest.ExecutionFilter != nil {
206✔
2755
                if wh.config.DisableListVisibilityByFilter(domain) {
101✔
2756
                        err = validate.ErrNoPermission
1✔
2757
                } else {
100✔
2758
                        persistenceResp, err = wh.GetVisibilityManager().ListOpenWorkflowExecutionsByWorkflowID(
99✔
2759
                                ctx,
99✔
2760
                                &persistence.ListWorkflowExecutionsByWorkflowIDRequest{
99✔
2761
                                        ListWorkflowExecutionsRequest: baseReq,
99✔
2762
                                        WorkflowID:                    listRequest.ExecutionFilter.GetWorkflowID(),
99✔
2763
                                })
99✔
2764
                }
99✔
2765
                wh.GetLogger().Debug("List open workflow with filter",
100✔
2766
                        tag.WorkflowDomainName(listRequest.GetDomain()), tag.WorkflowListWorkflowFilterByID)
100✔
2767
        } else if listRequest.TypeFilter != nil {
7✔
2768
                if wh.config.DisableListVisibilityByFilter(domain) {
2✔
2769
                        err = validate.ErrNoPermission
1✔
2770
                } else {
1✔
2771
                        persistenceResp, err = wh.GetVisibilityManager().ListOpenWorkflowExecutionsByType(
×
2772
                                ctx,
×
2773
                                &persistence.ListWorkflowExecutionsByTypeRequest{
×
2774
                                        ListWorkflowExecutionsRequest: baseReq,
×
2775
                                        WorkflowTypeName:              listRequest.TypeFilter.GetName(),
×
2776
                                },
×
2777
                        )
×
2778
                }
×
2779
                wh.GetLogger().Debug("List open workflow with filter",
1✔
2780
                        tag.WorkflowDomainName(listRequest.GetDomain()), tag.WorkflowListWorkflowFilterByType)
1✔
2781
        } else {
5✔
2782
                persistenceResp, err = wh.GetVisibilityManager().ListOpenWorkflowExecutions(ctx, &baseReq)
5✔
2783
        }
5✔
2784

2785
        if err != nil {
108✔
2786
                return nil, err
2✔
2787
        }
2✔
2788

2789
        resp = &types.ListOpenWorkflowExecutionsResponse{}
104✔
2790
        resp.Executions = persistenceResp.Executions
104✔
2791
        resp.NextPageToken = persistenceResp.NextPageToken
104✔
2792
        return resp, nil
104✔
2793
}
2794

2795
// ListArchivedWorkflowExecutions - retrieves archived info for closed workflow executions in a domain
2796
func (wh *WorkflowHandler) ListArchivedWorkflowExecutions(
2797
        ctx context.Context,
2798
        listRequest *types.ListArchivedWorkflowExecutionsRequest,
2799
) (resp *types.ListArchivedWorkflowExecutionsResponse, retError error) {
15✔
2800
        if wh.isShuttingDown() {
15✔
2801
                return nil, validate.ErrShuttingDown
×
2802
        }
×
2803

2804
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
15✔
2805
                return nil, err
×
2806
        }
×
2807

2808
        if listRequest == nil {
15✔
2809
                return nil, validate.ErrRequestNotSet
×
2810
        }
×
2811

2812
        if listRequest.GetDomain() == "" {
16✔
2813
                return nil, validate.ErrDomainNotSet
1✔
2814
        }
1✔
2815

2816
        if listRequest.GetPageSize() <= 0 {
14✔
2817
                listRequest.PageSize = int32(wh.config.VisibilityMaxPageSize(listRequest.GetDomain()))
×
2818
        }
×
2819

2820
        maxPageSize := wh.config.VisibilityArchivalQueryMaxPageSize()
14✔
2821
        if int(listRequest.GetPageSize()) > maxPageSize {
14✔
2822
                return nil, &types.BadRequestError{
×
2823
                        Message: fmt.Sprintf("Pagesize is larger than allowed %d", maxPageSize)}
×
2824
        }
×
2825

2826
        if !wh.GetArchivalMetadata().GetVisibilityConfig().ClusterConfiguredForArchival() {
15✔
2827
                return nil, &types.BadRequestError{Message: "Cluster is not configured for visibility archival"}
1✔
2828
        }
1✔
2829

2830
        if !wh.GetArchivalMetadata().GetVisibilityConfig().ReadEnabled() {
13✔
2831
                return nil, &types.BadRequestError{Message: "Cluster is not configured for reading archived visibility records"}
×
2832
        }
×
2833

2834
        entry, err := wh.GetDomainCache().GetDomain(listRequest.GetDomain())
13✔
2835
        if err != nil {
14✔
2836
                return nil, err
1✔
2837
        }
1✔
2838

2839
        if entry.GetConfig().VisibilityArchivalStatus != types.ArchivalStatusEnabled {
14✔
2840
                return nil, &types.BadRequestError{Message: "Domain is not configured for visibility archival"}
2✔
2841
        }
2✔
2842

2843
        URI, err := archiver.NewURI(entry.GetConfig().VisibilityArchivalURI)
10✔
2844
        if err != nil {
10✔
2845
                return nil, err
×
2846
        }
×
2847

2848
        visibilityArchiver, err := wh.GetArchiverProvider().GetVisibilityArchiver(URI.Scheme(), service.Frontend)
10✔
2849
        if err != nil {
10✔
2850
                return nil, err
×
2851
        }
×
2852

2853
        archiverRequest := &archiver.QueryVisibilityRequest{
10✔
2854
                DomainID:      entry.GetInfo().ID,
10✔
2855
                PageSize:      int(listRequest.GetPageSize()),
10✔
2856
                NextPageToken: listRequest.NextPageToken,
10✔
2857
                Query:         listRequest.GetQuery(),
10✔
2858
        }
10✔
2859

10✔
2860
        archiverResponse, err := visibilityArchiver.Query(ctx, URI, archiverRequest)
10✔
2861
        if err != nil {
10✔
2862
                return nil, err
×
2863
        }
×
2864

2865
        // special handling of ExecutionTime for cron or retry
2866
        for _, execution := range archiverResponse.Executions {
25✔
2867
                if execution.GetExecutionTime() == 0 {
30✔
2868
                        execution.ExecutionTime = common.Int64Ptr(execution.GetStartTime())
15✔
2869
                }
15✔
2870
        }
2871

2872
        return &types.ListArchivedWorkflowExecutionsResponse{
10✔
2873
                Executions:    archiverResponse.Executions,
10✔
2874
                NextPageToken: archiverResponse.NextPageToken,
10✔
2875
        }, nil
10✔
2876
}
2877

2878
// ListClosedWorkflowExecutions - retrieves info for closed workflow executions in a domain
2879
func (wh *WorkflowHandler) ListClosedWorkflowExecutions(
2880
        ctx context.Context,
2881
        listRequest *types.ListClosedWorkflowExecutionsRequest,
2882
) (resp *types.ListClosedWorkflowExecutionsResponse, retError error) {
29✔
2883
        if wh.isShuttingDown() {
29✔
2884
                return nil, validate.ErrShuttingDown
×
2885
        }
×
2886

2887
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
29✔
2888
                return nil, err
×
2889
        }
×
2890

2891
        if listRequest == nil {
29✔
2892
                return nil, validate.ErrRequestNotSet
×
2893
        }
×
2894

2895
        if listRequest.GetDomain() == "" {
29✔
2896
                return nil, validate.ErrDomainNotSet
×
2897
        }
×
2898

2899
        if listRequest.StartTimeFilter == nil {
29✔
2900
                return nil, &types.BadRequestError{Message: "StartTimeFilter is required"}
×
2901
        }
×
2902

2903
        if listRequest.StartTimeFilter.EarliestTime == nil {
29✔
2904
                return nil, &types.BadRequestError{Message: "EarliestTime in StartTimeFilter is required"}
×
2905
        }
×
2906

2907
        if listRequest.StartTimeFilter.LatestTime == nil {
29✔
2908
                return nil, &types.BadRequestError{Message: "LatestTime in StartTimeFilter is required"}
×
2909
        }
×
2910

2911
        if listRequest.StartTimeFilter.GetEarliestTime() > listRequest.StartTimeFilter.GetLatestTime() {
29✔
2912
                return nil, &types.BadRequestError{Message: "EarliestTime in StartTimeFilter should not be larger than LatestTime"}
×
2913
        }
×
2914

2915
        filterCount := 0
29✔
2916
        if listRequest.TypeFilter != nil {
30✔
2917
                filterCount++
1✔
2918
        }
1✔
2919
        if listRequest.StatusFilter != nil {
30✔
2920
                filterCount++
1✔
2921
        }
1✔
2922

2923
        if filterCount > 1 {
29✔
2924
                return nil, &types.BadRequestError{
×
2925
                        Message: "Only one of ExecutionFilter, TypeFilter or StatusFilter is allowed"}
×
2926
        } // If ExecutionFilter is provided with one of TypeFilter or StatusFilter, use ExecutionFilter and ignore other filter
×
2927

2928
        if listRequest.GetMaximumPageSize() <= 0 {
30✔
2929
                listRequest.MaximumPageSize = int32(wh.config.VisibilityMaxPageSize(listRequest.GetDomain()))
1✔
2930
        }
1✔
2931

2932
        if wh.isListRequestPageSizeTooLarge(listRequest.GetMaximumPageSize(), listRequest.GetDomain()) {
29✔
2933
                return nil, &types.BadRequestError{
×
2934
                        Message: fmt.Sprintf("Pagesize is larger than allow %d", wh.config.ESIndexMaxResultWindow())}
×
2935
        }
×
2936

2937
        domain := listRequest.GetDomain()
29✔
2938
        domainID, err := wh.GetDomainCache().GetDomainID(domain)
29✔
2939
        if err != nil {
29✔
2940
                return nil, err
×
2941
        }
×
2942

2943
        baseReq := persistence.ListWorkflowExecutionsRequest{
29✔
2944
                DomainUUID:    domainID,
29✔
2945
                Domain:        domain,
29✔
2946
                PageSize:      int(listRequest.GetMaximumPageSize()),
29✔
2947
                NextPageToken: listRequest.NextPageToken,
29✔
2948
                EarliestTime:  listRequest.StartTimeFilter.GetEarliestTime(),
29✔
2949
                LatestTime:    listRequest.StartTimeFilter.GetLatestTime(),
29✔
2950
        }
29✔
2951

29✔
2952
        var persistenceResp *persistence.ListWorkflowExecutionsResponse
29✔
2953
        if listRequest.ExecutionFilter != nil {
45✔
2954
                if wh.config.DisableListVisibilityByFilter(domain) {
17✔
2955
                        err = validate.ErrNoPermission
1✔
2956
                } else {
16✔
2957
                        persistenceResp, err = wh.GetVisibilityManager().ListClosedWorkflowExecutionsByWorkflowID(
15✔
2958
                                ctx,
15✔
2959
                                &persistence.ListWorkflowExecutionsByWorkflowIDRequest{
15✔
2960
                                        ListWorkflowExecutionsRequest: baseReq,
15✔
2961
                                        WorkflowID:                    listRequest.ExecutionFilter.GetWorkflowID(),
15✔
2962
                                },
15✔
2963
                        )
15✔
2964
                }
15✔
2965
                wh.GetLogger().Debug("List closed workflow with filter",
16✔
2966
                        tag.WorkflowDomainName(listRequest.GetDomain()), tag.WorkflowListWorkflowFilterByID)
16✔
2967
        } else if listRequest.TypeFilter != nil {
14✔
2968
                if wh.config.DisableListVisibilityByFilter(domain) {
2✔
2969
                        err = validate.ErrNoPermission
1✔
2970
                } else {
1✔
2971
                        persistenceResp, err = wh.GetVisibilityManager().ListClosedWorkflowExecutionsByType(
×
2972
                                ctx,
×
2973
                                &persistence.ListWorkflowExecutionsByTypeRequest{
×
2974
                                        ListWorkflowExecutionsRequest: baseReq,
×
2975
                                        WorkflowTypeName:              listRequest.TypeFilter.GetName(),
×
2976
                                },
×
2977
                        )
×
2978
                }
×
2979
                wh.GetLogger().Debug("List closed workflow with filter",
1✔
2980
                        tag.WorkflowDomainName(listRequest.GetDomain()), tag.WorkflowListWorkflowFilterByType)
1✔
2981
        } else if listRequest.StatusFilter != nil {
13✔
2982
                if wh.config.DisableListVisibilityByFilter(domain) {
2✔
2983
                        err = validate.ErrNoPermission
1✔
2984
                } else {
1✔
2985
                        persistenceResp, err = wh.GetVisibilityManager().ListClosedWorkflowExecutionsByStatus(
×
2986
                                ctx,
×
2987
                                &persistence.ListClosedWorkflowExecutionsByStatusRequest{
×
2988
                                        ListWorkflowExecutionsRequest: baseReq,
×
2989
                                        Status:                        listRequest.GetStatusFilter(),
×
2990
                                },
×
2991
                        )
×
2992
                }
×
2993
                wh.GetLogger().Debug("List closed workflow with filter",
1✔
2994
                        tag.WorkflowDomainName(listRequest.GetDomain()), tag.WorkflowListWorkflowFilterByStatus)
1✔
2995
        } else {
11✔
2996
                persistenceResp, err = wh.GetVisibilityManager().ListClosedWorkflowExecutions(ctx, &baseReq)
11✔
2997
        }
11✔
2998

2999
        if err != nil {
32✔
3000
                return nil, err
3✔
3001
        }
3✔
3002

3003
        resp = &types.ListClosedWorkflowExecutionsResponse{}
26✔
3004
        resp.Executions = persistenceResp.Executions
26✔
3005
        resp.NextPageToken = persistenceResp.NextPageToken
26✔
3006
        return resp, nil
26✔
3007
}
3008

3009
// ListWorkflowExecutions - retrieves info for workflow executions in a domain
3010
func (wh *WorkflowHandler) ListWorkflowExecutions(
3011
        ctx context.Context,
3012
        listRequest *types.ListWorkflowExecutionsRequest,
3013
) (resp *types.ListWorkflowExecutionsResponse, retError error) {
142✔
3014
        if wh.isShuttingDown() {
142✔
3015
                return nil, validate.ErrShuttingDown
×
3016
        }
×
3017

3018
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
142✔
3019
                return nil, err
×
3020
        }
×
3021

3022
        if listRequest == nil {
142✔
3023
                return nil, validate.ErrRequestNotSet
×
3024
        }
×
3025

3026
        if listRequest.GetDomain() == "" {
142✔
3027
                return nil, validate.ErrDomainNotSet
×
3028
        }
×
3029

3030
        if listRequest.GetPageSize() <= 0 {
142✔
3031
                listRequest.PageSize = int32(wh.config.VisibilityMaxPageSize(listRequest.GetDomain()))
×
3032
        }
×
3033

3034
        if wh.isListRequestPageSizeTooLarge(listRequest.GetPageSize(), listRequest.GetDomain()) {
142✔
3035
                return nil, &types.BadRequestError{
×
3036
                        Message: fmt.Sprintf("Pagesize is larger than allow %d", wh.config.ESIndexMaxResultWindow())}
×
3037
        }
×
3038

3039
        validatedQuery, err := wh.visibilityQueryValidator.ValidateQuery(listRequest.GetQuery())
142✔
3040
        if err != nil {
145✔
3041
                return nil, err
3✔
3042
        }
3✔
3043

3044
        domain := listRequest.GetDomain()
139✔
3045
        domainID, err := wh.GetDomainCache().GetDomainID(domain)
139✔
3046
        if err != nil {
139✔
3047
                return nil, err
×
3048
        }
×
3049

3050
        req := &persistence.ListWorkflowExecutionsByQueryRequest{
139✔
3051
                DomainUUID:    domainID,
139✔
3052
                Domain:        domain,
139✔
3053
                PageSize:      int(listRequest.GetPageSize()),
139✔
3054
                NextPageToken: listRequest.NextPageToken,
139✔
3055
                Query:         validatedQuery,
139✔
3056
        }
139✔
3057
        persistenceResp, err := wh.GetVisibilityManager().ListWorkflowExecutions(ctx, req)
139✔
3058
        if err != nil {
139✔
3059
                return nil, err
×
3060
        }
×
3061

3062
        resp = &types.ListWorkflowExecutionsResponse{}
139✔
3063
        resp.Executions = persistenceResp.Executions
139✔
3064
        resp.NextPageToken = persistenceResp.NextPageToken
139✔
3065
        return resp, nil
139✔
3066
}
3067

3068
// RestartWorkflowExecution - retrieves info for an existing workflow then restarts it
3069
func (wh *WorkflowHandler) RestartWorkflowExecution(ctx context.Context, request *types.RestartWorkflowExecutionRequest) (resp *types.RestartWorkflowExecutionResponse, retError error) {
2✔
3070
        if wh.isShuttingDown() {
2✔
3071
                return nil, validate.ErrShuttingDown
×
3072
        }
×
3073

3074
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
2✔
3075
                return nil, err
×
3076
        }
×
3077

3078
        if request == nil {
2✔
3079
                return nil, validate.ErrRequestNotSet
×
3080
        }
×
3081

3082
        domainName := request.GetDomain()
2✔
3083
        wfExecution := request.GetWorkflowExecution()
2✔
3084

2✔
3085
        if request.GetDomain() == "" {
2✔
3086
                return nil, validate.ErrDomainNotSet
×
3087
        }
×
3088

3089
        if err := validate.CheckExecution(wfExecution); err != nil {
2✔
3090
                return nil, err
×
3091
        }
×
3092

3093
        domainID, err := wh.GetDomainCache().GetDomainID(domainName)
2✔
3094
        if err != nil {
2✔
3095
                return nil, err
×
3096
        }
×
3097

3098
        isolationGroup := wh.getIsolationGroup(ctx, domainName)
2✔
3099
        if !wh.isIsolationGroupHealthy(ctx, domainName, isolationGroup) {
3✔
3100
                return nil, &types.BadRequestError{fmt.Sprintf("Domain %s is drained from isolation group %s.", domainName, isolationGroup)}
1✔
3101
        }
1✔
3102

3103
        history, err := wh.GetWorkflowExecutionHistory(ctx, &types.GetWorkflowExecutionHistoryRequest{
1✔
3104
                Domain: domainName,
1✔
3105
                Execution: &types.WorkflowExecution{
1✔
3106
                        WorkflowID: wfExecution.WorkflowID,
1✔
3107
                        RunID:      wfExecution.RunID,
1✔
3108
                },
1✔
3109
                SkipArchival: true,
1✔
3110
        })
1✔
3111
        if err != nil {
1✔
3112
                return nil, validate.ErrHistoryNotFound
×
3113
        }
×
3114
        startRequest := constructRestartWorkflowRequest(history.History.Events[0].WorkflowExecutionStartedEventAttributes,
1✔
3115
                domainName, request.Identity, wfExecution.WorkflowID)
1✔
3116
        req, err := common.CreateHistoryStartWorkflowRequest(domainID, startRequest, time.Now(), wh.getPartitionConfig(ctx, domainName))
1✔
3117
        if err != nil {
1✔
3118
                return nil, err
×
3119
        }
×
3120
        startResp, err := wh.GetHistoryClient().StartWorkflowExecution(ctx, req)
1✔
3121
        if err != nil {
1✔
3122
                return nil, wh.normalizeVersionedErrors(ctx, err)
×
3123
        }
×
3124
        resp = &types.RestartWorkflowExecutionResponse{
1✔
3125
                RunID: startResp.RunID,
1✔
3126
        }
1✔
3127

1✔
3128
        return resp, nil
1✔
3129
}
3130

3131
// ScanWorkflowExecutions - retrieves info for large amount of workflow executions in a domain without order
3132
func (wh *WorkflowHandler) ScanWorkflowExecutions(
3133
        ctx context.Context,
3134
        listRequest *types.ListWorkflowExecutionsRequest,
3135
) (resp *types.ListWorkflowExecutionsResponse, retError error) {
28✔
3136
        if wh.isShuttingDown() {
28✔
3137
                return nil, validate.ErrShuttingDown
×
3138
        }
×
3139

3140
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
28✔
3141
                return nil, err
×
3142
        }
×
3143

3144
        if listRequest == nil {
28✔
3145
                return nil, validate.ErrRequestNotSet
×
3146
        }
×
3147

3148
        if listRequest.GetDomain() == "" {
28✔
3149
                return nil, validate.ErrDomainNotSet
×
3150
        }
×
3151

3152
        if listRequest.GetPageSize() <= 0 {
28✔
3153
                listRequest.PageSize = int32(wh.config.VisibilityMaxPageSize(listRequest.GetDomain()))
×
3154
        }
×
3155

3156
        if wh.isListRequestPageSizeTooLarge(listRequest.GetPageSize(), listRequest.GetDomain()) {
28✔
3157
                return nil, &types.BadRequestError{
×
3158
                        Message: fmt.Sprintf("Pagesize is larger than allow %d", wh.config.ESIndexMaxResultWindow())}
×
3159
        }
×
3160

3161
        validatedQuery, err := wh.visibilityQueryValidator.ValidateQuery(listRequest.GetQuery())
28✔
3162
        if err != nil {
29✔
3163
                return nil, err
1✔
3164
        }
1✔
3165

3166
        domain := listRequest.GetDomain()
27✔
3167
        domainID, err := wh.GetDomainCache().GetDomainID(domain)
27✔
3168
        if err != nil {
27✔
3169
                return nil, err
×
3170
        }
×
3171

3172
        req := &persistence.ListWorkflowExecutionsByQueryRequest{
27✔
3173
                DomainUUID:    domainID,
27✔
3174
                Domain:        domain,
27✔
3175
                PageSize:      int(listRequest.GetPageSize()),
27✔
3176
                NextPageToken: listRequest.NextPageToken,
27✔
3177
                Query:         validatedQuery,
27✔
3178
        }
27✔
3179
        persistenceResp, err := wh.GetVisibilityManager().ScanWorkflowExecutions(ctx, req)
27✔
3180
        if err != nil {
27✔
3181
                return nil, err
×
3182
        }
×
3183

3184
        resp = &types.ListWorkflowExecutionsResponse{}
27✔
3185
        resp.Executions = persistenceResp.Executions
27✔
3186
        resp.NextPageToken = persistenceResp.NextPageToken
27✔
3187
        return resp, nil
27✔
3188
}
3189

3190
// CountWorkflowExecutions - count number of workflow executions in a domain
3191
func (wh *WorkflowHandler) CountWorkflowExecutions(
3192
        ctx context.Context,
3193
        countRequest *types.CountWorkflowExecutionsRequest,
3194
) (resp *types.CountWorkflowExecutionsResponse, retError error) {
14✔
3195
        if wh.isShuttingDown() {
14✔
3196
                return nil, validate.ErrShuttingDown
×
3197
        }
×
3198

3199
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
14✔
3200
                return nil, err
×
3201
        }
×
3202

3203
        if countRequest == nil {
14✔
3204
                return nil, validate.ErrRequestNotSet
×
3205
        }
×
3206

3207
        if countRequest.GetDomain() == "" {
14✔
3208
                return nil, validate.ErrDomainNotSet
×
3209
        }
×
3210

3211
        validatedQuery, err := wh.visibilityQueryValidator.ValidateQuery(countRequest.GetQuery())
14✔
3212
        if err != nil {
15✔
3213
                return nil, err
1✔
3214
        }
1✔
3215

3216
        domain := countRequest.GetDomain()
13✔
3217
        domainID, err := wh.GetDomainCache().GetDomainID(domain)
13✔
3218
        if err != nil {
13✔
3219
                return nil, err
×
3220
        }
×
3221

3222
        req := &persistence.CountWorkflowExecutionsRequest{
13✔
3223
                DomainUUID: domainID,
13✔
3224
                Domain:     domain,
13✔
3225
                Query:      validatedQuery,
13✔
3226
        }
13✔
3227
        persistenceResp, err := wh.GetVisibilityManager().CountWorkflowExecutions(ctx, req)
13✔
3228
        if err != nil {
13✔
3229
                return nil, err
×
3230
        }
×
3231

3232
        resp = &types.CountWorkflowExecutionsResponse{
13✔
3233
                Count: persistenceResp.Count,
13✔
3234
        }
13✔
3235
        return resp, nil
13✔
3236
}
3237

3238
// GetSearchAttributes return valid indexed keys
3239
func (wh *WorkflowHandler) GetSearchAttributes(ctx context.Context) (resp *types.GetSearchAttributesResponse, retError error) {
1✔
3240
        if wh.isShuttingDown() {
1✔
3241
                return nil, validate.ErrShuttingDown
×
3242
        }
×
3243

3244
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
1✔
3245
                return nil, err
×
3246
        }
×
3247

3248
        keys := wh.config.ValidSearchAttributes()
1✔
3249
        resp = &types.GetSearchAttributesResponse{
1✔
3250
                Keys: wh.convertIndexedKeyToThrift(keys),
1✔
3251
        }
1✔
3252
        return resp, nil
1✔
3253
}
3254

3255
// ResetStickyTaskList reset the volatile information in mutable state of a given workflow.
3256
func (wh *WorkflowHandler) ResetStickyTaskList(
3257
        ctx context.Context,
3258
        resetRequest *types.ResetStickyTaskListRequest,
3259
) (resp *types.ResetStickyTaskListResponse, retError error) {
3✔
3260
        if wh.isShuttingDown() {
3✔
3261
                return nil, validate.ErrShuttingDown
×
3262
        }
×
3263

3264
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
3✔
3265
                return nil, err
×
3266
        }
×
3267

3268
        if resetRequest == nil {
3✔
3269
                return nil, validate.ErrRequestNotSet
×
3270
        }
×
3271

3272
        domainName := resetRequest.GetDomain()
3✔
3273
        wfExecution := resetRequest.GetExecution()
3✔
3274

3✔
3275
        if domainName == "" {
3✔
3276
                return nil, validate.ErrDomainNotSet
×
3277
        }
×
3278

3279
        if err := validate.CheckExecution(wfExecution); err != nil {
3✔
3280
                return nil, err
×
3281
        }
×
3282

3283
        domainID, err := wh.GetDomainCache().GetDomainID(resetRequest.GetDomain())
3✔
3284
        if err != nil {
3✔
3285
                return nil, err
×
3286
        }
×
3287

3288
        _, err = wh.GetHistoryClient().ResetStickyTaskList(ctx, &types.HistoryResetStickyTaskListRequest{
3✔
3289
                DomainUUID: domainID,
3✔
3290
                Execution:  resetRequest.Execution,
3✔
3291
        })
3✔
3292
        if err != nil {
3✔
3293
                return nil, wh.normalizeVersionedErrors(ctx, err)
×
3294
        }
×
3295
        return &types.ResetStickyTaskListResponse{}, nil
3✔
3296
}
3297

3298
// QueryWorkflow returns query result for a specified workflow execution
3299
func (wh *WorkflowHandler) QueryWorkflow(
3300
        ctx context.Context,
3301
        queryRequest *types.QueryWorkflowRequest,
3302
) (resp *types.QueryWorkflowResponse, retError error) {
45✔
3303
        if wh.isShuttingDown() {
45✔
3304
                return nil, validate.ErrShuttingDown
×
3305
        }
×
3306

3307
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
45✔
3308
                return nil, err
×
3309
        }
×
3310

3311
        if queryRequest == nil {
45✔
3312
                return nil, validate.ErrRequestNotSet
×
3313
        }
×
3314

3315
        domainName := queryRequest.GetDomain()
45✔
3316
        wfExecution := queryRequest.GetExecution()
45✔
3317

45✔
3318
        if domainName == "" {
45✔
3319
                return nil, validate.ErrDomainNotSet
×
3320
        }
×
3321

3322
        if err := validate.CheckExecution(wfExecution); err != nil {
45✔
3323
                return nil, err
×
3324
        }
×
3325

3326
        if wh.config.DisallowQuery(domainName) {
45✔
3327
                return nil, validate.ErrQueryDisallowedForDomain
×
3328
        }
×
3329

3330
        if queryRequest.Query == nil {
45✔
3331
                return nil, validate.ErrQueryNotSet
×
3332
        }
×
3333

3334
        if queryRequest.Query.GetQueryType() == "" {
45✔
3335
                return nil, validate.ErrQueryTypeNotSet
×
3336
        }
×
3337

3338
        domainID, err := wh.GetDomainCache().GetDomainID(domainName)
45✔
3339
        if err != nil {
45✔
3340
                return nil, err
×
3341
        }
×
3342

3343
        sizeLimitError := wh.config.BlobSizeLimitError(domainName)
45✔
3344
        sizeLimitWarn := wh.config.BlobSizeLimitWarn(domainName)
45✔
3345

45✔
3346
        scope := getMetricsScopeWithDomain(metrics.FrontendQueryWorkflowScope, queryRequest, wh.GetMetricsClient()).Tagged(metrics.GetContextTags(ctx)...)
45✔
3347
        if err := common.CheckEventBlobSizeLimit(
45✔
3348
                len(queryRequest.GetQuery().GetQueryArgs()),
45✔
3349
                sizeLimitWarn,
45✔
3350
                sizeLimitError,
45✔
3351
                domainID,
45✔
3352
                queryRequest.GetExecution().GetWorkflowID(),
45✔
3353
                queryRequest.GetExecution().GetRunID(),
45✔
3354
                scope,
45✔
3355
                wh.GetThrottledLogger(),
45✔
3356
                tag.BlobSizeViolationOperation("QueryWorkflow")); err != nil {
45✔
3357
                return nil, err
×
3358
        }
×
3359

3360
        req := &types.HistoryQueryWorkflowRequest{
45✔
3361
                DomainUUID: domainID,
45✔
3362
                Request:    queryRequest,
45✔
3363
        }
45✔
3364
        hResponse, err := wh.GetHistoryClient().QueryWorkflow(ctx, req)
45✔
3365
        if err != nil {
57✔
3366
                return nil, err
12✔
3367
        }
12✔
3368
        return hResponse.GetResponse(), nil
33✔
3369
}
3370

3371
// DescribeWorkflowExecution returns information about the specified workflow execution.
3372
func (wh *WorkflowHandler) DescribeWorkflowExecution(
3373
        ctx context.Context,
3374
        request *types.DescribeWorkflowExecutionRequest,
3375
) (resp *types.DescribeWorkflowExecutionResponse, retError error) {
93✔
3376
        if wh.isShuttingDown() {
93✔
3377
                return nil, validate.ErrShuttingDown
×
3378
        }
×
3379

3380
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
93✔
3381
                return nil, err
×
3382
        }
×
3383

3384
        if request == nil {
93✔
3385
                return nil, validate.ErrRequestNotSet
×
3386
        }
×
3387

3388
        domainName := request.GetDomain()
93✔
3389
        wfExecution := request.GetExecution()
93✔
3390
        if domainName == "" {
93✔
3391
                return nil, validate.ErrDomainNotSet
×
3392
        }
×
3393

3394
        if err := validate.CheckExecution(wfExecution); err != nil {
93✔
3395
                return nil, err
×
3396
        }
×
3397

3398
        domainID, err := wh.GetDomainCache().GetDomainID(request.GetDomain())
93✔
3399
        if err != nil {
93✔
3400
                return nil, err
×
3401
        }
×
3402

3403
        response, err := wh.GetHistoryClient().DescribeWorkflowExecution(ctx, &types.HistoryDescribeWorkflowExecutionRequest{
93✔
3404
                DomainUUID: domainID,
93✔
3405
                Request:    request,
93✔
3406
        })
93✔
3407

93✔
3408
        if err != nil {
93✔
3409
                return nil, err
×
3410
        }
×
3411

3412
        return response, nil
93✔
3413
}
3414

3415
// DescribeTaskList returns information about the target tasklist, right now this API returns the
3416
// pollers which polled this tasklist in last few minutes. If includeTaskListStatus field is true,
3417
// it will also return status of tasklist's ackManager (readLevel, ackLevel, backlogCountHint and taskIDBlock).
3418
func (wh *WorkflowHandler) DescribeTaskList(
3419
        ctx context.Context,
3420
        request *types.DescribeTaskListRequest,
3421
) (resp *types.DescribeTaskListResponse, retError error) {
18✔
3422
        if wh.isShuttingDown() {
18✔
3423
                return nil, validate.ErrShuttingDown
×
3424
        }
×
3425

3426
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
18✔
3427
                return nil, err
×
3428
        }
×
3429

3430
        if request == nil {
18✔
3431
                return nil, validate.ErrRequestNotSet
×
3432
        }
×
3433

3434
        if request.GetDomain() == "" {
18✔
3435
                return nil, validate.ErrDomainNotSet
×
3436
        }
×
3437

3438
        domainID, err := wh.GetDomainCache().GetDomainID(request.GetDomain())
18✔
3439
        if err != nil {
18✔
3440
                return nil, err
×
3441
        }
×
3442

3443
        scope := getMetricsScopeWithDomain(metrics.FrontendDescribeTaskListScope, request, wh.GetMetricsClient()).Tagged(metrics.GetContextTags(ctx)...)
18✔
3444
        if err := wh.validateTaskList(request.TaskList, scope, request.GetDomain()); err != nil {
18✔
3445
                return nil, err
×
3446
        }
×
3447

3448
        if request.TaskListType == nil {
18✔
3449
                return nil, validate.ErrTaskListTypeNotSet
×
3450
        }
×
3451

3452
        response, err := wh.GetMatchingClient().DescribeTaskList(ctx, &types.MatchingDescribeTaskListRequest{
18✔
3453
                DomainUUID:  domainID,
18✔
3454
                DescRequest: request,
18✔
3455
        })
18✔
3456
        if err != nil {
18✔
3457
                return nil, err
×
3458
        }
×
3459

3460
        return response, nil
18✔
3461
}
3462

3463
// ListTaskListPartitions returns all the partition and host for a taskList
3464
func (wh *WorkflowHandler) ListTaskListPartitions(
3465
        ctx context.Context,
3466
        request *types.ListTaskListPartitionsRequest,
3467
) (resp *types.ListTaskListPartitionsResponse, retError error) {
×
3468
        if wh.isShuttingDown() {
×
3469
                return nil, validate.ErrShuttingDown
×
3470
        }
×
3471

3472
        if request == nil {
×
3473
                return nil, validate.ErrRequestNotSet
×
3474
        }
×
3475

3476
        if request.GetDomain() == "" {
×
3477
                return nil, validate.ErrDomainNotSet
×
3478
        }
×
3479

3480
        scope := getMetricsScopeWithDomain(metrics.FrontendListTaskListPartitionsScope, request, wh.GetMetricsClient()).Tagged(metrics.GetContextTags(ctx)...)
×
3481
        if err := wh.validateTaskList(request.TaskList, scope, request.GetDomain()); err != nil {
×
3482
                return nil, err
×
3483
        }
×
3484

3485
        resp, err := wh.GetMatchingClient().ListTaskListPartitions(ctx, &types.MatchingListTaskListPartitionsRequest{
×
3486
                Domain:   request.Domain,
×
3487
                TaskList: request.TaskList,
×
3488
        })
×
3489
        return resp, err
×
3490
}
3491

3492
// GetTaskListsByDomain returns all the partition and host for a taskList
3493
func (wh *WorkflowHandler) GetTaskListsByDomain(
3494
        ctx context.Context,
3495
        request *types.GetTaskListsByDomainRequest,
3496
) (resp *types.GetTaskListsByDomainResponse, retError error) {
×
3497
        if wh.isShuttingDown() {
×
3498
                return nil, validate.ErrShuttingDown
×
3499
        }
×
3500

3501
        if request == nil {
×
3502
                return nil, validate.ErrRequestNotSet
×
3503
        }
×
3504

3505
        if request.GetDomain() == "" {
×
3506
                return nil, validate.ErrDomainNotSet
×
3507
        }
×
3508

3509
        resp, err := wh.GetMatchingClient().GetTaskListsByDomain(ctx, &types.GetTaskListsByDomainRequest{
×
3510
                Domain: request.Domain,
×
3511
        })
×
3512
        return resp, err
×
3513
}
3514

3515
// RefreshWorkflowTasks re-generates the workflow tasks
3516
func (wh *WorkflowHandler) RefreshWorkflowTasks(
3517
        ctx context.Context,
3518
        request *types.RefreshWorkflowTasksRequest,
3519
) (err error) {
×
3520
        if request == nil {
×
3521
                return validate.ErrRequestNotSet
×
3522
        }
×
3523
        if err := validate.CheckExecution(request.Execution); err != nil {
×
3524
                return err
×
3525
        }
×
3526
        domainEntry, err := wh.GetDomainCache().GetDomain(request.GetDomain())
×
3527
        if err != nil {
×
3528
                return err
×
3529
        }
×
3530

3531
        err = wh.GetHistoryClient().RefreshWorkflowTasks(ctx, &types.HistoryRefreshWorkflowTasksRequest{
×
3532
                DomainUIID: domainEntry.GetInfo().ID,
×
3533
                Request:    request,
×
3534
        })
×
3535
        if err != nil {
×
3536
                return err
×
3537
        }
×
3538
        return nil
×
3539
}
3540

3541
func (wh *WorkflowHandler) getRawHistory(
3542
        ctx context.Context,
3543
        scope metrics.Scope,
3544
        domainID string,
3545
        domainName string,
3546
        execution types.WorkflowExecution,
3547
        firstEventID int64,
3548
        nextEventID int64,
3549
        pageSize int32,
3550
        nextPageToken []byte,
3551
        transientDecision *types.TransientDecisionInfo,
3552
        branchToken []byte,
3553
) ([]*types.DataBlob, []byte, error) {
2✔
3554
        rawHistory := []*types.DataBlob{}
2✔
3555
        shardID := common.WorkflowIDToHistoryShard(execution.WorkflowID, wh.config.NumHistoryShards)
2✔
3556

2✔
3557
        resp, err := wh.GetHistoryManager().ReadRawHistoryBranch(ctx, &persistence.ReadHistoryBranchRequest{
2✔
3558
                BranchToken:   branchToken,
2✔
3559
                MinEventID:    firstEventID,
2✔
3560
                MaxEventID:    nextEventID,
2✔
3561
                PageSize:      int(pageSize),
2✔
3562
                NextPageToken: nextPageToken,
2✔
3563
                ShardID:       common.IntPtr(shardID),
2✔
3564
                DomainName:    domainName,
2✔
3565
        })
2✔
3566
        if err != nil {
2✔
3567
                return nil, nil, err
×
3568
        }
×
3569

3570
        var encoding *types.EncodingType
2✔
3571
        for _, data := range resp.HistoryEventBlobs {
4✔
3572
                switch data.Encoding {
2✔
3573
                case common.EncodingTypeJSON:
×
3574
                        encoding = types.EncodingTypeJSON.Ptr()
×
3575
                case common.EncodingTypeThriftRW:
2✔
3576
                        encoding = types.EncodingTypeThriftRW.Ptr()
2✔
3577
                default:
×
3578
                        panic(fmt.Sprintf("Invalid encoding type for raw history, encoding type: %s", data.Encoding))
×
3579
                }
3580
                rawHistory = append(rawHistory, &types.DataBlob{
2✔
3581
                        EncodingType: encoding,
2✔
3582
                        Data:         data.Data,
2✔
3583
                })
2✔
3584
        }
3585

3586
        if len(resp.NextPageToken) == 0 && transientDecision != nil {
4✔
3587
                if err := wh.validateTransientDecisionEvents(nextEventID, transientDecision); err != nil {
2✔
3588
                        scope.IncCounter(metrics.CadenceErrIncompleteHistoryCounter)
×
3589
                        wh.GetLogger().Error("getHistory error",
×
3590
                                tag.WorkflowDomainID(domainID),
×
3591
                                tag.WorkflowID(execution.GetWorkflowID()),
×
3592
                                tag.WorkflowRunID(execution.GetRunID()),
×
3593
                                tag.Error(err))
×
3594
                }
×
3595
                blob, err := wh.GetPayloadSerializer().SerializeBatchEvents(
2✔
3596
                        []*types.HistoryEvent{transientDecision.ScheduledEvent, transientDecision.StartedEvent}, common.EncodingTypeThriftRW)
2✔
3597
                if err != nil {
2✔
3598
                        return nil, nil, err
×
3599
                }
×
3600
                rawHistory = append(rawHistory, &types.DataBlob{
2✔
3601
                        EncodingType: types.EncodingTypeThriftRW.Ptr(),
2✔
3602
                        Data:         blob.Data,
2✔
3603
                })
2✔
3604
        }
3605

3606
        return rawHistory, resp.NextPageToken, nil
2✔
3607
}
3608

3609
func (wh *WorkflowHandler) getHistory(
3610
        ctx context.Context,
3611
        scope metrics.Scope,
3612
        domainID string,
3613
        domainName string,
3614
        execution types.WorkflowExecution,
3615
        firstEventID, nextEventID int64,
3616
        pageSize int32,
3617
        nextPageToken []byte,
3618
        transientDecision *types.TransientDecisionInfo,
3619
        branchToken []byte,
3620
) (*types.History, []byte, error) {
1,593✔
3621

1,593✔
3622
        var size int
1,593✔
3623

1,593✔
3624
        isFirstPage := len(nextPageToken) == 0
1,593✔
3625
        shardID := common.WorkflowIDToHistoryShard(execution.WorkflowID, wh.config.NumHistoryShards)
1,593✔
3626
        var err error
1,593✔
3627
        historyEvents, size, nextPageToken, err := persistenceutils.ReadFullPageV2Events(ctx, wh.GetHistoryManager(), &persistence.ReadHistoryBranchRequest{
1,593✔
3628
                BranchToken:   branchToken,
1,593✔
3629
                MinEventID:    firstEventID,
1,593✔
3630
                MaxEventID:    nextEventID,
1,593✔
3631
                PageSize:      int(pageSize),
1,593✔
3632
                NextPageToken: nextPageToken,
1,593✔
3633
                ShardID:       common.IntPtr(shardID),
1,593✔
3634
                DomainName:    domainName,
1,593✔
3635
        })
1,593✔
3636

1,593✔
3637
        if err != nil {
1,593✔
3638
                return nil, nil, err
×
3639
        }
×
3640

3641
        scope.RecordTimer(metrics.HistorySize, time.Duration(size))
1,593✔
3642

1,593✔
3643
        isLastPage := len(nextPageToken) == 0
1,593✔
3644
        if err := verifyHistoryIsComplete(
1,593✔
3645
                historyEvents,
1,593✔
3646
                firstEventID,
1,593✔
3647
                nextEventID-1,
1,593✔
3648
                isFirstPage,
1,593✔
3649
                isLastPage,
1,593✔
3650
                int(pageSize)); err != nil {
1,593✔
3651
                scope.IncCounter(metrics.CadenceErrIncompleteHistoryCounter)
×
3652
                wh.GetLogger().Error("getHistory: incomplete history",
×
3653
                        tag.WorkflowDomainID(domainID),
×
3654
                        tag.WorkflowID(execution.GetWorkflowID()),
×
3655
                        tag.WorkflowRunID(execution.GetRunID()),
×
3656
                        tag.Error(err))
×
3657
                return nil, nil, err
×
3658
        }
×
3659

3660
        if len(nextPageToken) == 0 && transientDecision != nil {
1,764✔
3661
                if err := wh.validateTransientDecisionEvents(nextEventID, transientDecision); err != nil {
171✔
3662
                        scope.IncCounter(metrics.CadenceErrIncompleteHistoryCounter)
×
3663
                        wh.GetLogger().Error("getHistory error",
×
3664
                                tag.WorkflowDomainID(domainID),
×
3665
                                tag.WorkflowID(execution.GetWorkflowID()),
×
3666
                                tag.WorkflowRunID(execution.GetRunID()),
×
3667
                                tag.Error(err))
×
3668
                }
×
3669
                // Append the transient decision events once we are done enumerating everything from the events table
3670
                historyEvents = append(historyEvents, transientDecision.ScheduledEvent, transientDecision.StartedEvent)
171✔
3671
        }
3672

3673
        executionHistory := &types.History{}
1,593✔
3674
        executionHistory.Events = historyEvents
1,593✔
3675
        return executionHistory, nextPageToken, nil
1,593✔
3676
}
3677

3678
func (wh *WorkflowHandler) validateTransientDecisionEvents(
3679
        expectedNextEventID int64,
3680
        decision *types.TransientDecisionInfo,
3681
) error {
173✔
3682

173✔
3683
        if decision.ScheduledEvent.ID == expectedNextEventID &&
173✔
3684
                decision.StartedEvent.ID == expectedNextEventID+1 {
346✔
3685
                return nil
173✔
3686
        }
173✔
3687

3688
        return fmt.Errorf(
×
3689
                "invalid transient decision: "+
×
3690
                        "expectedScheduledEventID=%v expectedStartedEventID=%v but have scheduledEventID=%v startedEventID=%v",
×
3691
                expectedNextEventID,
×
3692
                expectedNextEventID+1,
×
3693
                decision.ScheduledEvent.ID,
×
3694
                decision.StartedEvent.ID)
×
3695
}
3696

3697
func (wh *WorkflowHandler) validateTaskList(t *types.TaskList, scope metrics.Scope, domain string) error {
2,701✔
3698
        if t == nil || t.GetName() == "" {
2,702✔
3699
                return validate.ErrTaskListNotSet
1✔
3700
        }
1✔
3701

3702
        if !common.IsValidIDLength(
2,700✔
3703
                t.GetName(),
2,700✔
3704
                scope,
2,700✔
3705
                wh.config.MaxIDLengthWarnLimit(),
2,700✔
3706
                wh.config.TaskListNameMaxLength(domain),
2,700✔
3707
                metrics.CadenceErrTaskListNameExceededWarnLimit,
2,700✔
3708
                domain,
2,700✔
3709
                wh.GetLogger(),
2,700✔
3710
                tag.IDTypeTaskListName) {
2,700✔
3711
                return validate.ErrTaskListTooLong
×
3712
        }
×
3713
        return nil
2,700✔
3714
}
3715

3716
func (wh *WorkflowHandler) createPollForDecisionTaskResponse(
3717
        ctx context.Context,
3718
        scope metrics.Scope,
3719
        domainID string,
3720
        matchingResp *types.MatchingPollForDecisionTaskResponse,
3721
        branchToken []byte,
3722
) (*types.PollForDecisionTaskResponse, error) {
1,213✔
3723

1,213✔
3724
        if matchingResp.WorkflowExecution == nil {
1,262✔
3725
                // this will happen if there is no decision task to be send to worker / caller
49✔
3726
                return &types.PollForDecisionTaskResponse{}, nil
49✔
3727
        }
49✔
3728

3729
        var history *types.History
1,164✔
3730
        var continuation []byte
1,164✔
3731
        var err error
1,164✔
3732

1,164✔
3733
        if matchingResp.GetStickyExecutionEnabled() && matchingResp.Query != nil {
1,170✔
3734
                // meaning sticky query, we should not return any events to worker
6✔
3735
                // since query task only check the current status
6✔
3736
                history = &types.History{
6✔
3737
                        Events: []*types.HistoryEvent{},
6✔
3738
                }
6✔
3739
        } else {
1,164✔
3740
                // here we have 3 cases:
1,158✔
3741
                // 1. sticky && non query task
1,158✔
3742
                // 2. non sticky &&  non query task
1,158✔
3743
                // 3. non sticky && query task
1,158✔
3744
                // for 1, partial history have to be send back
1,158✔
3745
                // for 2 and 3, full history have to be send back
1,158✔
3746

1,158✔
3747
                var persistenceToken []byte
1,158✔
3748

1,158✔
3749
                firstEventID := common.FirstEventID
1,158✔
3750
                nextEventID := matchingResp.GetNextEventID()
1,158✔
3751
                if matchingResp.GetStickyExecutionEnabled() {
1,254✔
3752
                        firstEventID = matchingResp.GetPreviousStartedEventID() + 1
96✔
3753
                }
96✔
3754
                domainName, dErr := wh.GetDomainCache().GetDomainName(domainID)
1,158✔
3755
                if dErr != nil {
1,158✔
3756
                        return nil, dErr
×
3757
                }
×
3758
                scope = scope.Tagged(metrics.DomainTag(domainName))
1,158✔
3759
                history, persistenceToken, err = wh.getHistory(
1,158✔
3760
                        ctx,
1,158✔
3761
                        scope,
1,158✔
3762
                        domainID,
1,158✔
3763
                        domainName,
1,158✔
3764
                        *matchingResp.WorkflowExecution,
1,158✔
3765
                        firstEventID,
1,158✔
3766
                        nextEventID,
1,158✔
3767
                        int32(wh.config.HistoryMaxPageSize(domainName)),
1,158✔
3768
                        nil,
1,158✔
3769
                        matchingResp.DecisionInfo,
1,158✔
3770
                        branchToken,
1,158✔
3771
                )
1,158✔
3772
                if err != nil {
1,158✔
3773
                        return nil, err
×
3774
                }
×
3775

3776
                if len(persistenceToken) != 0 {
1,158✔
3777
                        continuation, err = serializeHistoryToken(&getHistoryContinuationToken{
×
3778
                                RunID:             matchingResp.WorkflowExecution.GetRunID(),
×
3779
                                FirstEventID:      firstEventID,
×
3780
                                NextEventID:       nextEventID,
×
3781
                                PersistenceToken:  persistenceToken,
×
3782
                                TransientDecision: matchingResp.DecisionInfo,
×
3783
                                BranchToken:       branchToken,
×
3784
                        })
×
3785
                        if err != nil {
×
3786
                                return nil, err
×
3787
                        }
×
3788
                }
3789
        }
3790

3791
        resp := &types.PollForDecisionTaskResponse{
1,164✔
3792
                TaskToken:                 matchingResp.TaskToken,
1,164✔
3793
                WorkflowExecution:         matchingResp.WorkflowExecution,
1,164✔
3794
                WorkflowType:              matchingResp.WorkflowType,
1,164✔
3795
                PreviousStartedEventID:    matchingResp.PreviousStartedEventID,
1,164✔
3796
                StartedEventID:            matchingResp.StartedEventID, // this field is not set for query tasks as there's no decision task started event
1,164✔
3797
                Query:                     matchingResp.Query,
1,164✔
3798
                BacklogCountHint:          matchingResp.BacklogCountHint,
1,164✔
3799
                Attempt:                   matchingResp.Attempt,
1,164✔
3800
                History:                   history,
1,164✔
3801
                NextPageToken:             continuation,
1,164✔
3802
                WorkflowExecutionTaskList: matchingResp.WorkflowExecutionTaskList,
1,164✔
3803
                ScheduledTimestamp:        matchingResp.ScheduledTimestamp,
1,164✔
3804
                StartedTimestamp:          matchingResp.StartedTimestamp,
1,164✔
3805
                Queries:                   matchingResp.Queries,
1,164✔
3806
                NextEventID:               matchingResp.NextEventID,
1,164✔
3807
                TotalHistoryBytes:         matchingResp.TotalHistoryBytes,
1,164✔
3808
        }
1,164✔
3809

1,164✔
3810
        return resp, nil
1,164✔
3811
}
3812

3813
func verifyHistoryIsComplete(
3814
        events []*types.HistoryEvent,
3815
        expectedFirstEventID int64,
3816
        expectedLastEventID int64,
3817
        isFirstPage bool,
3818
        isLastPage bool,
3819
        pageSize int,
3820
) error {
1,612✔
3821

1,612✔
3822
        nEvents := len(events)
1,612✔
3823
        if nEvents == 0 {
1,624✔
3824
                if isLastPage {
24✔
3825
                        // we seem to be returning a non-nil pageToken on the lastPage which
12✔
3826
                        // in turn cases the client to call getHistory again - only to find
12✔
3827
                        // there are no more events to consume - bail out if this is the case here
12✔
3828
                        return nil
12✔
3829
                }
12✔
3830
                return fmt.Errorf("invalid history: contains zero events")
×
3831
        }
3832

3833
        firstEventID := events[0].ID
1,600✔
3834
        lastEventID := events[nEvents-1].ID
1,600✔
3835

1,600✔
3836
        if !isFirstPage { // atleast one page of history has been read previously
1,636✔
3837
                if firstEventID <= expectedFirstEventID {
36✔
3838
                        // not first page and no events have been read in the previous pages - not possible
×
3839
                        return &types.InternalServiceError{
×
3840
                                Message: fmt.Sprintf(
×
3841
                                        "invalid history: expected first eventID to be > %v but got %v", expectedFirstEventID, firstEventID),
×
3842
                        }
×
3843
                }
×
3844
                expectedFirstEventID = firstEventID
36✔
3845
        }
3846

3847
        if !isLastPage {
1,649✔
3848
                // estimate lastEventID based on pageSize. This is a lower bound
49✔
3849
                // since the persistence layer counts "batch of events" as a single page
49✔
3850
                expectedLastEventID = expectedFirstEventID + int64(pageSize) - 1
49✔
3851
        }
49✔
3852

3853
        nExpectedEvents := expectedLastEventID - expectedFirstEventID + 1
1,600✔
3854

1,600✔
3855
        if firstEventID == expectedFirstEventID &&
1,600✔
3856
                ((isLastPage && lastEventID == expectedLastEventID && int64(nEvents) == nExpectedEvents) ||
1,600✔
3857
                        (!isLastPage && lastEventID >= expectedLastEventID && int64(nEvents) >= nExpectedEvents)) {
3,188✔
3858
                return nil
1,588✔
3859
        }
1,588✔
3860

3861
        return &types.InternalServiceError{
12✔
3862
                Message: fmt.Sprintf(
12✔
3863
                        "incomplete history: "+
12✔
3864
                                "expected events [%v-%v] but got events [%v-%v] of length %v:"+
12✔
3865
                                "isFirstPage=%v,isLastPage=%v,pageSize=%v",
12✔
3866
                        expectedFirstEventID,
12✔
3867
                        expectedLastEventID,
12✔
3868
                        firstEventID,
12✔
3869
                        lastEventID,
12✔
3870
                        nEvents,
12✔
3871
                        isFirstPage,
12✔
3872
                        isLastPage,
12✔
3873
                        pageSize),
12✔
3874
        }
12✔
3875
}
3876

3877
func deserializeHistoryToken(bytes []byte) (*getHistoryContinuationToken, error) {
44✔
3878
        token := &getHistoryContinuationToken{}
44✔
3879
        err := json.Unmarshal(bytes, token)
44✔
3880
        return token, err
44✔
3881
}
44✔
3882

3883
func serializeHistoryToken(token *getHistoryContinuationToken) ([]byte, error) {
436✔
3884
        if token == nil {
830✔
3885
                return nil, nil
394✔
3886
        }
394✔
3887

3888
        bytes, err := json.Marshal(token)
42✔
3889
        return bytes, err
42✔
3890
}
3891

3892
func isFailoverRequest(updateRequest *types.UpdateDomainRequest) bool {
9✔
3893
        return updateRequest.ActiveClusterName != nil
9✔
3894
}
9✔
3895

3896
func isGraceFailoverRequest(updateRequest *types.UpdateDomainRequest) bool {
9✔
3897
        return updateRequest.FailoverTimeoutInSeconds != nil
9✔
3898
}
9✔
3899

3900
func (wh *WorkflowHandler) checkOngoingFailover(
3901
        ctx context.Context,
3902
        domainName *string,
3903
) error {
1✔
3904

1✔
3905
        enabledClusters := wh.GetClusterMetadata().GetEnabledClusterInfo()
1✔
3906
        respChan := make(chan *types.DescribeDomainResponse, len(enabledClusters))
1✔
3907

1✔
3908
        g := &errgroup.Group{}
1✔
3909
        for clusterName := range enabledClusters {
3✔
3910
                frontendClient := wh.GetRemoteFrontendClient(clusterName)
2✔
3911
                g.Go(func() (e error) {
4✔
3912
                        defer func() { log.CapturePanic(recover(), wh.GetLogger(), &e) }()
4✔
3913

3914
                        resp, _ := frontendClient.DescribeDomain(ctx, &types.DescribeDomainRequest{Name: domainName})
2✔
3915
                        respChan <- resp
2✔
3916
                        return nil
2✔
3917
                })
3918
        }
3919
        g.Wait()
1✔
3920
        close(respChan)
1✔
3921

1✔
3922
        var failoverVersion *int64
1✔
3923
        for resp := range respChan {
3✔
3924
                if resp == nil {
2✔
3925
                        return &types.InternalServiceError{
×
3926
                                Message: "Failed to verify failover version from all clusters",
×
3927
                        }
×
3928
                }
×
3929
                if failoverVersion == nil {
3✔
3930
                        failoverVersion = &resp.FailoverVersion
1✔
3931
                }
1✔
3932
                if *failoverVersion != resp.GetFailoverVersion() {
2✔
3933
                        return &types.BadRequestError{
×
3934
                                Message: "Concurrent failover is not allow.",
×
3935
                        }
×
3936
                }
×
3937
        }
3938
        return nil
1✔
3939
}
3940

3941
func (wh *WorkflowHandler) historyArchived(ctx context.Context, request *types.GetWorkflowExecutionHistoryRequest, domainID string) bool {
447✔
3942
        if request.GetExecution() == nil || request.GetExecution().GetRunID() == "" {
482✔
3943
                return false
35✔
3944
        }
35✔
3945
        getMutableStateRequest := &types.GetMutableStateRequest{
412✔
3946
                DomainUUID: domainID,
412✔
3947
                Execution:  request.Execution,
412✔
3948
        }
412✔
3949
        _, err := wh.GetHistoryClient().GetMutableState(ctx, getMutableStateRequest)
412✔
3950
        if err == nil {
798✔
3951
                return false
386✔
3952
        }
386✔
3953
        switch err.(type) {
26✔
3954
        case *types.EntityNotExistsError:
25✔
3955
                // the only case in which history is assumed to be archived is if getting mutable state returns entity not found error
25✔
3956
                return true
25✔
3957
        }
3958
        return false
1✔
3959
}
3960

3961
func (wh *WorkflowHandler) getArchivedHistory(
3962
        ctx context.Context,
3963
        request *types.GetWorkflowExecutionHistoryRequest,
3964
        domainID string,
3965
) (*types.GetWorkflowExecutionHistoryResponse, error) {
28✔
3966
        entry, err := wh.GetDomainCache().GetDomainByID(domainID)
28✔
3967
        if err != nil {
29✔
3968
                return nil, err
1✔
3969
        }
1✔
3970

3971
        URIString := entry.GetConfig().HistoryArchivalURI
27✔
3972
        if URIString == "" {
28✔
3973
                // if URI is empty, it means the domain has never enabled for archival.
1✔
3974
                // the error is not "workflow has passed retention period", because
1✔
3975
                // we have no way to tell if the requested workflow exists or not.
1✔
3976
                return nil, validate.ErrHistoryNotFound
1✔
3977
        }
1✔
3978

3979
        URI, err := archiver.NewURI(URIString)
26✔
3980
        if err != nil {
27✔
3981
                return nil, err
1✔
3982
        }
1✔
3983

3984
        historyArchiver, err := wh.GetArchiverProvider().GetHistoryArchiver(URI.Scheme(), service.Frontend)
25✔
3985
        if err != nil {
25✔
3986
                return nil, err
×
3987
        }
×
3988

3989
        resp, err := historyArchiver.Get(ctx, URI, &archiver.GetHistoryRequest{
25✔
3990
                DomainID:      domainID,
25✔
3991
                WorkflowID:    request.GetExecution().GetWorkflowID(),
25✔
3992
                RunID:         request.GetExecution().GetRunID(),
25✔
3993
                NextPageToken: request.GetNextPageToken(),
25✔
3994
                PageSize:      int(request.GetMaximumPageSize()),
25✔
3995
        })
25✔
3996
        if err != nil {
28✔
3997
                return nil, err
3✔
3998
        }
3✔
3999

4000
        history := &types.History{}
22✔
4001
        for _, batch := range resp.HistoryBatches {
279✔
4002
                history.Events = append(history.Events, batch.Events...)
257✔
4003
        }
257✔
4004
        return &types.GetWorkflowExecutionHistoryResponse{
22✔
4005
                History:       history,
22✔
4006
                NextPageToken: resp.NextPageToken,
22✔
4007
                Archived:      true,
22✔
4008
        }, nil
22✔
4009
}
4010

4011
func (wh *WorkflowHandler) convertIndexedKeyToThrift(keys map[string]interface{}) map[string]types.IndexedValueType {
3✔
4012
        converted := make(map[string]types.IndexedValueType)
3✔
4013
        for k, v := range keys {
51✔
4014
                converted[k] = common.ConvertIndexedValueTypeToInternalType(v, wh.GetLogger())
48✔
4015
        }
48✔
4016
        return converted
2✔
4017
}
4018

4019
func (wh *WorkflowHandler) isListRequestPageSizeTooLarge(pageSize int32, domain string) bool {
305✔
4020
        return common.IsAdvancedVisibilityReadingEnabled(wh.config.EnableReadVisibilityFromES(domain), wh.config.IsAdvancedVisConfigExist) &&
305✔
4021
                pageSize > int32(wh.config.ESIndexMaxResultWindow())
305✔
4022
}
305✔
4023

4024
// GetClusterInfo return information about cadence deployment
4025
func (wh *WorkflowHandler) GetClusterInfo(
4026
        ctx context.Context,
4027
) (resp *types.ClusterInfo, err error) {
×
4028
        return &types.ClusterInfo{
×
4029
                SupportedClientVersions: &types.SupportedClientVersions{
×
4030
                        GoSdk:   client.SupportedGoSDKVersion,
×
4031
                        JavaSdk: client.SupportedJavaSDKVersion,
×
4032
                },
×
4033
        }, nil
×
4034
}
×
4035

4036
func checkFailOverPermission(config *config.Config, domainName string) error {
2✔
4037
        if config.Lockdown(domainName) {
3✔
4038
                return validate.ErrDomainInLockdown
1✔
4039
        }
1✔
4040
        return nil
1✔
4041
}
4042

4043
type domainWrapper struct {
4044
        domain string
4045
}
4046

4047
func (d domainWrapper) GetDomain() string {
1,760✔
4048
        return d.domain
1,760✔
4049
}
1,760✔
4050

4051
func (hs HealthStatus) String() string {
2✔
4052
        switch hs {
2✔
4053
        case HealthStatusOK:
1✔
4054
                return "OK"
1✔
4055
        case HealthStatusWarmingUp:
1✔
4056
                return "WarmingUp"
1✔
4057
        case HealthStatusShuttingDown:
×
4058
                return "ShuttingDown"
×
4059
        default:
×
4060
                return "unknown"
×
4061
        }
4062
}
4063

4064
func getDomainWfIDRunIDTags(
4065
        domainName string,
4066
        wf *types.WorkflowExecution,
4067
) []tag.Tag {
1,491✔
4068
        tags := []tag.Tag{tag.WorkflowDomainName(domainName)}
1,491✔
4069
        if wf == nil {
2,982✔
4070
                return tags
1,491✔
4071
        }
1,491✔
4072
        return append(
×
4073
                tags,
×
4074
                tag.WorkflowID(wf.GetWorkflowID()),
×
4075
                tag.WorkflowRunID(wf.GetRunID()),
×
4076
        )
×
4077
}
4078

4079
func checkRequiredDomainDataKVs(requiredDomainDataKeys map[string]interface{}, domainData map[string]string) error {
45✔
4080
        // check requiredDomainDataKeys
45✔
4081
        for k := range requiredDomainDataKeys {
46✔
4082
                _, ok := domainData[k]
1✔
4083
                if !ok {
2✔
4084
                        return fmt.Errorf("domain data error, missing required key %v . All required keys: %v", k, requiredDomainDataKeys)
1✔
4085
                }
1✔
4086
        }
4087
        return nil
44✔
4088
}
4089

4090
// Some error types are introduced later that some clients might not support
4091
// To make them backward compatible, we continue returning the legacy error types
4092
// for older clients
4093
func (wh *WorkflowHandler) normalizeVersionedErrors(ctx context.Context, err error) error {
66✔
4094
        switch err.(type) {
66✔
4095
        case *types.WorkflowExecutionAlreadyCompletedError:
17✔
4096
                call := yarpc.CallFromContext(ctx)
17✔
4097
                clientFeatureVersion := call.Header(common.FeatureVersionHeaderName)
17✔
4098
                clientImpl := call.Header(common.ClientImplHeaderName)
17✔
4099
                featureFlags := client.GetFeatureFlagsFromHeader(call)
17✔
4100

17✔
4101
                vErr := wh.versionChecker.SupportsWorkflowAlreadyCompletedError(clientImpl, clientFeatureVersion, featureFlags)
17✔
4102
                if vErr == nil {
20✔
4103
                        return err
3✔
4104
                }
3✔
4105
                return &types.EntityNotExistsError{Message: "Workflow execution already completed."}
14✔
4106
        default:
49✔
4107
                return err
49✔
4108
        }
4109
}
4110
func constructRestartWorkflowRequest(w *types.WorkflowExecutionStartedEventAttributes, domain string, identity string, workflowID string) *types.StartWorkflowExecutionRequest {
1✔
4111

1✔
4112
        startRequest := &types.StartWorkflowExecutionRequest{
1✔
4113
                RequestID:  uuid.New().String(),
1✔
4114
                Domain:     domain,
1✔
4115
                WorkflowID: workflowID,
1✔
4116
                WorkflowType: &types.WorkflowType{
1✔
4117
                        Name: w.WorkflowType.Name,
1✔
4118
                },
1✔
4119
                TaskList: &types.TaskList{
1✔
4120
                        Name: w.TaskList.Name,
1✔
4121
                },
1✔
4122
                Input:                               w.Input,
1✔
4123
                ExecutionStartToCloseTimeoutSeconds: w.ExecutionStartToCloseTimeoutSeconds,
1✔
4124
                TaskStartToCloseTimeoutSeconds:      w.TaskStartToCloseTimeoutSeconds,
1✔
4125
                Identity:                            identity,
1✔
4126
                WorkflowIDReusePolicy:               types.WorkflowIDReusePolicyTerminateIfRunning.Ptr(),
1✔
4127
        }
1✔
4128
        startRequest.CronSchedule = w.CronSchedule
1✔
4129
        startRequest.RetryPolicy = w.RetryPolicy
1✔
4130
        startRequest.DelayStartSeconds = w.FirstDecisionTaskBackoffSeconds
1✔
4131
        startRequest.Header = w.Header
1✔
4132
        startRequest.Memo = w.Memo
1✔
4133
        startRequest.SearchAttributes = w.SearchAttributes
1✔
4134

1✔
4135
        return startRequest
1✔
4136
}
1✔
4137

4138
func getMetricsScopeWithDomain(
4139
        scope int,
4140
        d domainGetter,
4141
        metricsClient metrics.Client,
4142
) metrics.Scope {
5,776✔
4143
        var metricsScope metrics.Scope
5,776✔
4144
        if d != nil {
11,552✔
4145
                metricsScope = metricsClient.Scope(scope).Tagged(metrics.DomainTag(d.GetDomain()))
5,776✔
4146
        } else {
5,776✔
4147
                metricsScope = metricsClient.Scope(scope).Tagged(metrics.DomainUnknownTag())
×
4148
        }
×
4149
        return metricsScope
5,776✔
4150
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc