• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

uber / cadence / 018df99b-a386-4814-ad28-09490f5042bf

01 Mar 2024 10:42AM UTC coverage: 62.868% (-0.03%) from 62.9%
018df99b-a386-4814-ad28-09490f5042bf

push

buildkite

web-flow
Fix the local integration test docker-compose file (#5695)

What changed?
Moved updates to the docker-compose file used in the CI to the local
docker-compose.

Why?
This enables us to run the integration tests locally in docker

How did you test it?
Local tests

Potential risks

Release notes

Documentation Changes

92923 of 147806 relevant lines covered (62.87%)

2352.06 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

62.05
/service/frontend/api/handler.go
1
// Copyright (c) 2017-2020 Uber Technologies, Inc.
2
//
3
// Permission is hereby granted, free of charge, to any person obtaining a copy
4
// of this software and associated documentation files (the "Software"), to deal
5
// in the Software without restriction, including without limitation the rights
6
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
7
// copies of the Software, and to permit persons to whom the Software is
8
// furnished to do so, subject to the following conditions:
9
//
10
// The above copyright notice and this permission notice shall be included in
11
// all copies or substantial portions of the Software.
12
//
13
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
14
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
18
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
19
// THE SOFTWARE.
20

21
package api
22

23
import (
24
        "context"
25
        "encoding/json"
26
        "fmt"
27
        "sync/atomic"
28
        "time"
29

30
        "github.com/google/uuid"
31
        "go.uber.org/yarpc"
32
        "golang.org/x/sync/errgroup"
33

34
        "github.com/uber/cadence/.gen/go/shared"
35
        "github.com/uber/cadence/.gen/go/sqlblobs"
36
        "github.com/uber/cadence/common"
37
        "github.com/uber/cadence/common/archiver"
38
        "github.com/uber/cadence/common/backoff"
39
        "github.com/uber/cadence/common/cache"
40
        "github.com/uber/cadence/common/client"
41
        "github.com/uber/cadence/common/domain"
42
        "github.com/uber/cadence/common/elasticsearch/validator"
43
        "github.com/uber/cadence/common/log"
44
        "github.com/uber/cadence/common/log/tag"
45
        "github.com/uber/cadence/common/metrics"
46
        "github.com/uber/cadence/common/partition"
47
        "github.com/uber/cadence/common/persistence"
48
        persistenceutils "github.com/uber/cadence/common/persistence/persistence-utils"
49
        "github.com/uber/cadence/common/resource"
50
        "github.com/uber/cadence/common/service"
51
        "github.com/uber/cadence/common/types"
52
        "github.com/uber/cadence/service/frontend/config"
53
        "github.com/uber/cadence/service/frontend/validate"
54
)
55

56
const (
57
        // HealthStatusOK is used when this node is healthy and rpc requests are allowed
58
        HealthStatusOK HealthStatus = iota + 1
59
        // HealthStatusWarmingUp is used when the rpc handler is warming up
60
        HealthStatusWarmingUp
61
        // HealthStatusShuttingDown is used when the rpc handler is shutting down
62
        HealthStatusShuttingDown
63
)
64

65
var _ Handler = (*WorkflowHandler)(nil)
66

67
type (
68
        // WorkflowHandler - Thrift handler interface for workflow service
69
        WorkflowHandler struct {
70
                resource.Resource
71

72
                shuttingDown              int32
73
                healthStatus              int32
74
                tokenSerializer           common.TaskTokenSerializer
75
                config                    *config.Config
76
                versionChecker            client.VersionChecker
77
                domainHandler             domain.Handler
78
                visibilityQueryValidator  *validator.VisibilityQueryValidator
79
                searchAttributesValidator *validator.SearchAttributesValidator
80
                throttleRetry             *backoff.ThrottleRetry
81
                producerManager           ProducerManager
82
        }
83

84
        getHistoryContinuationToken struct {
85
                RunID             string
86
                FirstEventID      int64
87
                NextEventID       int64
88
                IsWorkflowRunning bool
89
                PersistenceToken  []byte
90
                TransientDecision *types.TransientDecisionInfo
91
                BranchToken       []byte
92
        }
93

94
        domainGetter interface {
95
                GetDomain() string
96
        }
97

98
        // HealthStatus is an enum that refers to the rpc handler health status
99
        HealthStatus int32
100
)
101

102
var (
103
        frontendServiceRetryPolicy = common.CreateFrontendServiceRetryPolicy()
104
)
105

106
// NewWorkflowHandler creates a thrift handler for the cadence service
107
func NewWorkflowHandler(
108
        resource resource.Resource,
109
        config *config.Config,
110
        versionChecker client.VersionChecker,
111
        domainHandler domain.Handler,
112
) *WorkflowHandler {
84✔
113
        return &WorkflowHandler{
84✔
114
                Resource:        resource,
84✔
115
                config:          config,
84✔
116
                healthStatus:    int32(HealthStatusWarmingUp),
84✔
117
                tokenSerializer: common.NewJSONTaskTokenSerializer(),
84✔
118
                versionChecker:  versionChecker,
84✔
119
                domainHandler:   domainHandler,
84✔
120
                visibilityQueryValidator: validator.NewQueryValidator(
84✔
121
                        config.ValidSearchAttributes,
84✔
122
                        config.EnableQueryAttributeValidation,
84✔
123
                ),
84✔
124
                searchAttributesValidator: validator.NewSearchAttributesValidator(
84✔
125
                        resource.GetLogger(),
84✔
126
                        config.EnableQueryAttributeValidation,
84✔
127
                        config.ValidSearchAttributes,
84✔
128
                        config.SearchAttributesNumberOfKeysLimit,
84✔
129
                        config.SearchAttributesSizeOfValueLimit,
84✔
130
                        config.SearchAttributesTotalSizeLimit,
84✔
131
                ),
84✔
132
                throttleRetry: backoff.NewThrottleRetry(
84✔
133
                        backoff.WithRetryPolicy(frontendServiceRetryPolicy),
84✔
134
                        backoff.WithRetryableError(common.IsServiceTransientError),
84✔
135
                ),
84✔
136
                producerManager: NewProducerManager(
84✔
137
                        resource.GetDomainCache(),
84✔
138
                        resource.GetAsyncWorkflowQueueProvider(),
84✔
139
                        resource.GetLogger(),
84✔
140
                        resource.GetMetricsClient(),
84✔
141
                ),
84✔
142
        }
84✔
143
}
84✔
144

145
// Start starts the handler
146
func (wh *WorkflowHandler) Start() {
15✔
147
        // TODO: Get warmup duration from config. Even better, run proactive checks such as probing downstream connections.
15✔
148
        const warmUpDuration = 30 * time.Second
15✔
149

15✔
150
        warmupTimer := time.NewTimer(warmUpDuration)
15✔
151
        go func() {
30✔
152
                <-warmupTimer.C
15✔
153
                wh.GetLogger().Warn("Service warmup duration has elapsed.")
15✔
154
                if atomic.CompareAndSwapInt32(&wh.healthStatus, int32(HealthStatusWarmingUp), int32(HealthStatusOK)) {
27✔
155
                        wh.GetLogger().Warn("Warmup time has elapsed. Service is healthy.")
12✔
156
                } else {
12✔
157
                        status := HealthStatus(atomic.LoadInt32(&wh.healthStatus))
×
158
                        wh.GetLogger().Warn(fmt.Sprintf("Warmup time has elapsed. Service status is: %v", status.String()))
×
159
                }
×
160
        }()
161
}
162

163
// Stop stops the handler
164
func (wh *WorkflowHandler) Stop() {
15✔
165
        atomic.StoreInt32(&wh.shuttingDown, 1)
15✔
166
}
15✔
167

168
// UpdateHealthStatus sets the health status for this rpc handler.
169
// This health status will be used within the rpc health check handler
170
func (wh *WorkflowHandler) UpdateHealthStatus(status HealthStatus) {
16✔
171
        atomic.StoreInt32(&wh.healthStatus, int32(status))
16✔
172
}
16✔
173

174
func (wh *WorkflowHandler) isShuttingDown() bool {
6,437✔
175
        return atomic.LoadInt32(&wh.shuttingDown) != 0
6,437✔
176
}
6,437✔
177

178
// Health is for health check
179
func (wh *WorkflowHandler) Health(ctx context.Context) (*types.HealthStatus, error) {
2✔
180
        status := HealthStatus(atomic.LoadInt32(&wh.healthStatus))
2✔
181
        msg := status.String()
2✔
182

2✔
183
        if status != HealthStatusOK {
3✔
184
                wh.GetLogger().Warn(fmt.Sprintf("Service status is: %v", msg))
1✔
185
        }
1✔
186

187
        return &types.HealthStatus{
2✔
188
                Ok:  status == HealthStatusOK,
2✔
189
                Msg: msg,
2✔
190
        }, nil
2✔
191
}
192

193
// RegisterDomain creates a new domain which can be used as a container for all resources.  Domain is a top level
194
// entity within Cadence, used as a container for all resources like workflow executions, tasklists, etc.  Domain
195
// acts as a sandbox and provides isolation for all resources within the domain.  All resources belongs to exactly one
196
// domain.
197
func (wh *WorkflowHandler) RegisterDomain(ctx context.Context, registerRequest *types.RegisterDomainRequest) (retError error) {
45✔
198
        if wh.isShuttingDown() {
45✔
199
                return validate.ErrShuttingDown
×
200
        }
×
201

202
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
45✔
203
                return err
×
204
        }
×
205

206
        if registerRequest == nil {
45✔
207
                return validate.ErrRequestNotSet
×
208
        }
×
209

210
        if registerRequest.GetWorkflowExecutionRetentionPeriodInDays() > int32(wh.config.DomainConfig.MaxRetentionDays()) {
45✔
211
                return validate.ErrInvalidRetention
×
212
        }
×
213

214
        if err := validate.CheckPermission(wh.config, registerRequest.SecurityToken); err != nil {
45✔
215
                return err
×
216
        }
×
217

218
        if err := checkRequiredDomainDataKVs(wh.config.DomainConfig.RequiredDomainDataKeys(), registerRequest.GetData()); err != nil {
46✔
219
                return err
1✔
220
        }
1✔
221

222
        if registerRequest.GetName() == "" {
44✔
223
                return validate.ErrDomainNotSet
×
224
        }
×
225

226
        return wh.domainHandler.RegisterDomain(ctx, registerRequest)
44✔
227
}
228

229
// ListDomains returns the information and configuration for a registered domain.
230
func (wh *WorkflowHandler) ListDomains(
231
        ctx context.Context,
232
        listRequest *types.ListDomainsRequest,
233
) (response *types.ListDomainsResponse, retError error) {
2✔
234
        if wh.isShuttingDown() {
2✔
235
                return nil, validate.ErrShuttingDown
×
236
        }
×
237

238
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
2✔
239
                return nil, err
×
240
        }
×
241

242
        if listRequest == nil {
3✔
243
                return nil, validate.ErrRequestNotSet
1✔
244
        }
1✔
245

246
        return wh.domainHandler.ListDomains(ctx, listRequest)
1✔
247
}
248

249
// DescribeDomain returns the information and configuration for a registered domain.
250
func (wh *WorkflowHandler) DescribeDomain(
251
        ctx context.Context,
252
        describeRequest *types.DescribeDomainRequest,
253
) (response *types.DescribeDomainResponse, retError error) {
134✔
254
        if wh.isShuttingDown() {
134✔
255
                return nil, validate.ErrShuttingDown
×
256
        }
×
257

258
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
134✔
259
                return nil, err
×
260
        }
×
261

262
        if describeRequest == nil {
134✔
263
                return nil, validate.ErrRequestNotSet
×
264
        }
×
265

266
        if describeRequest.GetName() == "" && describeRequest.GetUUID() == "" {
134✔
267
                return nil, validate.ErrDomainNotSet
×
268
        }
×
269

270
        resp, err := wh.domainHandler.DescribeDomain(ctx, describeRequest)
134✔
271
        if err != nil {
134✔
272
                return nil, err
×
273
        }
×
274

275
        if resp.GetFailoverInfo() != nil && resp.GetFailoverInfo().GetFailoverExpireTimestamp() > 0 {
134✔
276
                // fetch ongoing failover info from history service
×
277
                failoverResp, err := wh.GetHistoryClient().GetFailoverInfo(ctx, &types.GetFailoverInfoRequest{
×
278
                        DomainID: resp.GetDomainInfo().UUID,
×
279
                })
×
280
                if err != nil {
×
281
                        // despite the error from history, return describe domain response
×
282
                        wh.GetLogger().Error(
×
283
                                fmt.Sprintf("Failed to get failover info for domain %s", resp.DomainInfo.GetName()),
×
284
                                tag.Error(err),
×
285
                        )
×
286
                        return resp, nil
×
287
                }
×
288
                resp.FailoverInfo.CompletedShardCount = failoverResp.GetCompletedShardCount()
×
289
                resp.FailoverInfo.PendingShards = failoverResp.GetPendingShards()
×
290
        }
291
        return resp, nil
134✔
292
}
293

294
// UpdateDomain is used to update the information and configuration for a registered domain.
295
func (wh *WorkflowHandler) UpdateDomain(
296
        ctx context.Context,
297
        updateRequest *types.UpdateDomainRequest,
298
) (resp *types.UpdateDomainResponse, retError error) {
9✔
299
        domainName := ""
9✔
300
        if updateRequest != nil {
18✔
301
                domainName = updateRequest.GetName()
9✔
302
        }
9✔
303

304
        logger := wh.GetLogger().WithTags(
9✔
305
                tag.WorkflowDomainName(domainName),
9✔
306
                tag.OperationName("DomainUpdate"))
9✔
307

9✔
308
        if updateRequest == nil {
9✔
309
                logger.Error("Nil domain update request.",
×
310
                        tag.Error(validate.ErrRequestNotSet))
×
311
                return nil, validate.ErrRequestNotSet
×
312
        }
×
313

314
        isFailover := isFailoverRequest(updateRequest)
9✔
315
        isGraceFailover := isGraceFailoverRequest(updateRequest)
9✔
316
        logger.Info(fmt.Sprintf(
9✔
317
                "Domain Update requested. isFailover: %v, isGraceFailover: %v, Request: %#v.",
9✔
318
                isFailover,
9✔
319
                isGraceFailover,
9✔
320
                updateRequest))
9✔
321

9✔
322
        if wh.isShuttingDown() {
9✔
323
                logger.Error("Won't apply the domain update since workflowHandler is shutting down.",
×
324
                        tag.Error(validate.ErrShuttingDown))
×
325
                return nil, validate.ErrShuttingDown
×
326
        }
×
327

328
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
9✔
329
                logger.Error("Won't apply the domain update since client version is not supported.",
×
330
                        tag.Error(err))
×
331
                return nil, err
×
332
        }
×
333

334
        // don't require permission for failover request
335
        if isFailover {
11✔
336
                // reject the failover if the cluster is in lockdown
2✔
337
                if err := checkFailOverPermission(wh.config, updateRequest.GetName()); err != nil {
3✔
338
                        logger.Error("Domain failover request rejected since domain is in lockdown.",
1✔
339
                                tag.Error(err))
1✔
340
                        return nil, err
1✔
341
                }
1✔
342
        } else {
7✔
343
                if err := validate.CheckPermission(wh.config, updateRequest.SecurityToken); err != nil {
7✔
344
                        logger.Error("Domain update request rejected due to failing permissions.",
×
345
                                tag.Error(err))
×
346
                        return nil, err
×
347
                }
×
348
        }
349

350
        if isGraceFailover {
9✔
351
                if err := wh.checkOngoingFailover(
1✔
352
                        ctx,
1✔
353
                        &updateRequest.Name,
1✔
354
                ); err != nil {
1✔
355
                        logger.Error("Graceful domain failover request failed. Not able to check ongoing failovers.",
×
356
                                tag.Error(err))
×
357
                        return nil, err
×
358
                }
×
359
        }
360

361
        if updateRequest.GetName() == "" {
8✔
362
                logger.Error("Domain not set on request.",
×
363
                        tag.Error(validate.ErrDomainNotSet))
×
364
                return nil, validate.ErrDomainNotSet
×
365
        }
×
366
        // TODO: call remote clusters to verify domain data
367
        resp, err := wh.domainHandler.UpdateDomain(ctx, updateRequest)
8✔
368
        if err != nil {
10✔
369
                logger.Error("Domain update operation failed.",
2✔
370
                        tag.Error(err))
2✔
371
                return nil, err
2✔
372
        }
2✔
373
        logger.Info("Domain update operation succeeded.")
6✔
374
        return resp, nil
6✔
375
}
376

377
// DeprecateDomain us used to update status of a registered domain to DEPRECATED. Once the domain is deprecated
378
// it cannot be used to start new workflow executions.  Existing workflow executions will continue to run on
379
// deprecated domains.
380
func (wh *WorkflowHandler) DeprecateDomain(ctx context.Context, deprecateRequest *types.DeprecateDomainRequest) (retError error) {
×
381
        if wh.isShuttingDown() {
×
382
                return validate.ErrShuttingDown
×
383
        }
×
384

385
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
×
386
                return err
×
387
        }
×
388

389
        if deprecateRequest == nil {
×
390
                return validate.ErrRequestNotSet
×
391
        }
×
392

393
        if err := validate.CheckPermission(wh.config, deprecateRequest.SecurityToken); err != nil {
×
394
                return err
×
395
        }
×
396

397
        if deprecateRequest.GetName() == "" {
×
398
                return validate.ErrDomainNotSet
×
399
        }
×
400

401
        return wh.domainHandler.DeprecateDomain(ctx, deprecateRequest)
×
402
}
403

404
// PollForActivityTask - Poll for an activity task.
405
func (wh *WorkflowHandler) PollForActivityTask(
406
        ctx context.Context,
407
        pollRequest *types.PollForActivityTaskRequest,
408
) (resp *types.PollForActivityTaskResponse, retError error) {
701✔
409
        callTime := time.Now()
701✔
410

701✔
411
        if wh.isShuttingDown() {
701✔
412
                return nil, validate.ErrShuttingDown
×
413
        }
×
414

415
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
701✔
416
                return nil, err
×
417
        }
×
418

419
        if pollRequest == nil {
701✔
420
                return nil, validate.ErrRequestNotSet
×
421
        }
×
422

423
        domainName := pollRequest.GetDomain()
701✔
424
        if domainName == "" {
701✔
425
                return nil, validate.ErrDomainNotSet
×
426
        }
×
427

428
        scope := getMetricsScopeWithDomain(metrics.FrontendPollForActivityTaskScope, pollRequest, wh.GetMetricsClient()).Tagged(metrics.GetContextTags(ctx)...)
701✔
429
        wh.GetLogger().Debug("Received PollForActivityTask")
701✔
430
        if err := common.ValidateLongPollContextTimeout(
701✔
431
                ctx,
701✔
432
                "PollForActivityTask",
701✔
433
                wh.GetThrottledLogger(),
701✔
434
        ); err != nil {
703✔
435
                return nil, err
2✔
436
        }
2✔
437

438
        idLengthWarnLimit := wh.config.MaxIDLengthWarnLimit()
699✔
439
        if !common.IsValidIDLength(
699✔
440
                domainName,
699✔
441
                scope,
699✔
442
                idLengthWarnLimit,
699✔
443
                wh.config.DomainNameMaxLength(domainName),
699✔
444
                metrics.CadenceErrDomainNameExceededWarnLimit,
699✔
445
                domainName,
699✔
446
                wh.GetLogger(),
699✔
447
                tag.IDTypeDomainName) {
699✔
448
                return nil, validate.ErrDomainTooLong
×
449
        }
×
450

451
        if err := wh.validateTaskList(pollRequest.TaskList, scope, domainName); err != nil {
699✔
452
                return nil, err
×
453
        }
×
454

455
        if !common.IsValidIDLength(
699✔
456
                pollRequest.GetIdentity(),
699✔
457
                scope,
699✔
458
                idLengthWarnLimit,
699✔
459
                wh.config.IdentityMaxLength(domainName),
699✔
460
                metrics.CadenceErrIdentityExceededWarnLimit,
699✔
461
                domainName,
699✔
462
                wh.GetLogger(),
699✔
463
                tag.IDTypeIdentity) {
699✔
464
                return nil, validate.ErrIdentityTooLong
×
465
        }
×
466

467
        domainID, err := wh.GetDomainCache().GetDomainID(domainName)
699✔
468
        if err != nil {
959✔
469
                return nil, err
260✔
470
        }
260✔
471

472
        isolationGroup := wh.getIsolationGroup(ctx, domainName)
439✔
473
        if !wh.waitUntilIsolationGroupHealthy(ctx, domainName, isolationGroup) {
440✔
474
                return &types.PollForActivityTaskResponse{}, nil
1✔
475
        }
1✔
476
        // it is possible that we wait for a very long time and the remaining time is not long enough for a long poll
477
        // in this case, return an empty response
478
        if err := common.ValidateLongPollContextTimeout(
438✔
479
                ctx,
438✔
480
                "PollForActivityTask",
438✔
481
                wh.GetThrottledLogger(),
438✔
482
        ); err != nil {
438✔
483
                return &types.PollForActivityTaskResponse{}, nil
×
484
        }
×
485
        pollerID := uuid.New().String()
438✔
486
        op := func() error {
876✔
487
                resp, err = wh.GetMatchingClient().PollForActivityTask(ctx, &types.MatchingPollForActivityTaskRequest{
438✔
488
                        DomainUUID:     domainID,
438✔
489
                        PollerID:       pollerID,
438✔
490
                        PollRequest:    pollRequest,
438✔
491
                        IsolationGroup: isolationGroup,
438✔
492
                })
438✔
493
                return err
438✔
494
        }
438✔
495

496
        err = wh.throttleRetry.Do(ctx, op)
438✔
497
        if err != nil {
504✔
498
                err = wh.cancelOutstandingPoll(ctx, err, domainID, persistence.TaskListTypeActivity, pollRequest.TaskList, pollerID)
66✔
499
                if err != nil {
66✔
500
                        // For all other errors log an error and return it back to client.
×
501
                        ctxTimeout := "not-set"
×
502
                        ctxDeadline, ok := ctx.Deadline()
×
503
                        if ok {
×
504
                                ctxTimeout = ctxDeadline.Sub(callTime).String()
×
505
                        }
×
506
                        wh.GetLogger().Error("PollForActivityTask failed.",
×
507
                                tag.WorkflowTaskListName(pollRequest.GetTaskList().GetName()),
×
508
                                tag.Value(ctxTimeout),
×
509
                                tag.Error(err))
×
510
                        return nil, err
×
511
                }
512
        }
513
        return resp, nil
438✔
514
}
515

516
// PollForDecisionTask - Poll for a decision task.
517
func (wh *WorkflowHandler) PollForDecisionTask(
518
        ctx context.Context,
519
        pollRequest *types.PollForDecisionTaskRequest,
520
) (resp *types.PollForDecisionTaskResponse, retError error) {
1,480✔
521
        callTime := time.Now()
1,480✔
522

1,480✔
523
        if wh.isShuttingDown() {
1,480✔
524
                return nil, validate.ErrShuttingDown
×
525
        }
×
526

527
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
1,480✔
528
                return nil, err
×
529
        }
×
530

531
        if pollRequest == nil {
1,480✔
532
                return nil, validate.ErrRequestNotSet
×
533
        }
×
534

535
        domainName := pollRequest.GetDomain()
1,480✔
536
        tags := getDomainWfIDRunIDTags(domainName, nil)
1,480✔
537

1,480✔
538
        if domainName == "" {
1,480✔
539
                return nil, validate.ErrDomainNotSet
×
540
        }
×
541

542
        scope := getMetricsScopeWithDomain(metrics.FrontendPollForDecisionTaskScope, pollRequest, wh.GetMetricsClient()).Tagged(metrics.GetContextTags(ctx)...)
1,480✔
543
        wh.GetLogger().Debug("Received PollForDecisionTask")
1,480✔
544
        if err := common.ValidateLongPollContextTimeout(
1,480✔
545
                ctx,
1,480✔
546
                "PollForDecisionTask",
1,480✔
547
                wh.GetThrottledLogger(),
1,480✔
548
        ); err != nil {
1,482✔
549
                return nil, err
2✔
550
        }
2✔
551

552
        idLengthWarnLimit := wh.config.MaxIDLengthWarnLimit()
1,478✔
553
        if !common.IsValidIDLength(
1,478✔
554
                domainName,
1,478✔
555
                scope,
1,478✔
556
                idLengthWarnLimit,
1,478✔
557
                wh.config.DomainNameMaxLength(domainName),
1,478✔
558
                metrics.CadenceErrDomainNameExceededWarnLimit,
1,478✔
559
                domainName,
1,478✔
560
                wh.GetLogger(),
1,478✔
561
                tag.IDTypeDomainName) {
1,478✔
562
                return nil, validate.ErrDomainTooLong
×
563
        }
×
564

565
        if !common.IsValidIDLength(
1,478✔
566
                pollRequest.GetIdentity(),
1,478✔
567
                scope,
1,478✔
568
                idLengthWarnLimit,
1,478✔
569
                wh.config.IdentityMaxLength(domainName),
1,478✔
570
                metrics.CadenceErrIdentityExceededWarnLimit,
1,478✔
571
                domainName,
1,478✔
572
                wh.GetLogger(),
1,478✔
573
                tag.IDTypeIdentity) {
1,478✔
574
                return nil, validate.ErrIdentityTooLong
×
575
        }
×
576

577
        if err := wh.validateTaskList(pollRequest.TaskList, scope, domainName); err != nil {
1,478✔
578
                return nil, err
×
579
        }
×
580

581
        domainEntry, err := wh.GetDomainCache().GetDomain(domainName)
1,478✔
582
        if err != nil {
1,739✔
583
                return nil, err
261✔
584
        }
261✔
585
        domainID := domainEntry.GetInfo().ID
1,217✔
586

1,217✔
587
        wh.GetLogger().Debug("Poll for decision.", tag.WorkflowDomainName(domainName), tag.WorkflowDomainID(domainID))
1,217✔
588
        if err := wh.checkBadBinary(domainEntry, pollRequest.GetBinaryChecksum()); err != nil {
1,217✔
589
                return nil, err
×
590
        }
×
591

592
        isolationGroup := wh.getIsolationGroup(ctx, domainName)
1,217✔
593
        if !wh.waitUntilIsolationGroupHealthy(ctx, domainName, isolationGroup) {
1,218✔
594
                return &types.PollForDecisionTaskResponse{}, nil
1✔
595
        }
1✔
596
        // it is possible that we wait for a very long time and the remaining time is not long enough for a long poll
597
        // in this case, return an empty response
598
        if err := common.ValidateLongPollContextTimeout(
1,216✔
599
                ctx,
1,216✔
600
                "PollForDecisionTask",
1,216✔
601
                wh.GetThrottledLogger(),
1,216✔
602
        ); err != nil {
1,216✔
603
                return &types.PollForDecisionTaskResponse{}, nil
×
604
        }
×
605

606
        pollerID := uuid.New().String()
1,216✔
607
        var matchingResp *types.MatchingPollForDecisionTaskResponse
1,216✔
608
        op := func() error {
2,432✔
609
                matchingResp, err = wh.GetMatchingClient().PollForDecisionTask(ctx, &types.MatchingPollForDecisionTaskRequest{
1,216✔
610
                        DomainUUID:     domainID,
1,216✔
611
                        PollerID:       pollerID,
1,216✔
612
                        PollRequest:    pollRequest,
1,216✔
613
                        IsolationGroup: isolationGroup,
1,216✔
614
                })
1,216✔
615
                return err
1,216✔
616
        }
1,216✔
617

618
        err = wh.throttleRetry.Do(ctx, op)
1,216✔
619
        if err != nil {
1,282✔
620
                err = wh.cancelOutstandingPoll(ctx, err, domainID, persistence.TaskListTypeDecision, pollRequest.TaskList, pollerID)
66✔
621
                if err != nil {
66✔
622
                        // For all other errors log an error and return it back to client.
×
623
                        ctxTimeout := "not-set"
×
624
                        ctxDeadline, ok := ctx.Deadline()
×
625
                        if ok {
×
626
                                ctxTimeout = ctxDeadline.Sub(callTime).String()
×
627
                        }
×
628
                        wh.GetLogger().Error("PollForDecisionTask failed.",
×
629
                                tag.WorkflowTaskListName(pollRequest.GetTaskList().GetName()),
×
630
                                tag.Value(ctxTimeout),
×
631
                                tag.Error(err))
×
632
                        return nil, err
×
633
                }
634

635
                // Must be cancellation error.  Does'nt matter what we return here.  Client already went away.
636
                return nil, nil
66✔
637
        }
638

639
        tags = append(tags, []tag.Tag{tag.WorkflowID(
1,150✔
640
                matchingResp.GetWorkflowExecution().GetWorkflowID()),
1,150✔
641
                tag.WorkflowRunID(matchingResp.GetWorkflowExecution().GetRunID())}...)
1,150✔
642
        resp, err = wh.createPollForDecisionTaskResponse(ctx, scope, domainID, matchingResp, matchingResp.GetBranchToken())
1,150✔
643
        if err != nil {
1,150✔
644
                return nil, err
×
645
        }
×
646
        return resp, nil
1,150✔
647
}
648

649
func (wh *WorkflowHandler) getIsolationGroup(ctx context.Context, domainName string) string {
2,865✔
650
        return partition.IsolationGroupFromContext(ctx)
2,865✔
651
}
2,865✔
652

653
func (wh *WorkflowHandler) getPartitionConfig(ctx context.Context, domainName string) map[string]string {
478✔
654
        return partition.ConfigFromContext(ctx)
478✔
655
}
478✔
656

657
func (wh *WorkflowHandler) isIsolationGroupHealthy(ctx context.Context, domainName, isolationGroup string) bool {
1,209✔
658
        if wh.GetIsolationGroupState() != nil && wh.config.EnableTasklistIsolation(domainName) {
1,211✔
659
                isDrained, err := wh.GetIsolationGroupState().IsDrained(ctx, domainName, isolationGroup)
2✔
660
                if err != nil {
2✔
661
                        wh.GetLogger().Error("Failed to check if an isolation group is drained, assume it's healthy", tag.Error(err))
×
662
                        return true
×
663
                }
×
664
                return !isDrained
2✔
665
        }
666
        return true
1,207✔
667
}
668

669
func (wh *WorkflowHandler) waitUntilIsolationGroupHealthy(ctx context.Context, domainName, isolationGroup string) bool {
1,656✔
670
        if wh.GetIsolationGroupState() != nil && wh.config.EnableTasklistIsolation(domainName) {
1,658✔
671
                ticker := time.NewTicker(time.Second * 30)
2✔
672
                defer ticker.Stop()
2✔
673
                childCtx, cancel := common.CreateChildContext(ctx, 0.05)
2✔
674
                defer cancel()
2✔
675
                for {
4✔
676
                        isDrained, err := wh.GetIsolationGroupState().IsDrained(childCtx, domainName, isolationGroup)
2✔
677
                        if err != nil {
2✔
678
                                wh.GetLogger().Error("Failed to check if an isolation group is drained, assume it's healthy", tag.Error(err))
×
679
                                return true
×
680
                        }
×
681
                        if !isDrained {
2✔
682
                                break
×
683
                        }
684
                        select {
2✔
685
                        case <-childCtx.Done():
2✔
686
                                return false
2✔
687
                        case <-ticker.C:
×
688
                        }
689
                }
690
        }
691
        return true
1,654✔
692
}
693

694
func (wh *WorkflowHandler) checkBadBinary(domainEntry *cache.DomainCacheEntry, binaryChecksum string) error {
1,217✔
695
        if domainEntry.GetConfig().BadBinaries.Binaries != nil {
2,433✔
696
                badBinaries := domainEntry.GetConfig().BadBinaries.Binaries
1,216✔
697
                _, ok := badBinaries[binaryChecksum]
1,216✔
698
                if ok {
1,216✔
699
                        wh.GetMetricsClient().IncCounter(metrics.FrontendPollForDecisionTaskScope, metrics.CadenceErrBadBinaryCounter)
×
700
                        return &types.BadRequestError{
×
701
                                Message: fmt.Sprintf("binary %v already marked as bad deployment", binaryChecksum),
×
702
                        }
×
703
                }
×
704
        }
705
        return nil
1,217✔
706
}
707

708
func (wh *WorkflowHandler) cancelOutstandingPoll(ctx context.Context, err error, domainID string, taskListType int32,
709
        taskList *types.TaskList, pollerID string) error {
132✔
710
        // First check if this err is due to context cancellation.  This means client connection to frontend is closed.
132✔
711
        if ctx.Err() == context.Canceled {
264✔
712
                // Our rpc stack does not propagates context cancellation to the other service.  Lets make an explicit
132✔
713
                // call to matching to notify this poller is gone to prevent any tasks being dispatched to zombie pollers.
132✔
714
                err = wh.GetMatchingClient().CancelOutstandingPoll(context.Background(), &types.CancelOutstandingPollRequest{
132✔
715
                        DomainUUID:   domainID,
132✔
716
                        TaskListType: common.Int32Ptr(taskListType),
132✔
717
                        TaskList:     taskList,
132✔
718
                        PollerID:     pollerID,
132✔
719
                })
132✔
720
                // We can not do much if this call fails.  Just log the error and move on
132✔
721
                if err != nil {
132✔
722
                        wh.GetLogger().Warn("Failed to cancel outstanding poller.",
×
723
                                tag.WorkflowTaskListName(taskList.GetName()), tag.Error(err))
×
724
                }
×
725

726
                // Clear error as we don't want to report context cancellation error to count against our SLA
727
                return nil
132✔
728
        }
729

730
        return err
×
731
}
732

733
// RecordActivityTaskHeartbeat - Record Activity Task Heart beat.
734
func (wh *WorkflowHandler) RecordActivityTaskHeartbeat(
735
        ctx context.Context,
736
        heartbeatRequest *types.RecordActivityTaskHeartbeatRequest,
737
) (resp *types.RecordActivityTaskHeartbeatResponse, retError error) {
384✔
738
        if wh.isShuttingDown() {
384✔
739
                return nil, validate.ErrShuttingDown
×
740
        }
×
741

742
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
384✔
743
                return nil, err
×
744
        }
×
745

746
        if heartbeatRequest == nil {
385✔
747
                return nil, validate.ErrRequestNotSet
1✔
748
        }
1✔
749

750
        wh.GetLogger().Debug("Received RecordActivityTaskHeartbeat")
383✔
751
        if heartbeatRequest.TaskToken == nil {
384✔
752
                return nil, validate.ErrTaskTokenNotSet
1✔
753
        }
1✔
754
        taskToken, err := wh.tokenSerializer.Deserialize(heartbeatRequest.TaskToken)
382✔
755
        if err != nil {
382✔
756
                return nil, err
×
757
        }
×
758
        if taskToken.DomainID == "" {
382✔
759
                return nil, validate.ErrDomainNotSet
×
760
        }
×
761

762
        domainName, err := wh.GetDomainCache().GetDomainName(taskToken.DomainID)
382✔
763
        if err != nil {
382✔
764
                return nil, err
×
765
        }
×
766

767
        dw := domainWrapper{
382✔
768
                domain: domainName,
382✔
769
        }
382✔
770
        scope := getMetricsScopeWithDomain(metrics.FrontendRecordActivityTaskHeartbeatScope, dw, wh.GetMetricsClient()).Tagged(metrics.GetContextTags(ctx)...)
382✔
771
        sizeLimitError := wh.config.BlobSizeLimitError(domainName)
382✔
772
        sizeLimitWarn := wh.config.BlobSizeLimitWarn(domainName)
382✔
773

382✔
774
        if err := common.CheckEventBlobSizeLimit(
382✔
775
                len(heartbeatRequest.Details),
382✔
776
                sizeLimitWarn,
382✔
777
                sizeLimitError,
382✔
778
                taskToken.DomainID,
382✔
779
                taskToken.WorkflowID,
382✔
780
                taskToken.RunID,
382✔
781
                scope,
382✔
782
                wh.GetThrottledLogger(),
382✔
783
                tag.BlobSizeViolationOperation("RecordActivityTaskHeartbeat"),
382✔
784
        ); err != nil {
382✔
785
                // heartbeat details exceed size limit, we would fail the activity immediately with explicit error reason
×
786
                failRequest := &types.RespondActivityTaskFailedRequest{
×
787
                        TaskToken: heartbeatRequest.TaskToken,
×
788
                        Reason:    common.StringPtr(common.FailureReasonHeartbeatExceedsLimit),
×
789
                        Details:   heartbeatRequest.Details[0:sizeLimitError],
×
790
                        Identity:  heartbeatRequest.Identity,
×
791
                }
×
792
                err = wh.GetHistoryClient().RespondActivityTaskFailed(ctx, &types.HistoryRespondActivityTaskFailedRequest{
×
793
                        DomainUUID:    taskToken.DomainID,
×
794
                        FailedRequest: failRequest,
×
795
                })
×
796
                if err != nil {
×
797
                        return nil, wh.normalizeVersionedErrors(ctx, err)
×
798
                }
×
799
                resp = &types.RecordActivityTaskHeartbeatResponse{CancelRequested: true}
×
800
        } else {
382✔
801
                resp, err = wh.GetHistoryClient().RecordActivityTaskHeartbeat(ctx, &types.HistoryRecordActivityTaskHeartbeatRequest{
382✔
802
                        DomainUUID:       taskToken.DomainID,
382✔
803
                        HeartbeatRequest: heartbeatRequest,
382✔
804
                })
382✔
805
                if err != nil {
382✔
806
                        return nil, wh.normalizeVersionedErrors(ctx, err)
×
807
                }
×
808
        }
809

810
        return resp, nil
382✔
811
}
812

813
// RecordActivityTaskHeartbeatByID - Record Activity Task Heart beat.
814
func (wh *WorkflowHandler) RecordActivityTaskHeartbeatByID(
815
        ctx context.Context,
816
        heartbeatRequest *types.RecordActivityTaskHeartbeatByIDRequest,
817
) (resp *types.RecordActivityTaskHeartbeatResponse, retError error) {
3✔
818
        if wh.isShuttingDown() {
3✔
819
                return nil, validate.ErrShuttingDown
×
820
        }
×
821

822
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
3✔
823
                return nil, err
×
824
        }
×
825

826
        if heartbeatRequest == nil {
4✔
827
                return nil, validate.ErrRequestNotSet
1✔
828
        }
1✔
829

830
        domainName := heartbeatRequest.GetDomain()
2✔
831
        if domainName == "" {
3✔
832
                return nil, validate.ErrDomainNotSet
1✔
833
        }
1✔
834

835
        wh.GetLogger().Debug("Received RecordActivityTaskHeartbeatByID")
1✔
836
        domainID, err := wh.GetDomainCache().GetDomainID(domainName)
1✔
837
        if err != nil {
1✔
838
                return nil, err
×
839
        }
×
840
        workflowID := heartbeatRequest.GetWorkflowID()
1✔
841
        runID := heartbeatRequest.GetRunID() // runID is optional so can be empty
1✔
842
        activityID := heartbeatRequest.GetActivityID()
1✔
843

1✔
844
        if domainID == "" {
1✔
845
                return nil, validate.ErrDomainNotSet
×
846
        }
×
847
        if workflowID == "" {
1✔
848
                return nil, validate.ErrWorkflowIDNotSet
×
849
        }
×
850
        if activityID == "" {
1✔
851
                return nil, validate.ErrActivityIDNotSet
×
852
        }
×
853

854
        taskToken := &common.TaskToken{
1✔
855
                DomainID:   domainID,
1✔
856
                RunID:      runID,
1✔
857
                WorkflowID: workflowID,
1✔
858
                ScheduleID: common.EmptyEventID,
1✔
859
                ActivityID: activityID,
1✔
860
        }
1✔
861
        token, err := wh.tokenSerializer.Serialize(taskToken)
1✔
862
        if err != nil {
1✔
863
                return nil, err
×
864
        }
×
865

866
        scope := getMetricsScopeWithDomain(metrics.FrontendRecordActivityTaskHeartbeatByIDScope, heartbeatRequest, wh.GetMetricsClient()).Tagged(metrics.GetContextTags(ctx)...)
1✔
867
        sizeLimitError := wh.config.BlobSizeLimitError(domainName)
1✔
868
        sizeLimitWarn := wh.config.BlobSizeLimitWarn(domainName)
1✔
869

1✔
870
        if err := common.CheckEventBlobSizeLimit(
1✔
871
                len(heartbeatRequest.Details),
1✔
872
                sizeLimitWarn,
1✔
873
                sizeLimitError,
1✔
874
                taskToken.DomainID,
1✔
875
                taskToken.WorkflowID,
1✔
876
                taskToken.RunID,
1✔
877
                scope,
1✔
878
                wh.GetThrottledLogger(),
1✔
879
                tag.BlobSizeViolationOperation("RecordActivityTaskHeartbeatByID"),
1✔
880
        ); err != nil {
1✔
881
                // heartbeat details exceed size limit, we would fail the activity immediately with explicit error reason
×
882
                failRequest := &types.RespondActivityTaskFailedRequest{
×
883
                        TaskToken: token,
×
884
                        Reason:    common.StringPtr(common.FailureReasonHeartbeatExceedsLimit),
×
885
                        Details:   heartbeatRequest.Details[0:sizeLimitError],
×
886
                        Identity:  heartbeatRequest.Identity,
×
887
                }
×
888
                err = wh.GetHistoryClient().RespondActivityTaskFailed(ctx, &types.HistoryRespondActivityTaskFailedRequest{
×
889
                        DomainUUID:    taskToken.DomainID,
×
890
                        FailedRequest: failRequest,
×
891
                })
×
892
                if err != nil {
×
893
                        return nil, wh.normalizeVersionedErrors(ctx, err)
×
894
                }
×
895
                resp = &types.RecordActivityTaskHeartbeatResponse{CancelRequested: true}
×
896
        } else {
1✔
897
                req := &types.RecordActivityTaskHeartbeatRequest{
1✔
898
                        TaskToken: token,
1✔
899
                        Details:   heartbeatRequest.Details,
1✔
900
                        Identity:  heartbeatRequest.Identity,
1✔
901
                }
1✔
902

1✔
903
                resp, err = wh.GetHistoryClient().RecordActivityTaskHeartbeat(ctx, &types.HistoryRecordActivityTaskHeartbeatRequest{
1✔
904
                        DomainUUID:       taskToken.DomainID,
1✔
905
                        HeartbeatRequest: req,
1✔
906
                })
1✔
907
                if err != nil {
1✔
908
                        return nil, wh.normalizeVersionedErrors(ctx, err)
×
909
                }
×
910
        }
911

912
        return resp, nil
1✔
913
}
914

915
// RespondActivityTaskCompleted - response to an activity task
916
func (wh *WorkflowHandler) RespondActivityTaskCompleted(
917
        ctx context.Context,
918
        completeRequest *types.RespondActivityTaskCompletedRequest,
919
) (retError error) {
247✔
920
        if wh.isShuttingDown() {
247✔
921
                return validate.ErrShuttingDown
×
922
        }
×
923

924
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
247✔
925
                return err
×
926
        }
×
927

928
        if completeRequest == nil {
247✔
929
                return validate.ErrRequestNotSet
×
930
        }
×
931

932
        if completeRequest.TaskToken == nil {
247✔
933
                return validate.ErrTaskTokenNotSet
×
934
        }
×
935
        taskToken, err := wh.tokenSerializer.Deserialize(completeRequest.TaskToken)
247✔
936
        if err != nil {
247✔
937
                return err
×
938
        }
×
939
        if taskToken.DomainID == "" {
247✔
940
                return validate.ErrDomainNotSet
×
941
        }
×
942

943
        domainName, err := wh.GetDomainCache().GetDomainName(taskToken.DomainID)
247✔
944
        if err != nil {
247✔
945
                return err
×
946
        }
×
947

948
        dw := domainWrapper{
247✔
949
                domain: domainName,
247✔
950
        }
247✔
951
        scope := getMetricsScopeWithDomain(metrics.FrontendRespondActivityTaskCompletedScope, dw, wh.GetMetricsClient()).Tagged(metrics.GetContextTags(ctx)...)
247✔
952
        if !common.IsValidIDLength(
247✔
953
                completeRequest.GetIdentity(),
247✔
954
                scope,
247✔
955
                wh.config.MaxIDLengthWarnLimit(),
247✔
956
                wh.config.IdentityMaxLength(domainName),
247✔
957
                metrics.CadenceErrIdentityExceededWarnLimit,
247✔
958
                domainName,
247✔
959
                wh.GetLogger(),
247✔
960
                tag.IDTypeIdentity) {
247✔
961
                return validate.ErrIdentityTooLong
×
962
        }
×
963

964
        sizeLimitError := wh.config.BlobSizeLimitError(domainName)
247✔
965
        sizeLimitWarn := wh.config.BlobSizeLimitWarn(domainName)
247✔
966

247✔
967
        if err := common.CheckEventBlobSizeLimit(
247✔
968
                len(completeRequest.Result),
247✔
969
                sizeLimitWarn,
247✔
970
                sizeLimitError,
247✔
971
                taskToken.DomainID,
247✔
972
                taskToken.WorkflowID,
247✔
973
                taskToken.RunID,
247✔
974
                scope,
247✔
975
                wh.GetThrottledLogger(),
247✔
976
                tag.BlobSizeViolationOperation("RespondActivityTaskCompleted"),
247✔
977
        ); err != nil {
247✔
978
                // result exceeds blob size limit, we would record it as failure
×
979
                failRequest := &types.RespondActivityTaskFailedRequest{
×
980
                        TaskToken: completeRequest.TaskToken,
×
981
                        Reason:    common.StringPtr(common.FailureReasonCompleteResultExceedsLimit),
×
982
                        Details:   completeRequest.Result[0:sizeLimitError],
×
983
                        Identity:  completeRequest.Identity,
×
984
                }
×
985
                err = wh.GetHistoryClient().RespondActivityTaskFailed(ctx, &types.HistoryRespondActivityTaskFailedRequest{
×
986
                        DomainUUID:    taskToken.DomainID,
×
987
                        FailedRequest: failRequest,
×
988
                })
×
989
                if err != nil {
×
990
                        return wh.normalizeVersionedErrors(ctx, err)
×
991
                }
×
992
        } else {
247✔
993
                err = wh.GetHistoryClient().RespondActivityTaskCompleted(ctx, &types.HistoryRespondActivityTaskCompletedRequest{
247✔
994
                        DomainUUID:      taskToken.DomainID,
247✔
995
                        CompleteRequest: completeRequest,
247✔
996
                })
247✔
997
                if err != nil {
292✔
998
                        return wh.normalizeVersionedErrors(ctx, err)
45✔
999
                }
45✔
1000
        }
1001

1002
        return nil
202✔
1003
}
1004

1005
// RespondActivityTaskCompletedByID - response to an activity task
1006
func (wh *WorkflowHandler) RespondActivityTaskCompletedByID(
1007
        ctx context.Context,
1008
        completeRequest *types.RespondActivityTaskCompletedByIDRequest,
1009
) (retError error) {
76✔
1010
        if wh.isShuttingDown() {
76✔
1011
                return validate.ErrShuttingDown
×
1012
        }
×
1013

1014
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
76✔
1015
                return err
×
1016
        }
×
1017

1018
        if completeRequest == nil {
76✔
1019
                return validate.ErrRequestNotSet
×
1020
        }
×
1021

1022
        domainName := completeRequest.GetDomain()
76✔
1023
        if domainName == "" {
76✔
1024
                return validate.ErrDomainNotSet
×
1025
        }
×
1026
        domainID, err := wh.GetDomainCache().GetDomainID(domainName)
76✔
1027
        if err != nil {
76✔
1028
                return err
×
1029
        }
×
1030
        workflowID := completeRequest.GetWorkflowID()
76✔
1031
        runID := completeRequest.GetRunID() // runID is optional so can be empty
76✔
1032
        activityID := completeRequest.GetActivityID()
76✔
1033

76✔
1034
        if domainID == "" {
76✔
1035
                return validate.ErrDomainNotSet
×
1036
        }
×
1037
        if workflowID == "" {
76✔
1038
                return validate.ErrWorkflowIDNotSet
×
1039
        }
×
1040
        if activityID == "" {
76✔
1041
                return validate.ErrActivityIDNotSet
×
1042
        }
×
1043

1044
        scope := getMetricsScopeWithDomain(metrics.FrontendRespondActivityTaskCompletedByIDScope, completeRequest, wh.GetMetricsClient()).Tagged(metrics.GetContextTags(ctx)...)
76✔
1045
        if !common.IsValidIDLength(
76✔
1046
                completeRequest.GetIdentity(),
76✔
1047
                scope,
76✔
1048
                wh.config.MaxIDLengthWarnLimit(),
76✔
1049
                wh.config.IdentityMaxLength(domainName),
76✔
1050
                metrics.CadenceErrIdentityExceededWarnLimit,
76✔
1051
                domainName,
76✔
1052
                wh.GetLogger(),
76✔
1053
                tag.IDTypeIdentity) {
76✔
1054
                return validate.ErrIdentityTooLong
×
1055
        }
×
1056

1057
        taskToken := &common.TaskToken{
76✔
1058
                DomainID:   domainID,
76✔
1059
                RunID:      runID,
76✔
1060
                WorkflowID: workflowID,
76✔
1061
                ScheduleID: common.EmptyEventID,
76✔
1062
                ActivityID: activityID,
76✔
1063
        }
76✔
1064
        token, err := wh.tokenSerializer.Serialize(taskToken)
76✔
1065
        if err != nil {
76✔
1066
                return err
×
1067
        }
×
1068

1069
        sizeLimitError := wh.config.BlobSizeLimitError(domainName)
76✔
1070
        sizeLimitWarn := wh.config.BlobSizeLimitWarn(domainName)
76✔
1071

76✔
1072
        if err := common.CheckEventBlobSizeLimit(
76✔
1073
                len(completeRequest.Result),
76✔
1074
                sizeLimitWarn,
76✔
1075
                sizeLimitError,
76✔
1076
                taskToken.DomainID,
76✔
1077
                taskToken.WorkflowID,
76✔
1078
                taskToken.RunID,
76✔
1079
                scope,
76✔
1080
                wh.GetThrottledLogger(),
76✔
1081
                tag.BlobSizeViolationOperation("RespondActivityTaskCompletedByID"),
76✔
1082
        ); err != nil {
76✔
1083
                // result exceeds blob size limit, we would record it as failure
×
1084
                failRequest := &types.RespondActivityTaskFailedRequest{
×
1085
                        TaskToken: token,
×
1086
                        Reason:    common.StringPtr(common.FailureReasonCompleteResultExceedsLimit),
×
1087
                        Details:   completeRequest.Result[0:sizeLimitError],
×
1088
                        Identity:  completeRequest.Identity,
×
1089
                }
×
1090
                err = wh.GetHistoryClient().RespondActivityTaskFailed(ctx, &types.HistoryRespondActivityTaskFailedRequest{
×
1091
                        DomainUUID:    taskToken.DomainID,
×
1092
                        FailedRequest: failRequest,
×
1093
                })
×
1094
                if err != nil {
×
1095
                        return wh.normalizeVersionedErrors(ctx, err)
×
1096
                }
×
1097
        } else {
76✔
1098
                req := &types.RespondActivityTaskCompletedRequest{
76✔
1099
                        TaskToken: token,
76✔
1100
                        Result:    completeRequest.Result,
76✔
1101
                        Identity:  completeRequest.Identity,
76✔
1102
                }
76✔
1103

76✔
1104
                err = wh.GetHistoryClient().RespondActivityTaskCompleted(ctx, &types.HistoryRespondActivityTaskCompletedRequest{
76✔
1105
                        DomainUUID:      taskToken.DomainID,
76✔
1106
                        CompleteRequest: req,
76✔
1107
                })
76✔
1108
                if err != nil {
76✔
1109
                        return wh.normalizeVersionedErrors(ctx, err)
×
1110
                }
×
1111
        }
1112

1113
        return nil
76✔
1114
}
1115

1116
// RespondActivityTaskFailed - response to an activity task failure
1117
func (wh *WorkflowHandler) RespondActivityTaskFailed(
1118
        ctx context.Context,
1119
        failedRequest *types.RespondActivityTaskFailedRequest,
1120
) (retError error) {
12✔
1121
        if wh.isShuttingDown() {
12✔
1122
                return validate.ErrShuttingDown
×
1123
        }
×
1124

1125
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
12✔
1126
                return err
×
1127
        }
×
1128

1129
        if failedRequest == nil {
12✔
1130
                return validate.ErrRequestNotSet
×
1131
        }
×
1132

1133
        if failedRequest.TaskToken == nil {
12✔
1134
                return validate.ErrTaskTokenNotSet
×
1135
        }
×
1136
        taskToken, err := wh.tokenSerializer.Deserialize(failedRequest.TaskToken)
12✔
1137
        if err != nil {
12✔
1138
                return err
×
1139
        }
×
1140
        if taskToken.DomainID == "" {
12✔
1141
                return validate.ErrDomainNotSet
×
1142
        }
×
1143

1144
        domainName, err := wh.GetDomainCache().GetDomainName(taskToken.DomainID)
12✔
1145
        if err != nil {
12✔
1146
                return err
×
1147
        }
×
1148

1149
        dw := domainWrapper{
12✔
1150
                domain: domainName,
12✔
1151
        }
12✔
1152
        scope := getMetricsScopeWithDomain(metrics.FrontendRespondActivityTaskFailedScope, dw, wh.GetMetricsClient()).Tagged(metrics.GetContextTags(ctx)...)
12✔
1153
        if !common.IsValidIDLength(
12✔
1154
                failedRequest.GetIdentity(),
12✔
1155
                scope,
12✔
1156
                wh.config.MaxIDLengthWarnLimit(),
12✔
1157
                wh.config.IdentityMaxLength(domainName),
12✔
1158
                metrics.CadenceErrIdentityExceededWarnLimit,
12✔
1159
                domainName,
12✔
1160
                wh.GetLogger(),
12✔
1161
                tag.IDTypeIdentity) {
12✔
1162
                return validate.ErrIdentityTooLong
×
1163
        }
×
1164

1165
        sizeLimitError := wh.config.BlobSizeLimitError(domainName)
12✔
1166
        sizeLimitWarn := wh.config.BlobSizeLimitWarn(domainName)
12✔
1167

12✔
1168
        if err := common.CheckEventBlobSizeLimit(
12✔
1169
                len(failedRequest.Details),
12✔
1170
                sizeLimitWarn,
12✔
1171
                sizeLimitError,
12✔
1172
                taskToken.DomainID,
12✔
1173
                taskToken.WorkflowID,
12✔
1174
                taskToken.RunID,
12✔
1175
                scope,
12✔
1176
                wh.GetThrottledLogger(),
12✔
1177
                tag.BlobSizeViolationOperation("RespondActivityTaskFailed"),
12✔
1178
        ); err != nil {
12✔
1179
                // details exceeds blob size limit, we would truncate the details and put a specific error reason
×
1180
                failedRequest.Reason = common.StringPtr(common.FailureReasonFailureDetailsExceedsLimit)
×
1181
                failedRequest.Details = failedRequest.Details[0:sizeLimitError]
×
1182
        }
×
1183

1184
        err = wh.GetHistoryClient().RespondActivityTaskFailed(ctx, &types.HistoryRespondActivityTaskFailedRequest{
12✔
1185
                DomainUUID:    taskToken.DomainID,
12✔
1186
                FailedRequest: failedRequest,
12✔
1187
        })
12✔
1188
        if err != nil {
12✔
1189
                return wh.normalizeVersionedErrors(ctx, err)
×
1190
        }
×
1191
        return nil
12✔
1192
}
1193

1194
// RespondActivityTaskFailedByID - response to an activity task failure
1195
func (wh *WorkflowHandler) RespondActivityTaskFailedByID(
1196
        ctx context.Context,
1197
        failedRequest *types.RespondActivityTaskFailedByIDRequest,
1198
) (retError error) {
×
1199
        if wh.isShuttingDown() {
×
1200
                return validate.ErrShuttingDown
×
1201
        }
×
1202

1203
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
×
1204
                return err
×
1205
        }
×
1206

1207
        if failedRequest == nil {
×
1208
                return validate.ErrRequestNotSet
×
1209
        }
×
1210

1211
        domainName := failedRequest.GetDomain()
×
1212

×
1213
        if domainName == "" {
×
1214
                return validate.ErrDomainNotSet
×
1215
        }
×
1216
        domainID, err := wh.GetDomainCache().GetDomainID(domainName)
×
1217
        if err != nil {
×
1218
                return err
×
1219
        }
×
1220
        workflowID := failedRequest.GetWorkflowID()
×
1221
        runID := failedRequest.GetRunID() // runID is optional so can be empty
×
1222
        activityID := failedRequest.GetActivityID()
×
1223

×
1224
        if domainID == "" {
×
1225
                return validate.ErrDomainNotSet
×
1226
        }
×
1227
        if workflowID == "" {
×
1228
                return validate.ErrWorkflowIDNotSet
×
1229
        }
×
1230
        if activityID == "" {
×
1231
                return validate.ErrActivityIDNotSet
×
1232
        }
×
1233

1234
        scope := getMetricsScopeWithDomain(metrics.FrontendRespondActivityTaskFailedByIDScope, failedRequest, wh.GetMetricsClient()).Tagged(metrics.GetContextTags(ctx)...)
×
1235
        if !common.IsValidIDLength(
×
1236
                failedRequest.GetIdentity(),
×
1237
                scope,
×
1238
                wh.config.MaxIDLengthWarnLimit(),
×
1239
                wh.config.IdentityMaxLength(failedRequest.GetDomain()),
×
1240
                metrics.CadenceErrIdentityExceededWarnLimit,
×
1241
                domainName,
×
1242
                wh.GetLogger(),
×
1243
                tag.IDTypeIdentity) {
×
1244
                return validate.ErrIdentityTooLong
×
1245
        }
×
1246

1247
        taskToken := &common.TaskToken{
×
1248
                DomainID:   domainID,
×
1249
                RunID:      runID,
×
1250
                WorkflowID: workflowID,
×
1251
                ScheduleID: common.EmptyEventID,
×
1252
                ActivityID: activityID,
×
1253
        }
×
1254
        token, err := wh.tokenSerializer.Serialize(taskToken)
×
1255
        if err != nil {
×
1256
                return err
×
1257
        }
×
1258

1259
        sizeLimitError := wh.config.BlobSizeLimitError(domainName)
×
1260
        sizeLimitWarn := wh.config.BlobSizeLimitWarn(domainName)
×
1261

×
1262
        if err := common.CheckEventBlobSizeLimit(
×
1263
                len(failedRequest.Details),
×
1264
                sizeLimitWarn,
×
1265
                sizeLimitError,
×
1266
                taskToken.DomainID,
×
1267
                taskToken.WorkflowID,
×
1268
                taskToken.RunID,
×
1269
                scope,
×
1270
                wh.GetThrottledLogger(),
×
1271
                tag.BlobSizeViolationOperation("RespondActivityTaskFailedByID"),
×
1272
        ); err != nil {
×
1273
                // details exceeds blob size limit, we would truncate the details and put a specific error reason
×
1274
                failedRequest.Reason = common.StringPtr(common.FailureReasonFailureDetailsExceedsLimit)
×
1275
                failedRequest.Details = failedRequest.Details[0:sizeLimitError]
×
1276
        }
×
1277

1278
        req := &types.RespondActivityTaskFailedRequest{
×
1279
                TaskToken: token,
×
1280
                Reason:    failedRequest.Reason,
×
1281
                Details:   failedRequest.Details,
×
1282
                Identity:  failedRequest.Identity,
×
1283
        }
×
1284

×
1285
        err = wh.GetHistoryClient().RespondActivityTaskFailed(ctx, &types.HistoryRespondActivityTaskFailedRequest{
×
1286
                DomainUUID:    taskToken.DomainID,
×
1287
                FailedRequest: req,
×
1288
        })
×
1289
        if err != nil {
×
1290
                return wh.normalizeVersionedErrors(ctx, err)
×
1291
        }
×
1292
        return nil
×
1293
}
1294

1295
// RespondActivityTaskCanceled - called to cancel an activity task
1296
func (wh *WorkflowHandler) RespondActivityTaskCanceled(
1297
        ctx context.Context,
1298
        cancelRequest *types.RespondActivityTaskCanceledRequest,
1299
) (retError error) {
×
1300
        if wh.isShuttingDown() {
×
1301
                return validate.ErrShuttingDown
×
1302
        }
×
1303

1304
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
×
1305
                return err
×
1306
        }
×
1307

1308
        if cancelRequest == nil {
×
1309
                return validate.ErrRequestNotSet
×
1310
        }
×
1311

1312
        if cancelRequest.TaskToken == nil {
×
1313
                return validate.ErrTaskTokenNotSet
×
1314
        }
×
1315

1316
        taskToken, err := wh.tokenSerializer.Deserialize(cancelRequest.TaskToken)
×
1317
        if err != nil {
×
1318
                return err
×
1319
        }
×
1320

1321
        if taskToken.DomainID == "" {
×
1322
                return validate.ErrDomainNotSet
×
1323
        }
×
1324

1325
        domainName, err := wh.GetDomainCache().GetDomainName(taskToken.DomainID)
×
1326
        if err != nil {
×
1327
                return err
×
1328
        }
×
1329

1330
        dw := domainWrapper{
×
1331
                domain: domainName,
×
1332
        }
×
1333
        scope := getMetricsScopeWithDomain(metrics.FrontendRespondActivityTaskCanceledScope, dw, wh.GetMetricsClient()).Tagged(metrics.GetContextTags(ctx)...)
×
1334
        if !common.IsValidIDLength(
×
1335
                cancelRequest.GetIdentity(),
×
1336
                scope,
×
1337
                wh.config.MaxIDLengthWarnLimit(),
×
1338
                wh.config.IdentityMaxLength(domainName),
×
1339
                metrics.CadenceErrIdentityExceededWarnLimit,
×
1340
                domainName,
×
1341
                wh.GetLogger(),
×
1342
                tag.IDTypeIdentity) {
×
1343
                return validate.ErrIdentityTooLong
×
1344
        }
×
1345

1346
        sizeLimitError := wh.config.BlobSizeLimitError(domainName)
×
1347
        sizeLimitWarn := wh.config.BlobSizeLimitWarn(domainName)
×
1348

×
1349
        if err := common.CheckEventBlobSizeLimit(
×
1350
                len(cancelRequest.Details),
×
1351
                sizeLimitWarn,
×
1352
                sizeLimitError,
×
1353
                taskToken.DomainID,
×
1354
                taskToken.WorkflowID,
×
1355
                taskToken.RunID,
×
1356
                scope,
×
1357
                wh.GetThrottledLogger(),
×
1358
                tag.BlobSizeViolationOperation("RespondActivityTaskCanceled"),
×
1359
        ); err != nil {
×
1360
                // details exceeds blob size limit, we would record it as failure
×
1361
                failRequest := &types.RespondActivityTaskFailedRequest{
×
1362
                        TaskToken: cancelRequest.TaskToken,
×
1363
                        Reason:    common.StringPtr(common.FailureReasonCancelDetailsExceedsLimit),
×
1364
                        Details:   cancelRequest.Details[0:sizeLimitError],
×
1365
                        Identity:  cancelRequest.Identity,
×
1366
                }
×
1367
                err = wh.GetHistoryClient().RespondActivityTaskFailed(ctx, &types.HistoryRespondActivityTaskFailedRequest{
×
1368
                        DomainUUID:    taskToken.DomainID,
×
1369
                        FailedRequest: failRequest,
×
1370
                })
×
1371
                if err != nil {
×
1372
                        return wh.normalizeVersionedErrors(ctx, err)
×
1373
                }
×
1374
        } else {
×
1375
                err = wh.GetHistoryClient().RespondActivityTaskCanceled(ctx, &types.HistoryRespondActivityTaskCanceledRequest{
×
1376
                        DomainUUID:    taskToken.DomainID,
×
1377
                        CancelRequest: cancelRequest,
×
1378
                })
×
1379
                if err != nil {
×
1380
                        return wh.normalizeVersionedErrors(ctx, err)
×
1381
                }
×
1382
        }
1383

1384
        return nil
×
1385
}
1386

1387
// RespondActivityTaskCanceledByID - called to cancel an activity task
1388
func (wh *WorkflowHandler) RespondActivityTaskCanceledByID(
1389
        ctx context.Context,
1390
        cancelRequest *types.RespondActivityTaskCanceledByIDRequest,
1391
) (retError error) {
×
1392
        if wh.isShuttingDown() {
×
1393
                return validate.ErrShuttingDown
×
1394
        }
×
1395

1396
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
×
1397
                return err
×
1398
        }
×
1399

1400
        if cancelRequest == nil {
×
1401
                return validate.ErrRequestNotSet
×
1402
        }
×
1403

1404
        domainName := cancelRequest.GetDomain()
×
1405
        if domainName == "" {
×
1406
                return validate.ErrDomainNotSet
×
1407
        }
×
1408
        domainID, err := wh.GetDomainCache().GetDomainID(domainName)
×
1409
        if err != nil {
×
1410
                return err
×
1411
        }
×
1412
        workflowID := cancelRequest.GetWorkflowID()
×
1413
        runID := cancelRequest.GetRunID() // runID is optional so can be empty
×
1414
        activityID := cancelRequest.GetActivityID()
×
1415

×
1416
        if domainID == "" {
×
1417
                return validate.ErrDomainNotSet
×
1418
        }
×
1419
        if workflowID == "" {
×
1420
                return validate.ErrWorkflowIDNotSet
×
1421
        }
×
1422
        if activityID == "" {
×
1423
                return validate.ErrActivityIDNotSet
×
1424
        }
×
1425

1426
        scope := getMetricsScopeWithDomain(metrics.FrontendRespondActivityTaskCanceledByIDScope, cancelRequest, wh.GetMetricsClient()).Tagged(metrics.GetContextTags(ctx)...)
×
1427
        if !common.IsValidIDLength(
×
1428
                cancelRequest.GetIdentity(),
×
1429
                scope,
×
1430
                wh.config.MaxIDLengthWarnLimit(),
×
1431
                wh.config.IdentityMaxLength(cancelRequest.GetDomain()),
×
1432
                metrics.CadenceErrIdentityExceededWarnLimit,
×
1433
                domainName,
×
1434
                wh.GetLogger(),
×
1435
                tag.IDTypeIdentity) {
×
1436
                return validate.ErrIdentityTooLong
×
1437
        }
×
1438

1439
        taskToken := &common.TaskToken{
×
1440
                DomainID:   domainID,
×
1441
                RunID:      runID,
×
1442
                WorkflowID: workflowID,
×
1443
                ScheduleID: common.EmptyEventID,
×
1444
                ActivityID: activityID,
×
1445
        }
×
1446
        token, err := wh.tokenSerializer.Serialize(taskToken)
×
1447
        if err != nil {
×
1448
                return err
×
1449
        }
×
1450

1451
        sizeLimitError := wh.config.BlobSizeLimitError(domainName)
×
1452
        sizeLimitWarn := wh.config.BlobSizeLimitWarn(domainName)
×
1453

×
1454
        if err := common.CheckEventBlobSizeLimit(
×
1455
                len(cancelRequest.Details),
×
1456
                sizeLimitWarn,
×
1457
                sizeLimitError,
×
1458
                taskToken.DomainID,
×
1459
                taskToken.WorkflowID,
×
1460
                taskToken.RunID,
×
1461
                scope,
×
1462
                wh.GetThrottledLogger(),
×
1463
                tag.BlobSizeViolationOperation("RespondActivityTaskCanceledByID"),
×
1464
        ); err != nil {
×
1465
                // details exceeds blob size limit, we would record it as failure
×
1466
                failRequest := &types.RespondActivityTaskFailedRequest{
×
1467
                        TaskToken: token,
×
1468
                        Reason:    common.StringPtr(common.FailureReasonCancelDetailsExceedsLimit),
×
1469
                        Details:   cancelRequest.Details[0:sizeLimitError],
×
1470
                        Identity:  cancelRequest.Identity,
×
1471
                }
×
1472
                err = wh.GetHistoryClient().RespondActivityTaskFailed(ctx, &types.HistoryRespondActivityTaskFailedRequest{
×
1473
                        DomainUUID:    taskToken.DomainID,
×
1474
                        FailedRequest: failRequest,
×
1475
                })
×
1476
                if err != nil {
×
1477
                        return wh.normalizeVersionedErrors(ctx, err)
×
1478
                }
×
1479
        } else {
×
1480
                req := &types.RespondActivityTaskCanceledRequest{
×
1481
                        TaskToken: token,
×
1482
                        Details:   cancelRequest.Details,
×
1483
                        Identity:  cancelRequest.Identity,
×
1484
                }
×
1485

×
1486
                err = wh.GetHistoryClient().RespondActivityTaskCanceled(ctx, &types.HistoryRespondActivityTaskCanceledRequest{
×
1487
                        DomainUUID:    taskToken.DomainID,
×
1488
                        CancelRequest: req,
×
1489
                })
×
1490
                if err != nil {
×
1491
                        return wh.normalizeVersionedErrors(ctx, err)
×
1492
                }
×
1493
        }
1494

1495
        return nil
×
1496
}
1497

1498
// RespondDecisionTaskCompleted - response to a decision task
1499
func (wh *WorkflowHandler) RespondDecisionTaskCompleted(
1500
        ctx context.Context,
1501
        completeRequest *types.RespondDecisionTaskCompletedRequest,
1502
) (resp *types.RespondDecisionTaskCompletedResponse, retError error) {
927✔
1503
        if wh.isShuttingDown() {
927✔
1504
                return nil, validate.ErrShuttingDown
×
1505
        }
×
1506

1507
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
927✔
1508
                return nil, err
×
1509
        }
×
1510

1511
        if completeRequest == nil {
927✔
1512
                return nil, validate.ErrRequestNotSet
×
1513
        }
×
1514

1515
        if completeRequest.TaskToken == nil {
927✔
1516
                return nil, validate.ErrTaskTokenNotSet
×
1517
        }
×
1518
        taskToken, err := wh.tokenSerializer.Deserialize(completeRequest.TaskToken)
927✔
1519
        if err != nil {
927✔
1520
                return nil, err
×
1521
        }
×
1522
        if taskToken.DomainID == "" {
927✔
1523
                return nil, validate.ErrDomainNotSet
×
1524
        }
×
1525

1526
        domainName, err := wh.GetDomainCache().GetDomainName(taskToken.DomainID)
927✔
1527
        if err != nil {
927✔
1528
                return nil, err
×
1529
        }
×
1530

1531
        dw := domainWrapper{
927✔
1532
                domain: domainName,
927✔
1533
        }
927✔
1534
        scope := getMetricsScopeWithDomain(metrics.FrontendRespondDecisionTaskCompletedScope, dw, wh.GetMetricsClient()).Tagged(metrics.GetContextTags(ctx)...)
927✔
1535
        if !common.IsValidIDLength(
927✔
1536
                completeRequest.GetIdentity(),
927✔
1537
                scope,
927✔
1538
                wh.config.MaxIDLengthWarnLimit(),
927✔
1539
                wh.config.IdentityMaxLength(domainName),
927✔
1540
                metrics.CadenceErrIdentityExceededWarnLimit,
927✔
1541
                domainName,
927✔
1542
                wh.GetLogger(),
927✔
1543
                tag.IDTypeIdentity) {
927✔
1544
                return nil, validate.ErrIdentityTooLong
×
1545
        }
×
1546

1547
        if err := common.CheckDecisionResultLimit(
927✔
1548
                len(completeRequest.Decisions),
927✔
1549
                wh.config.DecisionResultCountLimit(domainName),
927✔
1550
                scope); err != nil {
927✔
1551
                return nil, err
×
1552
        }
×
1553

1554
        histResp, err := wh.GetHistoryClient().RespondDecisionTaskCompleted(ctx, &types.HistoryRespondDecisionTaskCompletedRequest{
927✔
1555
                DomainUUID:      taskToken.DomainID,
927✔
1556
                CompleteRequest: completeRequest},
927✔
1557
        )
927✔
1558
        if err != nil {
936✔
1559
                return nil, wh.normalizeVersionedErrors(ctx, err)
9✔
1560
        }
9✔
1561

1562
        completedResp := &types.RespondDecisionTaskCompletedResponse{}
918✔
1563
        completedResp.ActivitiesToDispatchLocally = histResp.ActivitiesToDispatchLocally
918✔
1564
        if completeRequest.GetReturnNewDecisionTask() && histResp != nil && histResp.StartedResponse != nil {
978✔
1565
                taskToken := &common.TaskToken{
60✔
1566
                        DomainID:        taskToken.DomainID,
60✔
1567
                        WorkflowID:      taskToken.WorkflowID,
60✔
1568
                        RunID:           taskToken.RunID,
60✔
1569
                        ScheduleID:      histResp.StartedResponse.GetScheduledEventID(),
60✔
1570
                        ScheduleAttempt: histResp.StartedResponse.GetAttempt(),
60✔
1571
                }
60✔
1572
                token, _ := wh.tokenSerializer.Serialize(taskToken)
60✔
1573
                workflowExecution := &types.WorkflowExecution{
60✔
1574
                        WorkflowID: taskToken.WorkflowID,
60✔
1575
                        RunID:      taskToken.RunID,
60✔
1576
                }
60✔
1577
                matchingResp := common.CreateMatchingPollForDecisionTaskResponse(histResp.StartedResponse, workflowExecution, token)
60✔
1578

60✔
1579
                newDecisionTask, err := wh.createPollForDecisionTaskResponse(ctx, scope, taskToken.DomainID, matchingResp, matchingResp.GetBranchToken())
60✔
1580
                if err != nil {
60✔
1581
                        return nil, err
×
1582
                }
×
1583
                completedResp.DecisionTask = newDecisionTask
60✔
1584
        }
1585

1586
        return completedResp, nil
918✔
1587
}
1588

1589
// RespondDecisionTaskFailed - failed response to a decision task
1590
func (wh *WorkflowHandler) RespondDecisionTaskFailed(
1591
        ctx context.Context,
1592
        failedRequest *types.RespondDecisionTaskFailedRequest,
1593
) (retError error) {
159✔
1594
        if wh.isShuttingDown() {
159✔
1595
                return validate.ErrShuttingDown
×
1596
        }
×
1597

1598
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
159✔
1599
                return err
×
1600
        }
×
1601

1602
        if failedRequest == nil {
159✔
1603
                return validate.ErrRequestNotSet
×
1604
        }
×
1605

1606
        if failedRequest.TaskToken == nil {
159✔
1607
                return validate.ErrTaskTokenNotSet
×
1608
        }
×
1609
        taskToken, err := wh.tokenSerializer.Deserialize(failedRequest.TaskToken)
159✔
1610
        if err != nil {
159✔
1611
                return err
×
1612
        }
×
1613
        if taskToken.DomainID == "" {
159✔
1614
                return validate.ErrDomainNotSet
×
1615
        }
×
1616

1617
        domainName, err := wh.GetDomainCache().GetDomainName(taskToken.DomainID)
159✔
1618
        if err != nil {
159✔
1619
                return err
×
1620
        }
×
1621

1622
        dw := domainWrapper{
159✔
1623
                domain: domainName,
159✔
1624
        }
159✔
1625
        scope := getMetricsScopeWithDomain(metrics.FrontendRespondDecisionTaskFailedScope, dw, wh.GetMetricsClient()).Tagged(metrics.GetContextTags(ctx)...)
159✔
1626
        if !common.IsValidIDLength(
159✔
1627
                failedRequest.GetIdentity(),
159✔
1628
                scope,
159✔
1629
                wh.config.MaxIDLengthWarnLimit(),
159✔
1630
                wh.config.IdentityMaxLength(domainName),
159✔
1631
                metrics.CadenceErrIdentityExceededWarnLimit,
159✔
1632
                domainName,
159✔
1633
                wh.GetLogger(),
159✔
1634
                tag.IDTypeIdentity) {
159✔
1635
                return validate.ErrIdentityTooLong
×
1636
        }
×
1637

1638
        sizeLimitError := wh.config.BlobSizeLimitError(domainName)
159✔
1639
        sizeLimitWarn := wh.config.BlobSizeLimitWarn(domainName)
159✔
1640

159✔
1641
        if err := common.CheckEventBlobSizeLimit(
159✔
1642
                len(failedRequest.Details),
159✔
1643
                sizeLimitWarn,
159✔
1644
                sizeLimitError,
159✔
1645
                taskToken.DomainID,
159✔
1646
                taskToken.WorkflowID,
159✔
1647
                taskToken.RunID,
159✔
1648
                scope,
159✔
1649
                wh.GetThrottledLogger(),
159✔
1650
                tag.BlobSizeViolationOperation("RespondDecisionTaskFailed"),
159✔
1651
        ); err != nil {
159✔
1652
                // details exceed, we would just truncate the size for decision task failed as the details is not used anywhere by client code
×
1653
                failedRequest.Details = failedRequest.Details[0:sizeLimitError]
×
1654
        }
×
1655

1656
        err = wh.GetHistoryClient().RespondDecisionTaskFailed(ctx, &types.HistoryRespondDecisionTaskFailedRequest{
159✔
1657
                DomainUUID:    taskToken.DomainID,
159✔
1658
                FailedRequest: failedRequest,
159✔
1659
        })
159✔
1660
        if err != nil {
159✔
1661
                return wh.normalizeVersionedErrors(ctx, err)
×
1662
        }
×
1663
        return nil
159✔
1664
}
1665

1666
// RespondQueryTaskCompleted - response to a query task
1667
func (wh *WorkflowHandler) RespondQueryTaskCompleted(
1668
        ctx context.Context,
1669
        completeRequest *types.RespondQueryTaskCompletedRequest,
1670
) (retError error) {
30✔
1671
        if wh.isShuttingDown() {
30✔
1672
                return validate.ErrShuttingDown
×
1673
        }
×
1674

1675
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
30✔
1676
                return err
×
1677
        }
×
1678

1679
        if completeRequest == nil {
30✔
1680
                return validate.ErrRequestNotSet
×
1681
        }
×
1682

1683
        if completeRequest.TaskToken == nil {
30✔
1684
                return validate.ErrTaskTokenNotSet
×
1685
        }
×
1686
        queryTaskToken, err := wh.tokenSerializer.DeserializeQueryTaskToken(completeRequest.TaskToken)
30✔
1687
        if err != nil {
30✔
1688
                return err
×
1689
        }
×
1690
        if queryTaskToken.DomainID == "" || queryTaskToken.TaskList == "" || queryTaskToken.TaskID == "" {
30✔
1691
                return validate.ErrInvalidTaskToken
×
1692
        }
×
1693

1694
        domainName, err := wh.GetDomainCache().GetDomainName(queryTaskToken.DomainID)
30✔
1695
        if err != nil {
30✔
1696
                return err
×
1697
        }
×
1698

1699
        dw := domainWrapper{
30✔
1700
                domain: domainName,
30✔
1701
        }
30✔
1702
        sizeLimitError := wh.config.BlobSizeLimitError(domainName)
30✔
1703
        sizeLimitWarn := wh.config.BlobSizeLimitWarn(domainName)
30✔
1704

30✔
1705
        scope := getMetricsScopeWithDomain(metrics.FrontendRespondQueryTaskCompletedScope, dw, wh.GetMetricsClient()).Tagged(metrics.GetContextTags(ctx)...)
30✔
1706
        if err := common.CheckEventBlobSizeLimit(
30✔
1707
                len(completeRequest.GetQueryResult()),
30✔
1708
                sizeLimitWarn,
30✔
1709
                sizeLimitError,
30✔
1710
                queryTaskToken.DomainID,
30✔
1711
                "",
30✔
1712
                "",
30✔
1713
                scope,
30✔
1714
                wh.GetThrottledLogger(),
30✔
1715
                tag.BlobSizeViolationOperation("RespondQueryTaskCompleted"),
30✔
1716
        ); err != nil {
30✔
1717
                completeRequest = &types.RespondQueryTaskCompletedRequest{
×
1718
                        TaskToken:     completeRequest.TaskToken,
×
1719
                        CompletedType: types.QueryTaskCompletedTypeFailed.Ptr(),
×
1720
                        QueryResult:   nil,
×
1721
                        ErrorMessage:  err.Error(),
×
1722
                }
×
1723
        }
×
1724

1725
        call := yarpc.CallFromContext(ctx)
30✔
1726

30✔
1727
        completeRequest.WorkerVersionInfo = &types.WorkerVersionInfo{
30✔
1728
                Impl:           call.Header(common.ClientImplHeaderName),
30✔
1729
                FeatureVersion: call.Header(common.FeatureVersionHeaderName),
30✔
1730
        }
30✔
1731
        matchingRequest := &types.MatchingRespondQueryTaskCompletedRequest{
30✔
1732
                DomainUUID:       queryTaskToken.DomainID,
30✔
1733
                TaskList:         &types.TaskList{Name: queryTaskToken.TaskList},
30✔
1734
                TaskID:           queryTaskToken.TaskID,
30✔
1735
                CompletedRequest: completeRequest,
30✔
1736
        }
30✔
1737

30✔
1738
        err = wh.GetMatchingClient().RespondQueryTaskCompleted(ctx, matchingRequest)
30✔
1739
        if err != nil {
30✔
1740
                return err
×
1741
        }
×
1742
        return nil
30✔
1743
}
1744

1745
func (wh *WorkflowHandler) StartWorkflowExecutionAsync(
1746
        ctx context.Context,
1747
        startRequest *types.StartWorkflowExecutionAsyncRequest,
1748
) (resp *types.StartWorkflowExecutionAsyncResponse, retError error) {
3✔
1749
        if wh.isShuttingDown() {
3✔
1750
                return nil, validate.ErrShuttingDown
×
1751
        }
×
1752
        scope := getMetricsScopeWithDomain(metrics.FrontendStartWorkflowExecutionAsyncScope, startRequest, wh.GetMetricsClient()).Tagged(metrics.GetContextTags(ctx)...)
3✔
1753
        // validate request before pushing to queue
3✔
1754
        err := wh.validateStartWorkflowExecutionRequest(ctx, startRequest.StartWorkflowExecutionRequest, scope)
3✔
1755
        if err != nil {
3✔
1756
                return nil, err
×
1757
        }
×
1758

1759
        producer, err := wh.producerManager.GetProducerByDomain(startRequest.GetDomain())
3✔
1760
        if err != nil {
4✔
1761
                return nil, err
1✔
1762
        }
1✔
1763
        // serialize the message to be sent to the queue
1764
        payload, err := json.Marshal(startRequest)
2✔
1765
        if err != nil {
2✔
1766
                return nil, err
×
1767
        }
×
1768
        // propagate the headers from the context to the message
1769
        clientHeaders := common.GetClientHeaders(ctx)
2✔
1770
        header := &shared.Header{
2✔
1771
                Fields: map[string][]byte{},
2✔
1772
        }
2✔
1773
        for k, v := range clientHeaders {
12✔
1774
                header.Fields[k] = []byte(v)
10✔
1775
        }
10✔
1776
        messageType := sqlblobs.AsyncRequestTypeStartWorkflowExecutionAsyncRequest
2✔
1777
        message := &sqlblobs.AsyncRequestMessage{
2✔
1778
                PartitionKey: common.StringPtr(startRequest.GetWorkflowID()),
2✔
1779
                Type:         &messageType,
2✔
1780
                Header:       header,
2✔
1781
                Encoding:     common.StringPtr(string(common.EncodingTypeJSON)),
2✔
1782
                Payload:      payload,
2✔
1783
        }
2✔
1784
        err = producer.Publish(ctx, message)
2✔
1785
        if err != nil {
3✔
1786
                return nil, err
1✔
1787
        }
1✔
1788
        return &types.StartWorkflowExecutionAsyncResponse{}, nil
1✔
1789
}
1790

1791
// StartWorkflowExecution - Creates a new workflow execution
1792
func (wh *WorkflowHandler) StartWorkflowExecution(
1793
        ctx context.Context,
1794
        startRequest *types.StartWorkflowExecutionRequest,
1795
) (resp *types.StartWorkflowExecutionResponse, retError error) {
455✔
1796
        if wh.isShuttingDown() {
455✔
1797
                return nil, validate.ErrShuttingDown
×
1798
        }
×
1799
        scope := getMetricsScopeWithDomain(metrics.FrontendStartWorkflowExecutionScope, startRequest, wh.GetMetricsClient()).Tagged(metrics.GetContextTags(ctx)...)
455✔
1800
        err := wh.validateStartWorkflowExecutionRequest(ctx, startRequest, scope)
455✔
1801
        if err != nil {
466✔
1802
                return nil, err
11✔
1803
        }
11✔
1804
        domainName := startRequest.GetDomain()
444✔
1805
        domainID, err := wh.GetDomainCache().GetDomainID(domainName)
444✔
1806
        if err != nil {
444✔
1807
                return nil, err
×
1808
        }
×
1809
        wh.GetLogger().Debug("Start workflow execution request domainID", tag.WorkflowDomainID(domainID))
444✔
1810
        historyRequest, err := common.CreateHistoryStartWorkflowRequest(
444✔
1811
                domainID, startRequest, time.Now(), wh.getPartitionConfig(ctx, domainName))
444✔
1812
        if err != nil {
444✔
1813
                return nil, err
×
1814
        }
×
1815

1816
        resp, err = wh.GetHistoryClient().StartWorkflowExecution(ctx, historyRequest)
444✔
1817
        if err != nil {
462✔
1818
                return nil, err
18✔
1819
        }
18✔
1820
        return resp, nil
426✔
1821
}
1822

1823
func (wh *WorkflowHandler) validateStartWorkflowExecutionRequest(ctx context.Context, startRequest *types.StartWorkflowExecutionRequest, scope metrics.Scope) error {
458✔
1824
        if startRequest == nil {
459✔
1825
                return validate.ErrRequestNotSet
1✔
1826
        }
1✔
1827
        domainName := startRequest.GetDomain()
457✔
1828
        if domainName == "" {
458✔
1829
                return validate.ErrDomainNotSet
1✔
1830
        }
1✔
1831
        if startRequest.GetWorkflowID() == "" {
457✔
1832
                return validate.ErrWorkflowIDNotSet
1✔
1833
        }
1✔
1834
        if _, err := uuid.Parse(startRequest.RequestID); err != nil {
457✔
1835
                return &types.BadRequestError{Message: fmt.Sprintf("requestId %q is not a valid UUID", startRequest.RequestID)}
2✔
1836
        }
2✔
1837
        if startRequest.WorkflowType == nil || startRequest.WorkflowType.GetName() == "" {
454✔
1838
                return validate.ErrWorkflowTypeNotSet
1✔
1839
        }
1✔
1840
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
452✔
1841
                return err
×
1842
        }
×
1843
        idLengthWarnLimit := wh.config.MaxIDLengthWarnLimit()
452✔
1844
        if !common.IsValidIDLength(
452✔
1845
                domainName,
452✔
1846
                scope,
452✔
1847
                idLengthWarnLimit,
452✔
1848
                wh.config.DomainNameMaxLength(domainName),
452✔
1849
                metrics.CadenceErrDomainNameExceededWarnLimit,
452✔
1850
                domainName,
452✔
1851
                wh.GetLogger(),
452✔
1852
                tag.IDTypeDomainName) {
452✔
1853
                return validate.ErrDomainTooLong
×
1854
        }
×
1855
        if !common.IsValidIDLength(
452✔
1856
                startRequest.GetWorkflowID(),
452✔
1857
                scope,
452✔
1858
                idLengthWarnLimit,
452✔
1859
                wh.config.WorkflowIDMaxLength(domainName),
452✔
1860
                metrics.CadenceErrWorkflowIDExceededWarnLimit,
452✔
1861
                domainName,
452✔
1862
                wh.GetLogger(),
452✔
1863
                tag.IDTypeWorkflowID) {
452✔
1864
                return validate.ErrWorkflowIDTooLong
×
1865
        }
×
1866
        if err := common.ValidateRetryPolicy(startRequest.RetryPolicy); err != nil {
452✔
1867
                return err
×
1868
        }
×
1869
        wh.GetLogger().Debug(
452✔
1870
                "Received StartWorkflowExecution. WorkflowID",
452✔
1871
                tag.WorkflowID(startRequest.GetWorkflowID()))
452✔
1872
        if !common.IsValidIDLength(
452✔
1873
                startRequest.WorkflowType.GetName(),
452✔
1874
                scope,
452✔
1875
                idLengthWarnLimit,
452✔
1876
                wh.config.WorkflowTypeMaxLength(domainName),
452✔
1877
                metrics.CadenceErrWorkflowTypeExceededWarnLimit,
452✔
1878
                domainName,
452✔
1879
                wh.GetLogger(),
452✔
1880
                tag.IDTypeWorkflowType) {
452✔
1881
                return validate.ErrWorkflowTypeTooLong
×
1882
        }
×
1883
        if err := wh.validateTaskList(startRequest.TaskList, scope, domainName); err != nil {
453✔
1884
                return err
1✔
1885
        }
1✔
1886
        if startRequest.GetExecutionStartToCloseTimeoutSeconds() <= 0 {
452✔
1887
                return validate.ErrInvalidExecutionStartToCloseTimeoutSeconds
1✔
1888
        }
1✔
1889
        if startRequest.GetTaskStartToCloseTimeoutSeconds() <= 0 {
451✔
1890
                return validate.ErrInvalidTaskStartToCloseTimeoutSeconds
1✔
1891
        }
1✔
1892
        if startRequest.GetDelayStartSeconds() < 0 {
450✔
1893
                return validate.ErrInvalidDelayStartSeconds
1✔
1894
        }
1✔
1895
        if startRequest.GetJitterStartSeconds() < 0 {
448✔
1896
                return validate.ErrInvalidJitterStartSeconds
×
1897
        }
×
1898
        jitter := startRequest.GetJitterStartSeconds()
448✔
1899
        cron := startRequest.GetCronSchedule()
448✔
1900
        if cron != "" {
466✔
1901
                if _, err := backoff.ValidateSchedule(startRequest.GetCronSchedule()); err != nil {
18✔
1902
                        return err
×
1903
                }
×
1904
        }
1905
        if jitter > 0 && cron != "" {
448✔
1906
                // Calculate the cron duration and ensure that jitter is not greater than the cron duration,
×
1907
                // because that would be confusing to users.
×
1908

×
1909
                // Request using start/end time zero value, which will get us an exact answer (i.e. its not in the
×
1910
                // middle of a minute)
×
1911
                backoffSeconds, err := backoff.GetBackoffForNextScheduleInSeconds(cron, time.Time{}, time.Time{}, jitter)
×
1912
                if err != nil {
×
1913
                        return err
×
1914
                }
×
1915
                if jitter > backoffSeconds {
×
1916
                        return validate.ErrInvalidJitterStartSeconds2
×
1917
                }
×
1918
        }
1919
        if !common.IsValidIDLength(
448✔
1920
                startRequest.GetRequestID(),
448✔
1921
                scope,
448✔
1922
                idLengthWarnLimit,
448✔
1923
                wh.config.RequestIDMaxLength(domainName),
448✔
1924
                metrics.CadenceErrRequestIDExceededWarnLimit,
448✔
1925
                domainName,
448✔
1926
                wh.GetLogger(),
448✔
1927
                tag.IDTypeRequestID) {
448✔
1928
                return validate.ErrRequestIDTooLong
×
1929
        }
×
1930
        if err := wh.searchAttributesValidator.ValidateSearchAttributes(startRequest.SearchAttributes, domainName); err != nil {
448✔
1931
                return err
×
1932
        }
×
1933
        wh.GetLogger().Debug("Start workflow execution request domain", tag.WorkflowDomainName(domainName))
448✔
1934
        domainID, err := wh.GetDomainCache().GetDomainID(domainName)
448✔
1935
        if err != nil {
448✔
1936
                return err
×
1937
        }
×
1938
        sizeLimitError := wh.config.BlobSizeLimitError(domainName)
448✔
1939
        sizeLimitWarn := wh.config.BlobSizeLimitWarn(domainName)
448✔
1940
        actualSize := len(startRequest.Input)
448✔
1941
        if startRequest.Memo != nil {
460✔
1942
                actualSize += common.GetSizeOfMapStringToByteArray(startRequest.Memo.GetFields())
12✔
1943
        }
12✔
1944
        if err := common.CheckEventBlobSizeLimit(
448✔
1945
                actualSize,
448✔
1946
                sizeLimitWarn,
448✔
1947
                sizeLimitError,
448✔
1948
                domainID,
448✔
1949
                startRequest.GetWorkflowID(),
448✔
1950
                "",
448✔
1951
                scope,
448✔
1952
                wh.GetThrottledLogger(),
448✔
1953
                tag.BlobSizeViolationOperation("StartWorkflowExecution"),
448✔
1954
        ); err != nil {
448✔
1955
                return err
×
1956
        }
×
1957
        isolationGroup := wh.getIsolationGroup(ctx, domainName)
448✔
1958
        if !wh.isIsolationGroupHealthy(ctx, domainName, isolationGroup) {
449✔
1959
                return &types.BadRequestError{fmt.Sprintf("Domain %s is drained from isolation group %s.", domainName, isolationGroup)}
1✔
1960
        }
1✔
1961
        return nil
447✔
1962
}
1963

1964
// GetWorkflowExecutionHistory - retrieves the history of workflow execution
1965
func (wh *WorkflowHandler) GetWorkflowExecutionHistory(
1966
        ctx context.Context,
1967
        getRequest *types.GetWorkflowExecutionHistoryRequest,
1968
) (resp *types.GetWorkflowExecutionHistoryResponse, retError error) {
457✔
1969
        if wh.isShuttingDown() {
457✔
1970
                return nil, validate.ErrShuttingDown
×
1971
        }
×
1972

1973
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
457✔
1974
                return nil, err
×
1975
        }
×
1976

1977
        if getRequest == nil {
457✔
1978
                return nil, validate.ErrRequestNotSet
×
1979
        }
×
1980

1981
        domainName := getRequest.GetDomain()
457✔
1982
        wfExecution := getRequest.GetExecution()
457✔
1983

457✔
1984
        if domainName == "" {
457✔
1985
                return nil, validate.ErrDomainNotSet
×
1986
        }
×
1987

1988
        if err := validate.CheckExecution(wfExecution); err != nil {
457✔
1989
                return nil, err
×
1990
        }
×
1991

1992
        domainID, err := wh.GetDomainCache().GetDomainID(domainName)
457✔
1993
        if err != nil {
457✔
1994
                return nil, err
×
1995
        }
×
1996

1997
        if getRequest.GetMaximumPageSize() <= 0 {
782✔
1998
                getRequest.MaximumPageSize = int32(wh.config.HistoryMaxPageSize(getRequest.GetDomain()))
325✔
1999
        }
325✔
2000
        // force limit page size if exceed
2001
        if getRequest.GetMaximumPageSize() > common.GetHistoryMaxPageSize {
457✔
2002
                wh.GetThrottledLogger().Warn("GetHistory page size is larger than threshold",
×
2003
                        tag.WorkflowID(getRequest.Execution.GetWorkflowID()),
×
2004
                        tag.WorkflowRunID(getRequest.Execution.GetRunID()),
×
2005
                        tag.WorkflowDomainID(domainID),
×
2006
                        tag.WorkflowSize(int64(getRequest.GetMaximumPageSize())))
×
2007
                getRequest.MaximumPageSize = common.GetHistoryMaxPageSize
×
2008
        }
×
2009

2010
        scope := getMetricsScopeWithDomain(metrics.FrontendGetWorkflowExecutionHistoryScope, getRequest, wh.GetMetricsClient()).Tagged(metrics.GetContextTags(ctx)...)
457✔
2011
        if !getRequest.GetSkipArchival() {
896✔
2012
                enableArchivalRead := wh.GetArchivalMetadata().GetHistoryConfig().ReadEnabled()
439✔
2013
                historyArchived := wh.historyArchived(ctx, getRequest, domainID)
439✔
2014
                if enableArchivalRead && historyArchived {
460✔
2015
                        return wh.getArchivedHistory(ctx, getRequest, domainID)
21✔
2016
                }
21✔
2017
        }
2018

2019
        // this function return the following 6 things,
2020
        // 1. branch token
2021
        // 2. the workflow run ID
2022
        // 3. the last first event ID (the event ID of the last batch of events in the history)
2023
        // 4. the next event ID
2024
        // 5. whether the workflow is closed
2025
        // 6. error if any
2026
        queryHistory := func(
436✔
2027
                domainUUID string,
436✔
2028
                execution *types.WorkflowExecution,
436✔
2029
                expectedNextEventID int64,
436✔
2030
                currentBranchToken []byte,
436✔
2031
        ) ([]byte, string, int64, int64, bool, error) {
828✔
2032
                response, err := wh.GetHistoryClient().PollMutableState(ctx, &types.PollMutableStateRequest{
392✔
2033
                        DomainUUID:          domainUUID,
392✔
2034
                        Execution:           execution,
392✔
2035
                        ExpectedNextEventID: expectedNextEventID,
392✔
2036
                        CurrentBranchToken:  currentBranchToken,
392✔
2037
                })
392✔
2038

392✔
2039
                if err != nil {
392✔
2040
                        return nil, "", 0, 0, false, err
×
2041
                }
×
2042
                isWorkflowRunning := response.GetWorkflowCloseState() == persistence.WorkflowCloseStatusNone
392✔
2043

392✔
2044
                return response.CurrentBranchToken,
392✔
2045
                        response.Execution.GetRunID(),
392✔
2046
                        response.GetLastFirstEventID(),
392✔
2047
                        response.GetNextEventID(),
392✔
2048
                        isWorkflowRunning,
392✔
2049
                        nil
392✔
2050
        }
2051

2052
        isLongPoll := getRequest.GetWaitForNewEvent()
436✔
2053
        isCloseEventOnly := getRequest.GetHistoryEventFilterType() == types.HistoryEventFilterTypeCloseEvent
436✔
2054
        execution := getRequest.Execution
436✔
2055
        token := &getHistoryContinuationToken{}
436✔
2056

436✔
2057
        var runID string
436✔
2058
        lastFirstEventID := common.FirstEventID
436✔
2059
        var nextEventID int64
436✔
2060
        var isWorkflowRunning bool
436✔
2061

436✔
2062
        // process the token for paging
436✔
2063
        queryNextEventID := common.EndEventID
436✔
2064
        if getRequest.NextPageToken != nil {
480✔
2065
                token, err = deserializeHistoryToken(getRequest.NextPageToken)
44✔
2066
                if err != nil {
44✔
2067
                        return nil, validate.ErrInvalidNextPageToken
×
2068
                }
×
2069
                if execution.RunID != "" && execution.GetRunID() != token.RunID {
44✔
2070
                        return nil, validate.ErrNextPageTokenRunIDMismatch
×
2071
                }
×
2072

2073
                execution.RunID = token.RunID
44✔
2074

44✔
2075
                // we need to update the current next event ID and whether workflow is running
44✔
2076
                if len(token.PersistenceToken) == 0 && isLongPoll && token.IsWorkflowRunning {
44✔
2077
                        logger := wh.GetLogger().WithTags(
×
2078
                                tag.WorkflowDomainName(getRequest.GetDomain()),
×
2079
                                tag.WorkflowID(getRequest.Execution.GetWorkflowID()),
×
2080
                                tag.WorkflowRunID(getRequest.Execution.GetRunID()),
×
2081
                        )
×
2082
                        // TODO: for now we only log the invalid timeout (this is done inside the helper function) in case
×
2083
                        // this change breaks existing customers. Once we are sure no one is calling this API with very short timeout
×
2084
                        // we can return the error.
×
2085
                        _ = common.ValidateLongPollContextTimeout(ctx, "GetWorkflowExecutionHistory", logger)
×
2086

×
2087
                        if !isCloseEventOnly {
×
2088
                                queryNextEventID = token.NextEventID
×
2089
                        }
×
2090
                        token.BranchToken, _, lastFirstEventID, nextEventID, isWorkflowRunning, err =
×
2091
                                queryHistory(domainID, execution, queryNextEventID, token.BranchToken)
×
2092
                        if err != nil {
×
2093
                                return nil, err
×
2094
                        }
×
2095
                        token.FirstEventID = token.NextEventID
×
2096
                        token.NextEventID = nextEventID
×
2097
                        token.IsWorkflowRunning = isWorkflowRunning
×
2098
                }
2099
        } else {
392✔
2100
                if !isCloseEventOnly {
766✔
2101
                        queryNextEventID = common.FirstEventID
374✔
2102
                }
374✔
2103
                token.BranchToken, runID, lastFirstEventID, nextEventID, isWorkflowRunning, err =
392✔
2104
                        queryHistory(domainID, execution, queryNextEventID, nil)
392✔
2105
                if err != nil {
392✔
2106
                        return nil, err
×
2107
                }
×
2108

2109
                execution.RunID = runID
392✔
2110

392✔
2111
                token.RunID = runID
392✔
2112
                token.FirstEventID = common.FirstEventID
392✔
2113
                token.NextEventID = nextEventID
392✔
2114
                token.IsWorkflowRunning = isWorkflowRunning
392✔
2115
                token.PersistenceToken = nil
392✔
2116
        }
2117

2118
        call := yarpc.CallFromContext(ctx)
436✔
2119
        clientFeatureVersion := call.Header(common.FeatureVersionHeaderName)
436✔
2120
        clientImpl := call.Header(common.ClientImplHeaderName)
436✔
2121
        supportsRawHistoryQuery := wh.versionChecker.SupportsRawHistoryQuery(clientImpl, clientFeatureVersion) == nil
436✔
2122
        isRawHistoryEnabled := wh.config.SendRawWorkflowHistory(domainName) && supportsRawHistoryQuery
436✔
2123

436✔
2124
        history := &types.History{}
436✔
2125
        history.Events = []*types.HistoryEvent{}
436✔
2126
        var historyBlob []*types.DataBlob
436✔
2127

436✔
2128
        // helper function to just getHistory
436✔
2129
        getHistory := func(firstEventID, nextEventID int64, nextPageToken []byte) error {
872✔
2130
                if isRawHistoryEnabled {
438✔
2131
                        historyBlob, token.PersistenceToken, err = wh.getRawHistory(
2✔
2132
                                ctx,
2✔
2133
                                scope,
2✔
2134
                                domainID,
2✔
2135
                                domainName,
2✔
2136
                                *execution,
2✔
2137
                                firstEventID,
2✔
2138
                                nextEventID,
2✔
2139
                                getRequest.GetMaximumPageSize(),
2✔
2140
                                nextPageToken,
2✔
2141
                                token.TransientDecision,
2✔
2142
                                token.BranchToken,
2✔
2143
                        )
2✔
2144
                } else {
436✔
2145
                        history, token.PersistenceToken, err = wh.getHistory(
434✔
2146
                                ctx,
434✔
2147
                                scope,
434✔
2148
                                domainID,
434✔
2149
                                domainName,
434✔
2150
                                *execution,
434✔
2151
                                firstEventID,
434✔
2152
                                nextEventID,
434✔
2153
                                getRequest.GetMaximumPageSize(),
434✔
2154
                                nextPageToken,
434✔
2155
                                token.TransientDecision,
434✔
2156
                                token.BranchToken,
434✔
2157
                        )
434✔
2158
                }
434✔
2159
                if err != nil {
436✔
2160
                        return err
×
2161
                }
×
2162
                return nil
436✔
2163
        }
2164

2165
        if isCloseEventOnly {
454✔
2166
                if !isWorkflowRunning {
36✔
2167
                        if err := getHistory(lastFirstEventID, nextEventID, nil); err != nil {
18✔
2168
                                return nil, err
×
2169
                        }
×
2170
                        if isRawHistoryEnabled {
18✔
2171
                                // since getHistory func will not return empty history, so the below is safe
×
2172
                                historyBlob = historyBlob[len(historyBlob)-1:]
×
2173
                        } else {
18✔
2174
                                // since getHistory func will not return empty history, so the below is safe
18✔
2175
                                history.Events = history.Events[len(history.Events)-1:]
18✔
2176
                        }
18✔
2177
                        token = nil
18✔
2178
                } else if isLongPoll {
×
2179
                        // set the persistence token to be nil so next time we will query history for updates
×
2180
                        token.PersistenceToken = nil
×
2181
                } else {
×
2182
                        token = nil
×
2183
                }
×
2184
        } else {
418✔
2185
                // return all events
418✔
2186
                if token.FirstEventID >= token.NextEventID {
418✔
2187
                        // currently there is no new event
×
2188
                        history.Events = []*types.HistoryEvent{}
×
2189
                        if !isWorkflowRunning {
×
2190
                                token = nil
×
2191
                        }
×
2192
                } else {
418✔
2193
                        if err := getHistory(token.FirstEventID, token.NextEventID, token.PersistenceToken); err != nil {
418✔
2194
                                return nil, err
×
2195
                        }
×
2196
                        // here, for long pull on history events, we need to intercept the paging token from cassandra
2197
                        // and do something clever
2198
                        if len(token.PersistenceToken) == 0 && (!token.IsWorkflowRunning || !isLongPoll) {
794✔
2199
                                // meaning, there is no more history to be returned
376✔
2200
                                token = nil
376✔
2201
                        }
376✔
2202
                }
2203
        }
2204

2205
        nextToken, err := serializeHistoryToken(token)
436✔
2206
        if err != nil {
436✔
2207
                return nil, err
×
2208
        }
×
2209
        return &types.GetWorkflowExecutionHistoryResponse{
436✔
2210
                History:       history,
436✔
2211
                RawHistory:    historyBlob,
436✔
2212
                NextPageToken: nextToken,
436✔
2213
                Archived:      false,
436✔
2214
        }, nil
436✔
2215
}
2216

2217
// SignalWorkflowExecution is used to send a signal event to running workflow execution.  This results in
2218
// WorkflowExecutionSignaled event recorded in the history and a decision task being created for the execution.
2219
func (wh *WorkflowHandler) SignalWorkflowExecution(
2220
        ctx context.Context,
2221
        signalRequest *types.SignalWorkflowExecutionRequest,
2222
) (retError error) {
723✔
2223
        if wh.isShuttingDown() {
723✔
2224
                return validate.ErrShuttingDown
×
2225
        }
×
2226

2227
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
723✔
2228
                return err
×
2229
        }
×
2230

2231
        if signalRequest == nil {
723✔
2232
                return validate.ErrRequestNotSet
×
2233
        }
×
2234

2235
        domainName := signalRequest.GetDomain()
723✔
2236
        wfExecution := signalRequest.GetWorkflowExecution()
723✔
2237

723✔
2238
        if domainName == "" {
723✔
2239
                return validate.ErrDomainNotSet
×
2240
        }
×
2241
        if err := validate.CheckExecution(wfExecution); err != nil {
723✔
2242
                return err
×
2243
        }
×
2244

2245
        scope := getMetricsScopeWithDomain(metrics.FrontendSignalWorkflowExecutionScope, signalRequest, wh.GetMetricsClient()).Tagged(metrics.GetContextTags(ctx)...)
723✔
2246
        idLengthWarnLimit := wh.config.MaxIDLengthWarnLimit()
723✔
2247
        if !common.IsValidIDLength(
723✔
2248
                domainName,
723✔
2249
                scope,
723✔
2250
                idLengthWarnLimit,
723✔
2251
                wh.config.DomainNameMaxLength(domainName),
723✔
2252
                metrics.CadenceErrDomainNameExceededWarnLimit,
723✔
2253
                domainName,
723✔
2254
                wh.GetLogger(),
723✔
2255
                tag.IDTypeDomainName) {
723✔
2256
                return validate.ErrDomainTooLong
×
2257
        }
×
2258

2259
        if signalRequest.GetSignalName() == "" {
723✔
2260
                return validate.ErrSignalNameNotSet
×
2261
        }
×
2262

2263
        if !common.IsValidIDLength(
723✔
2264
                signalRequest.GetSignalName(),
723✔
2265
                scope,
723✔
2266
                idLengthWarnLimit,
723✔
2267
                wh.config.SignalNameMaxLength(domainName),
723✔
2268
                metrics.CadenceErrSignalNameExceededWarnLimit,
723✔
2269
                domainName,
723✔
2270
                wh.GetLogger(),
723✔
2271
                tag.IDTypeSignalName) {
723✔
2272
                return validate.ErrSignalNameTooLong
×
2273
        }
×
2274

2275
        if !common.IsValidIDLength(
723✔
2276
                signalRequest.GetRequestID(),
723✔
2277
                scope,
723✔
2278
                idLengthWarnLimit,
723✔
2279
                wh.config.RequestIDMaxLength(domainName),
723✔
2280
                metrics.CadenceErrRequestIDExceededWarnLimit,
723✔
2281
                domainName,
723✔
2282
                wh.GetLogger(),
723✔
2283
                tag.IDTypeRequestID) {
723✔
2284
                return validate.ErrRequestIDTooLong
×
2285
        }
×
2286

2287
        domainID, err := wh.GetDomainCache().GetDomainID(domainName)
723✔
2288
        if err != nil {
723✔
2289
                return err
×
2290
        }
×
2291

2292
        sizeLimitError := wh.config.BlobSizeLimitError(domainName)
723✔
2293
        sizeLimitWarn := wh.config.BlobSizeLimitWarn(domainName)
723✔
2294
        if err := common.CheckEventBlobSizeLimit(
723✔
2295
                len(signalRequest.Input),
723✔
2296
                sizeLimitWarn,
723✔
2297
                sizeLimitError,
723✔
2298
                domainID,
723✔
2299
                signalRequest.GetWorkflowExecution().GetWorkflowID(),
723✔
2300
                signalRequest.GetWorkflowExecution().GetRunID(),
723✔
2301
                scope,
723✔
2302
                wh.GetThrottledLogger(),
723✔
2303
                tag.BlobSizeViolationOperation("SignalWorkflowExecution"),
723✔
2304
        ); err != nil {
723✔
2305
                return err
×
2306
        }
×
2307

2308
        isolationGroup := wh.getIsolationGroup(ctx, domainName)
723✔
2309
        if !wh.isIsolationGroupHealthy(ctx, domainName, isolationGroup) {
723✔
2310
                return &types.BadRequestError{fmt.Sprintf("Domain %s is drained from isolation group %s.", domainName, isolationGroup)}
×
2311
        }
×
2312

2313
        err = wh.GetHistoryClient().SignalWorkflowExecution(ctx, &types.HistorySignalWorkflowExecutionRequest{
723✔
2314
                DomainUUID:    domainID,
723✔
2315
                SignalRequest: signalRequest,
723✔
2316
        })
723✔
2317
        if err != nil {
732✔
2318
                return wh.normalizeVersionedErrors(ctx, err)
9✔
2319
        }
9✔
2320

2321
        return nil
714✔
2322
}
2323

2324
func (wh *WorkflowHandler) SignalWithStartWorkflowExecutionAsync(
2325
        ctx context.Context,
2326
        signalWithStartRequest *types.SignalWithStartWorkflowExecutionAsyncRequest,
2327
) (resp *types.SignalWithStartWorkflowExecutionAsyncResponse, retError error) {
3✔
2328
        if wh.isShuttingDown() {
3✔
2329
                return nil, validate.ErrShuttingDown
×
2330
        }
×
2331
        scope := getMetricsScopeWithDomain(metrics.FrontendSignalWithStartWorkflowExecutionAsyncScope, signalWithStartRequest, wh.GetMetricsClient()).Tagged(metrics.GetContextTags(ctx)...)
3✔
2332
        // validate request before pushing to queue
3✔
2333
        err := wh.validateSignalWithStartWorkflowExecutionRequest(ctx, signalWithStartRequest.SignalWithStartWorkflowExecutionRequest, scope)
3✔
2334
        if err != nil {
3✔
2335
                return nil, err
×
2336
        }
×
2337
        producer, err := wh.producerManager.GetProducerByDomain(signalWithStartRequest.GetDomain())
3✔
2338
        if err != nil {
4✔
2339
                return nil, err
1✔
2340
        }
1✔
2341
        // serialize the message to be sent to the queue
2342
        payload, err := json.Marshal(signalWithStartRequest)
2✔
2343
        if err != nil {
2✔
2344
                return nil, err
×
2345
        }
×
2346
        // propagate the headers from the context to the message
2347
        clientHeaders := common.GetClientHeaders(ctx)
2✔
2348
        header := &shared.Header{
2✔
2349
                Fields: map[string][]byte{},
2✔
2350
        }
2✔
2351
        for k, v := range clientHeaders {
12✔
2352
                header.Fields[k] = []byte(v)
10✔
2353
        }
10✔
2354
        messageType := sqlblobs.AsyncRequestTypeSignalWithStartWorkflowExecutionAsyncRequest
2✔
2355
        message := &sqlblobs.AsyncRequestMessage{
2✔
2356
                PartitionKey: common.StringPtr(signalWithStartRequest.GetWorkflowID()),
2✔
2357
                Type:         &messageType,
2✔
2358
                Header:       header,
2✔
2359
                Encoding:     common.StringPtr(string(common.EncodingTypeJSON)),
2✔
2360
                Payload:      payload,
2✔
2361
        }
2✔
2362
        err = producer.Publish(ctx, message)
2✔
2363
        if err != nil {
3✔
2364
                return nil, err
1✔
2365
        }
1✔
2366
        return &types.SignalWithStartWorkflowExecutionAsyncResponse{}, nil
1✔
2367
}
2368

2369
// SignalWithStartWorkflowExecution is used to ensure sending a signal event to a workflow execution.
2370
// If workflow is running, this results in WorkflowExecutionSignaled event recorded in the history
2371
// and a decision task being created for the execution.
2372
// If workflow is not running or not found, this results in WorkflowExecutionStarted and WorkflowExecutionSignaled
2373
// event recorded in history, and a decision task being created for the execution
2374
func (wh *WorkflowHandler) SignalWithStartWorkflowExecution(
2375
        ctx context.Context,
2376
        signalWithStartRequest *types.SignalWithStartWorkflowExecutionRequest,
2377
) (resp *types.StartWorkflowExecutionResponse, retError error) {
33✔
2378
        if wh.isShuttingDown() {
33✔
2379
                return nil, validate.ErrShuttingDown
×
2380
        }
×
2381

2382
        scope := getMetricsScopeWithDomain(metrics.FrontendSignalWithStartWorkflowExecutionScope, signalWithStartRequest, wh.GetMetricsClient()).Tagged(metrics.GetContextTags(ctx)...)
33✔
2383
        err := wh.validateSignalWithStartWorkflowExecutionRequest(ctx, signalWithStartRequest, scope)
33✔
2384
        if err != nil {
33✔
2385
                return nil, err
×
2386
        }
×
2387

2388
        domainName := signalWithStartRequest.GetDomain()
33✔
2389
        domainID, err := wh.GetDomainCache().GetDomainID(domainName)
33✔
2390
        if err != nil {
33✔
2391
                return nil, err
×
2392
        }
×
2393
        resp, err = wh.GetHistoryClient().SignalWithStartWorkflowExecution(ctx, &types.HistorySignalWithStartWorkflowExecutionRequest{
33✔
2394
                DomainUUID:             domainID,
33✔
2395
                SignalWithStartRequest: signalWithStartRequest,
33✔
2396
                PartitionConfig:        wh.getPartitionConfig(ctx, domainName),
33✔
2397
        })
33✔
2398
        if err != nil {
39✔
2399
                return nil, err
6✔
2400
        }
6✔
2401

2402
        return resp, nil
27✔
2403
}
2404

2405
func (wh *WorkflowHandler) validateSignalWithStartWorkflowExecutionRequest(ctx context.Context, signalWithStartRequest *types.SignalWithStartWorkflowExecutionRequest, scope metrics.Scope) error {
36✔
2406
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
36✔
2407
                return err
×
2408
        }
×
2409

2410
        if signalWithStartRequest == nil {
36✔
2411
                return validate.ErrRequestNotSet
×
2412
        }
×
2413

2414
        domainName := signalWithStartRequest.GetDomain()
36✔
2415
        if domainName == "" {
36✔
2416
                return validate.ErrDomainNotSet
×
2417
        }
×
2418
        if signalWithStartRequest.GetWorkflowID() == "" {
36✔
2419
                return validate.ErrWorkflowIDNotSet
×
2420
        }
×
2421

2422
        idLengthWarnLimit := wh.config.MaxIDLengthWarnLimit()
36✔
2423
        if !common.IsValidIDLength(
36✔
2424
                domainName,
36✔
2425
                scope,
36✔
2426
                idLengthWarnLimit,
36✔
2427
                wh.config.DomainNameMaxLength(domainName),
36✔
2428
                metrics.CadenceErrDomainNameExceededWarnLimit,
36✔
2429
                domainName,
36✔
2430
                wh.GetLogger(),
36✔
2431
                tag.IDTypeDomainName) {
36✔
2432
                return validate.ErrDomainTooLong
×
2433
        }
×
2434

2435
        if !common.IsValidIDLength(
36✔
2436
                signalWithStartRequest.GetWorkflowID(),
36✔
2437
                scope,
36✔
2438
                idLengthWarnLimit,
36✔
2439
                wh.config.WorkflowIDMaxLength(domainName),
36✔
2440
                metrics.CadenceErrWorkflowIDExceededWarnLimit,
36✔
2441
                domainName,
36✔
2442
                wh.GetLogger(),
36✔
2443
                tag.IDTypeWorkflowID) {
36✔
2444
                return validate.ErrWorkflowIDTooLong
×
2445
        }
×
2446

2447
        if signalWithStartRequest.GetSignalName() == "" {
36✔
2448
                return validate.ErrSignalNameNotSet
×
2449
        }
×
2450

2451
        if !common.IsValidIDLength(
36✔
2452
                signalWithStartRequest.GetSignalName(),
36✔
2453
                scope,
36✔
2454
                idLengthWarnLimit,
36✔
2455
                wh.config.SignalNameMaxLength(domainName),
36✔
2456
                metrics.CadenceErrSignalNameExceededWarnLimit,
36✔
2457
                domainName,
36✔
2458
                wh.GetLogger(),
36✔
2459
                tag.IDTypeSignalName) {
36✔
2460
                return validate.ErrSignalNameTooLong
×
2461
        }
×
2462

2463
        if signalWithStartRequest.WorkflowType == nil || signalWithStartRequest.WorkflowType.GetName() == "" {
36✔
2464
                return validate.ErrWorkflowTypeNotSet
×
2465
        }
×
2466

2467
        if !common.IsValidIDLength(
36✔
2468
                signalWithStartRequest.WorkflowType.GetName(),
36✔
2469
                scope,
36✔
2470
                idLengthWarnLimit,
36✔
2471
                wh.config.WorkflowTypeMaxLength(domainName),
36✔
2472
                metrics.CadenceErrWorkflowTypeExceededWarnLimit,
36✔
2473
                domainName,
36✔
2474
                wh.GetLogger(),
36✔
2475
                tag.IDTypeWorkflowType) {
36✔
2476
                return validate.ErrWorkflowTypeTooLong
×
2477
        }
×
2478

2479
        if err := wh.validateTaskList(signalWithStartRequest.TaskList, scope, domainName); err != nil {
36✔
2480
                return err
×
2481
        }
×
2482

2483
        if !common.IsValidIDLength(
36✔
2484
                signalWithStartRequest.GetRequestID(),
36✔
2485
                scope,
36✔
2486
                idLengthWarnLimit,
36✔
2487
                wh.config.RequestIDMaxLength(domainName),
36✔
2488
                metrics.CadenceErrRequestIDExceededWarnLimit,
36✔
2489
                domainName,
36✔
2490
                wh.GetLogger(),
36✔
2491
                tag.IDTypeRequestID) {
36✔
2492
                return validate.ErrRequestIDTooLong
×
2493
        }
×
2494

2495
        if signalWithStartRequest.GetExecutionStartToCloseTimeoutSeconds() <= 0 {
36✔
2496
                return validate.ErrInvalidExecutionStartToCloseTimeoutSeconds
×
2497
        }
×
2498

2499
        if signalWithStartRequest.GetTaskStartToCloseTimeoutSeconds() <= 0 {
36✔
2500
                return validate.ErrInvalidTaskStartToCloseTimeoutSeconds
×
2501
        }
×
2502

2503
        if err := common.ValidateRetryPolicy(signalWithStartRequest.RetryPolicy); err != nil {
36✔
2504
                return err
×
2505
        }
×
2506

2507
        if signalWithStartRequest.GetCronSchedule() != "" {
36✔
2508
                if _, err := backoff.ValidateSchedule(signalWithStartRequest.GetCronSchedule()); err != nil {
×
2509
                        return err
×
2510
                }
×
2511
        }
2512

2513
        if err := wh.searchAttributesValidator.ValidateSearchAttributes(signalWithStartRequest.SearchAttributes, domainName); err != nil {
36✔
2514
                return err
×
2515
        }
×
2516

2517
        domainID, err := wh.GetDomainCache().GetDomainID(domainName)
36✔
2518
        if err != nil {
36✔
2519
                return err
×
2520
        }
×
2521

2522
        sizeLimitError := wh.config.BlobSizeLimitError(domainName)
36✔
2523
        sizeLimitWarn := wh.config.BlobSizeLimitWarn(domainName)
36✔
2524
        if err := common.CheckEventBlobSizeLimit(
36✔
2525
                len(signalWithStartRequest.SignalInput),
36✔
2526
                sizeLimitWarn,
36✔
2527
                sizeLimitError,
36✔
2528
                domainID,
36✔
2529
                signalWithStartRequest.GetWorkflowID(),
36✔
2530
                "",
36✔
2531
                scope,
36✔
2532
                wh.GetThrottledLogger(),
36✔
2533
                tag.BlobSizeViolationOperation("SignalWithStartWorkflowExecution"),
36✔
2534
        ); err != nil {
36✔
2535
                return err
×
2536
        }
×
2537
        actualSize := len(signalWithStartRequest.Input) + common.GetSizeOfMapStringToByteArray(signalWithStartRequest.Memo.GetFields())
36✔
2538
        if err := common.CheckEventBlobSizeLimit(
36✔
2539
                actualSize,
36✔
2540
                sizeLimitWarn,
36✔
2541
                sizeLimitError,
36✔
2542
                domainID,
36✔
2543
                signalWithStartRequest.GetWorkflowID(),
36✔
2544
                "",
36✔
2545
                scope,
36✔
2546
                wh.GetThrottledLogger(),
36✔
2547
                tag.BlobSizeViolationOperation("SignalWithStartWorkflowExecution"),
36✔
2548
        ); err != nil {
36✔
2549
                return err
×
2550
        }
×
2551

2552
        isolationGroup := wh.getIsolationGroup(ctx, domainName)
36✔
2553
        if !wh.isIsolationGroupHealthy(ctx, domainName, isolationGroup) {
36✔
2554
                return &types.BadRequestError{fmt.Sprintf("Domain %s is drained from isolation group %s.", domainName, isolationGroup)}
×
2555
        }
×
2556
        return nil
36✔
2557
}
2558

2559
// TerminateWorkflowExecution terminates an existing workflow execution by recording WorkflowExecutionTerminated event
2560
// in the history and immediately terminating the execution instance.
2561
func (wh *WorkflowHandler) TerminateWorkflowExecution(
2562
        ctx context.Context,
2563
        terminateRequest *types.TerminateWorkflowExecutionRequest,
2564
) (retError error) {
48✔
2565
        if wh.isShuttingDown() {
48✔
2566
                return validate.ErrShuttingDown
×
2567
        }
×
2568

2569
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
48✔
2570
                return err
×
2571
        }
×
2572

2573
        if terminateRequest == nil {
48✔
2574
                return validate.ErrRequestNotSet
×
2575
        }
×
2576

2577
        domainName := terminateRequest.GetDomain()
48✔
2578
        wfExecution := terminateRequest.GetWorkflowExecution()
48✔
2579
        if terminateRequest.GetDomain() == "" {
48✔
2580
                return validate.ErrDomainNotSet
×
2581
        }
×
2582
        if err := validate.CheckExecution(wfExecution); err != nil {
48✔
2583
                return err
×
2584
        }
×
2585

2586
        domainID, err := wh.GetDomainCache().GetDomainID(domainName)
48✔
2587
        if err != nil {
48✔
2588
                return err
×
2589
        }
×
2590

2591
        err = wh.GetHistoryClient().TerminateWorkflowExecution(ctx, &types.HistoryTerminateWorkflowExecutionRequest{
48✔
2592
                DomainUUID:       domainID,
48✔
2593
                TerminateRequest: terminateRequest,
48✔
2594
        })
48✔
2595
        if err != nil {
48✔
2596
                return wh.normalizeVersionedErrors(ctx, err)
×
2597
        }
×
2598

2599
        return nil
48✔
2600
}
2601

2602
// ResetWorkflowExecution reset an existing workflow execution to the nextFirstEventID
2603
// in the history and immediately terminating the current execution instance.
2604
func (wh *WorkflowHandler) ResetWorkflowExecution(
2605
        ctx context.Context,
2606
        resetRequest *types.ResetWorkflowExecutionRequest,
2607
) (resp *types.ResetWorkflowExecutionResponse, retError error) {
15✔
2608
        if wh.isShuttingDown() {
15✔
2609
                return nil, validate.ErrShuttingDown
×
2610
        }
×
2611

2612
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
15✔
2613
                return nil, err
×
2614
        }
×
2615

2616
        if resetRequest == nil {
15✔
2617
                return nil, validate.ErrRequestNotSet
×
2618
        }
×
2619

2620
        domainName := resetRequest.GetDomain()
15✔
2621
        wfExecution := resetRequest.GetWorkflowExecution()
15✔
2622
        if domainName == "" {
15✔
2623
                return nil, validate.ErrDomainNotSet
×
2624
        }
×
2625
        if err := validate.CheckExecution(wfExecution); err != nil {
15✔
2626
                return nil, err
×
2627
        }
×
2628

2629
        domainID, err := wh.GetDomainCache().GetDomainID(resetRequest.GetDomain())
15✔
2630
        if err != nil {
15✔
2631
                return nil, err
×
2632
        }
×
2633

2634
        resp, err = wh.GetHistoryClient().ResetWorkflowExecution(ctx, &types.HistoryResetWorkflowExecutionRequest{
15✔
2635
                DomainUUID:   domainID,
15✔
2636
                ResetRequest: resetRequest,
15✔
2637
        })
15✔
2638
        if err != nil {
15✔
2639
                return nil, err
×
2640
        }
×
2641

2642
        return resp, nil
15✔
2643
}
2644

2645
// RequestCancelWorkflowExecution - requests to cancel a workflow execution
2646
func (wh *WorkflowHandler) RequestCancelWorkflowExecution(
2647
        ctx context.Context,
2648
        cancelRequest *types.RequestCancelWorkflowExecutionRequest,
2649
) (retError error) {
6✔
2650
        if wh.isShuttingDown() {
6✔
2651
                return validate.ErrShuttingDown
×
2652
        }
×
2653

2654
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
6✔
2655
                return err
×
2656
        }
×
2657

2658
        if cancelRequest == nil {
6✔
2659
                return validate.ErrRequestNotSet
×
2660
        }
×
2661

2662
        domainName := cancelRequest.GetDomain()
6✔
2663
        wfExecution := cancelRequest.GetWorkflowExecution()
6✔
2664
        if domainName == "" {
6✔
2665
                return validate.ErrDomainNotSet
×
2666
        }
×
2667
        if err := validate.CheckExecution(wfExecution); err != nil {
6✔
2668
                return err
×
2669
        }
×
2670

2671
        domainID, err := wh.GetDomainCache().GetDomainID(cancelRequest.GetDomain())
6✔
2672
        if err != nil {
6✔
2673
                return err
×
2674
        }
×
2675

2676
        err = wh.GetHistoryClient().RequestCancelWorkflowExecution(ctx, &types.HistoryRequestCancelWorkflowExecutionRequest{
6✔
2677
                DomainUUID:    domainID,
6✔
2678
                CancelRequest: cancelRequest,
6✔
2679
        })
6✔
2680
        if err != nil {
9✔
2681
                return wh.normalizeVersionedErrors(ctx, err)
3✔
2682
        }
3✔
2683

2684
        return nil
3✔
2685
}
2686

2687
// ListOpenWorkflowExecutions - retrieves info for open workflow executions in a domain
2688
func (wh *WorkflowHandler) ListOpenWorkflowExecutions(
2689
        ctx context.Context,
2690
        listRequest *types.ListOpenWorkflowExecutionsRequest,
2691
) (resp *types.ListOpenWorkflowExecutionsResponse, retError error) {
106✔
2692
        if wh.isShuttingDown() {
106✔
2693
                return nil, validate.ErrShuttingDown
×
2694
        }
×
2695

2696
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
106✔
2697
                return nil, err
×
2698
        }
×
2699

2700
        if listRequest == nil {
106✔
2701
                return nil, validate.ErrRequestNotSet
×
2702
        }
×
2703

2704
        if listRequest.GetDomain() == "" {
106✔
2705
                return nil, validate.ErrDomainNotSet
×
2706
        }
×
2707

2708
        if listRequest.StartTimeFilter == nil {
106✔
2709
                return nil, &types.BadRequestError{Message: "StartTimeFilter is required"}
×
2710
        }
×
2711

2712
        if listRequest.StartTimeFilter.EarliestTime == nil {
106✔
2713
                return nil, &types.BadRequestError{Message: "EarliestTime in StartTimeFilter is required"}
×
2714
        }
×
2715

2716
        if listRequest.StartTimeFilter.LatestTime == nil {
106✔
2717
                return nil, &types.BadRequestError{Message: "LatestTime in StartTimeFilter is required"}
×
2718
        }
×
2719

2720
        if listRequest.StartTimeFilter.GetEarliestTime() > listRequest.StartTimeFilter.GetLatestTime() {
106✔
2721
                return nil, &types.BadRequestError{Message: "EarliestTime in StartTimeFilter should not be larger than LatestTime"}
×
2722
        }
×
2723

2724
        if listRequest.ExecutionFilter != nil && listRequest.TypeFilter != nil {
106✔
2725
                return nil, &types.BadRequestError{
×
2726
                        Message: "Only one of ExecutionFilter or TypeFilter is allowed"}
×
2727
        }
×
2728

2729
        if listRequest.GetMaximumPageSize() <= 0 {
167✔
2730
                listRequest.MaximumPageSize = int32(wh.config.VisibilityMaxPageSize(listRequest.GetDomain()))
61✔
2731
        }
61✔
2732

2733
        if wh.isListRequestPageSizeTooLarge(listRequest.GetMaximumPageSize(), listRequest.GetDomain()) {
106✔
2734
                return nil, &types.BadRequestError{
×
2735
                        Message: fmt.Sprintf("Pagesize is larger than allow %d", wh.config.ESIndexMaxResultWindow())}
×
2736
        }
×
2737

2738
        domain := listRequest.GetDomain()
106✔
2739
        domainID, err := wh.GetDomainCache().GetDomainID(domain)
106✔
2740
        if err != nil {
106✔
2741
                return nil, err
×
2742
        }
×
2743

2744
        baseReq := persistence.ListWorkflowExecutionsRequest{
106✔
2745
                DomainUUID:    domainID,
106✔
2746
                Domain:        domain,
106✔
2747
                PageSize:      int(listRequest.GetMaximumPageSize()),
106✔
2748
                NextPageToken: listRequest.NextPageToken,
106✔
2749
                EarliestTime:  listRequest.StartTimeFilter.GetEarliestTime(),
106✔
2750
                LatestTime:    listRequest.StartTimeFilter.GetLatestTime(),
106✔
2751
        }
106✔
2752

106✔
2753
        var persistenceResp *persistence.ListWorkflowExecutionsResponse
106✔
2754
        if listRequest.ExecutionFilter != nil {
206✔
2755
                if wh.config.DisableListVisibilityByFilter(domain) {
101✔
2756
                        err = validate.ErrNoPermission
1✔
2757
                } else {
100✔
2758
                        persistenceResp, err = wh.GetVisibilityManager().ListOpenWorkflowExecutionsByWorkflowID(
99✔
2759
                                ctx,
99✔
2760
                                &persistence.ListWorkflowExecutionsByWorkflowIDRequest{
99✔
2761
                                        ListWorkflowExecutionsRequest: baseReq,
99✔
2762
                                        WorkflowID:                    listRequest.ExecutionFilter.GetWorkflowID(),
99✔
2763
                                })
99✔
2764
                }
99✔
2765
                wh.GetLogger().Debug("List open workflow with filter",
100✔
2766
                        tag.WorkflowDomainName(listRequest.GetDomain()), tag.WorkflowListWorkflowFilterByID)
100✔
2767
        } else if listRequest.TypeFilter != nil {
7✔
2768
                if wh.config.DisableListVisibilityByFilter(domain) {
2✔
2769
                        err = validate.ErrNoPermission
1✔
2770
                } else {
1✔
2771
                        persistenceResp, err = wh.GetVisibilityManager().ListOpenWorkflowExecutionsByType(
×
2772
                                ctx,
×
2773
                                &persistence.ListWorkflowExecutionsByTypeRequest{
×
2774
                                        ListWorkflowExecutionsRequest: baseReq,
×
2775
                                        WorkflowTypeName:              listRequest.TypeFilter.GetName(),
×
2776
                                },
×
2777
                        )
×
2778
                }
×
2779
                wh.GetLogger().Debug("List open workflow with filter",
1✔
2780
                        tag.WorkflowDomainName(listRequest.GetDomain()), tag.WorkflowListWorkflowFilterByType)
1✔
2781
        } else {
5✔
2782
                persistenceResp, err = wh.GetVisibilityManager().ListOpenWorkflowExecutions(ctx, &baseReq)
5✔
2783
        }
5✔
2784

2785
        if err != nil {
108✔
2786
                return nil, err
2✔
2787
        }
2✔
2788

2789
        resp = &types.ListOpenWorkflowExecutionsResponse{}
104✔
2790
        resp.Executions = persistenceResp.Executions
104✔
2791
        resp.NextPageToken = persistenceResp.NextPageToken
104✔
2792
        return resp, nil
104✔
2793
}
2794

2795
// ListArchivedWorkflowExecutions - retrieves archived info for closed workflow executions in a domain
2796
func (wh *WorkflowHandler) ListArchivedWorkflowExecutions(
2797
        ctx context.Context,
2798
        listRequest *types.ListArchivedWorkflowExecutionsRequest,
2799
) (resp *types.ListArchivedWorkflowExecutionsResponse, retError error) {
15✔
2800
        if wh.isShuttingDown() {
15✔
2801
                return nil, validate.ErrShuttingDown
×
2802
        }
×
2803

2804
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
15✔
2805
                return nil, err
×
2806
        }
×
2807

2808
        if listRequest == nil {
15✔
2809
                return nil, validate.ErrRequestNotSet
×
2810
        }
×
2811

2812
        if listRequest.GetDomain() == "" {
16✔
2813
                return nil, validate.ErrDomainNotSet
1✔
2814
        }
1✔
2815

2816
        if listRequest.GetPageSize() <= 0 {
14✔
2817
                listRequest.PageSize = int32(wh.config.VisibilityMaxPageSize(listRequest.GetDomain()))
×
2818
        }
×
2819

2820
        maxPageSize := wh.config.VisibilityArchivalQueryMaxPageSize()
14✔
2821
        if int(listRequest.GetPageSize()) > maxPageSize {
14✔
2822
                return nil, &types.BadRequestError{
×
2823
                        Message: fmt.Sprintf("Pagesize is larger than allowed %d", maxPageSize)}
×
2824
        }
×
2825

2826
        if !wh.GetArchivalMetadata().GetVisibilityConfig().ClusterConfiguredForArchival() {
15✔
2827
                return nil, &types.BadRequestError{Message: "Cluster is not configured for visibility archival"}
1✔
2828
        }
1✔
2829

2830
        if !wh.GetArchivalMetadata().GetVisibilityConfig().ReadEnabled() {
13✔
2831
                return nil, &types.BadRequestError{Message: "Cluster is not configured for reading archived visibility records"}
×
2832
        }
×
2833

2834
        entry, err := wh.GetDomainCache().GetDomain(listRequest.GetDomain())
13✔
2835
        if err != nil {
14✔
2836
                return nil, err
1✔
2837
        }
1✔
2838

2839
        if entry.GetConfig().VisibilityArchivalStatus != types.ArchivalStatusEnabled {
14✔
2840
                return nil, &types.BadRequestError{Message: "Domain is not configured for visibility archival"}
2✔
2841
        }
2✔
2842

2843
        URI, err := archiver.NewURI(entry.GetConfig().VisibilityArchivalURI)
10✔
2844
        if err != nil {
10✔
2845
                return nil, err
×
2846
        }
×
2847

2848
        visibilityArchiver, err := wh.GetArchiverProvider().GetVisibilityArchiver(URI.Scheme(), service.Frontend)
10✔
2849
        if err != nil {
10✔
2850
                return nil, err
×
2851
        }
×
2852

2853
        archiverRequest := &archiver.QueryVisibilityRequest{
10✔
2854
                DomainID:      entry.GetInfo().ID,
10✔
2855
                PageSize:      int(listRequest.GetPageSize()),
10✔
2856
                NextPageToken: listRequest.NextPageToken,
10✔
2857
                Query:         listRequest.GetQuery(),
10✔
2858
        }
10✔
2859

10✔
2860
        archiverResponse, err := visibilityArchiver.Query(ctx, URI, archiverRequest)
10✔
2861
        if err != nil {
10✔
2862
                return nil, err
×
2863
        }
×
2864

2865
        // special handling of ExecutionTime for cron or retry
2866
        for _, execution := range archiverResponse.Executions {
25✔
2867
                if execution.GetExecutionTime() == 0 {
30✔
2868
                        execution.ExecutionTime = common.Int64Ptr(execution.GetStartTime())
15✔
2869
                }
15✔
2870
        }
2871

2872
        return &types.ListArchivedWorkflowExecutionsResponse{
10✔
2873
                Executions:    archiverResponse.Executions,
10✔
2874
                NextPageToken: archiverResponse.NextPageToken,
10✔
2875
        }, nil
10✔
2876
}
2877

2878
// ListClosedWorkflowExecutions - retrieves info for closed workflow executions in a domain
2879
func (wh *WorkflowHandler) ListClosedWorkflowExecutions(
2880
        ctx context.Context,
2881
        listRequest *types.ListClosedWorkflowExecutionsRequest,
2882
) (resp *types.ListClosedWorkflowExecutionsResponse, retError error) {
29✔
2883
        if wh.isShuttingDown() {
29✔
2884
                return nil, validate.ErrShuttingDown
×
2885
        }
×
2886

2887
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
29✔
2888
                return nil, err
×
2889
        }
×
2890

2891
        if listRequest == nil {
29✔
2892
                return nil, validate.ErrRequestNotSet
×
2893
        }
×
2894

2895
        if listRequest.GetDomain() == "" {
29✔
2896
                return nil, validate.ErrDomainNotSet
×
2897
        }
×
2898

2899
        if listRequest.StartTimeFilter == nil {
29✔
2900
                return nil, &types.BadRequestError{Message: "StartTimeFilter is required"}
×
2901
        }
×
2902

2903
        if listRequest.StartTimeFilter.EarliestTime == nil {
29✔
2904
                return nil, &types.BadRequestError{Message: "EarliestTime in StartTimeFilter is required"}
×
2905
        }
×
2906

2907
        if listRequest.StartTimeFilter.LatestTime == nil {
29✔
2908
                return nil, &types.BadRequestError{Message: "LatestTime in StartTimeFilter is required"}
×
2909
        }
×
2910

2911
        if listRequest.StartTimeFilter.GetEarliestTime() > listRequest.StartTimeFilter.GetLatestTime() {
29✔
2912
                return nil, &types.BadRequestError{Message: "EarliestTime in StartTimeFilter should not be larger than LatestTime"}
×
2913
        }
×
2914

2915
        filterCount := 0
29✔
2916
        if listRequest.TypeFilter != nil {
30✔
2917
                filterCount++
1✔
2918
        }
1✔
2919
        if listRequest.StatusFilter != nil {
30✔
2920
                filterCount++
1✔
2921
        }
1✔
2922

2923
        if filterCount > 1 {
29✔
2924
                return nil, &types.BadRequestError{
×
2925
                        Message: "Only one of ExecutionFilter, TypeFilter or StatusFilter is allowed"}
×
2926
        } // If ExecutionFilter is provided with one of TypeFilter or StatusFilter, use ExecutionFilter and ignore other filter
×
2927

2928
        if listRequest.GetMaximumPageSize() <= 0 {
30✔
2929
                listRequest.MaximumPageSize = int32(wh.config.VisibilityMaxPageSize(listRequest.GetDomain()))
1✔
2930
        }
1✔
2931

2932
        if wh.isListRequestPageSizeTooLarge(listRequest.GetMaximumPageSize(), listRequest.GetDomain()) {
29✔
2933
                return nil, &types.BadRequestError{
×
2934
                        Message: fmt.Sprintf("Pagesize is larger than allow %d", wh.config.ESIndexMaxResultWindow())}
×
2935
        }
×
2936

2937
        domain := listRequest.GetDomain()
29✔
2938
        domainID, err := wh.GetDomainCache().GetDomainID(domain)
29✔
2939
        if err != nil {
29✔
2940
                return nil, err
×
2941
        }
×
2942

2943
        baseReq := persistence.ListWorkflowExecutionsRequest{
29✔
2944
                DomainUUID:    domainID,
29✔
2945
                Domain:        domain,
29✔
2946
                PageSize:      int(listRequest.GetMaximumPageSize()),
29✔
2947
                NextPageToken: listRequest.NextPageToken,
29✔
2948
                EarliestTime:  listRequest.StartTimeFilter.GetEarliestTime(),
29✔
2949
                LatestTime:    listRequest.StartTimeFilter.GetLatestTime(),
29✔
2950
        }
29✔
2951

29✔
2952
        var persistenceResp *persistence.ListWorkflowExecutionsResponse
29✔
2953
        if listRequest.ExecutionFilter != nil {
45✔
2954
                if wh.config.DisableListVisibilityByFilter(domain) {
17✔
2955
                        err = validate.ErrNoPermission
1✔
2956
                } else {
16✔
2957
                        persistenceResp, err = wh.GetVisibilityManager().ListClosedWorkflowExecutionsByWorkflowID(
15✔
2958
                                ctx,
15✔
2959
                                &persistence.ListWorkflowExecutionsByWorkflowIDRequest{
15✔
2960
                                        ListWorkflowExecutionsRequest: baseReq,
15✔
2961
                                        WorkflowID:                    listRequest.ExecutionFilter.GetWorkflowID(),
15✔
2962
                                },
15✔
2963
                        )
15✔
2964
                }
15✔
2965
                wh.GetLogger().Debug("List closed workflow with filter",
16✔
2966
                        tag.WorkflowDomainName(listRequest.GetDomain()), tag.WorkflowListWorkflowFilterByID)
16✔
2967
        } else if listRequest.TypeFilter != nil {
14✔
2968
                if wh.config.DisableListVisibilityByFilter(domain) {
2✔
2969
                        err = validate.ErrNoPermission
1✔
2970
                } else {
1✔
2971
                        persistenceResp, err = wh.GetVisibilityManager().ListClosedWorkflowExecutionsByType(
×
2972
                                ctx,
×
2973
                                &persistence.ListWorkflowExecutionsByTypeRequest{
×
2974
                                        ListWorkflowExecutionsRequest: baseReq,
×
2975
                                        WorkflowTypeName:              listRequest.TypeFilter.GetName(),
×
2976
                                },
×
2977
                        )
×
2978
                }
×
2979
                wh.GetLogger().Debug("List closed workflow with filter",
1✔
2980
                        tag.WorkflowDomainName(listRequest.GetDomain()), tag.WorkflowListWorkflowFilterByType)
1✔
2981
        } else if listRequest.StatusFilter != nil {
13✔
2982
                if wh.config.DisableListVisibilityByFilter(domain) {
2✔
2983
                        err = validate.ErrNoPermission
1✔
2984
                } else {
1✔
2985
                        persistenceResp, err = wh.GetVisibilityManager().ListClosedWorkflowExecutionsByStatus(
×
2986
                                ctx,
×
2987
                                &persistence.ListClosedWorkflowExecutionsByStatusRequest{
×
2988
                                        ListWorkflowExecutionsRequest: baseReq,
×
2989
                                        Status:                        listRequest.GetStatusFilter(),
×
2990
                                },
×
2991
                        )
×
2992
                }
×
2993
                wh.GetLogger().Debug("List closed workflow with filter",
1✔
2994
                        tag.WorkflowDomainName(listRequest.GetDomain()), tag.WorkflowListWorkflowFilterByStatus)
1✔
2995
        } else {
11✔
2996
                persistenceResp, err = wh.GetVisibilityManager().ListClosedWorkflowExecutions(ctx, &baseReq)
11✔
2997
        }
11✔
2998

2999
        if err != nil {
32✔
3000
                return nil, err
3✔
3001
        }
3✔
3002

3003
        resp = &types.ListClosedWorkflowExecutionsResponse{}
26✔
3004
        resp.Executions = persistenceResp.Executions
26✔
3005
        resp.NextPageToken = persistenceResp.NextPageToken
26✔
3006
        return resp, nil
26✔
3007
}
3008

3009
// ListWorkflowExecutions - retrieves info for workflow executions in a domain
3010
func (wh *WorkflowHandler) ListWorkflowExecutions(
3011
        ctx context.Context,
3012
        listRequest *types.ListWorkflowExecutionsRequest,
3013
) (resp *types.ListWorkflowExecutionsResponse, retError error) {
140✔
3014
        if wh.isShuttingDown() {
140✔
3015
                return nil, validate.ErrShuttingDown
×
3016
        }
×
3017

3018
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
140✔
3019
                return nil, err
×
3020
        }
×
3021

3022
        if listRequest == nil {
140✔
3023
                return nil, validate.ErrRequestNotSet
×
3024
        }
×
3025

3026
        if listRequest.GetDomain() == "" {
140✔
3027
                return nil, validate.ErrDomainNotSet
×
3028
        }
×
3029

3030
        if listRequest.GetPageSize() <= 0 {
140✔
3031
                listRequest.PageSize = int32(wh.config.VisibilityMaxPageSize(listRequest.GetDomain()))
×
3032
        }
×
3033

3034
        if wh.isListRequestPageSizeTooLarge(listRequest.GetPageSize(), listRequest.GetDomain()) {
140✔
3035
                return nil, &types.BadRequestError{
×
3036
                        Message: fmt.Sprintf("Pagesize is larger than allow %d", wh.config.ESIndexMaxResultWindow())}
×
3037
        }
×
3038

3039
        validatedQuery, err := wh.visibilityQueryValidator.ValidateQuery(listRequest.GetQuery())
140✔
3040
        if err != nil {
143✔
3041
                return nil, err
3✔
3042
        }
3✔
3043

3044
        domain := listRequest.GetDomain()
137✔
3045
        domainID, err := wh.GetDomainCache().GetDomainID(domain)
137✔
3046
        if err != nil {
137✔
3047
                return nil, err
×
3048
        }
×
3049

3050
        req := &persistence.ListWorkflowExecutionsByQueryRequest{
137✔
3051
                DomainUUID:    domainID,
137✔
3052
                Domain:        domain,
137✔
3053
                PageSize:      int(listRequest.GetPageSize()),
137✔
3054
                NextPageToken: listRequest.NextPageToken,
137✔
3055
                Query:         validatedQuery,
137✔
3056
        }
137✔
3057
        persistenceResp, err := wh.GetVisibilityManager().ListWorkflowExecutions(ctx, req)
137✔
3058
        if err != nil {
137✔
3059
                return nil, err
×
3060
        }
×
3061

3062
        resp = &types.ListWorkflowExecutionsResponse{}
137✔
3063
        resp.Executions = persistenceResp.Executions
137✔
3064
        resp.NextPageToken = persistenceResp.NextPageToken
137✔
3065
        return resp, nil
137✔
3066
}
3067

3068
// RestartWorkflowExecution - retrieves info for an existing workflow then restarts it
3069
func (wh *WorkflowHandler) RestartWorkflowExecution(ctx context.Context, request *types.RestartWorkflowExecutionRequest) (resp *types.RestartWorkflowExecutionResponse, retError error) {
2✔
3070
        if wh.isShuttingDown() {
2✔
3071
                return nil, validate.ErrShuttingDown
×
3072
        }
×
3073

3074
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
2✔
3075
                return nil, err
×
3076
        }
×
3077

3078
        if request == nil {
2✔
3079
                return nil, validate.ErrRequestNotSet
×
3080
        }
×
3081

3082
        domainName := request.GetDomain()
2✔
3083
        wfExecution := request.GetWorkflowExecution()
2✔
3084

2✔
3085
        if request.GetDomain() == "" {
2✔
3086
                return nil, validate.ErrDomainNotSet
×
3087
        }
×
3088

3089
        if err := validate.CheckExecution(wfExecution); err != nil {
2✔
3090
                return nil, err
×
3091
        }
×
3092

3093
        domainID, err := wh.GetDomainCache().GetDomainID(domainName)
2✔
3094
        if err != nil {
2✔
3095
                return nil, err
×
3096
        }
×
3097

3098
        isolationGroup := wh.getIsolationGroup(ctx, domainName)
2✔
3099
        if !wh.isIsolationGroupHealthy(ctx, domainName, isolationGroup) {
3✔
3100
                return nil, &types.BadRequestError{fmt.Sprintf("Domain %s is drained from isolation group %s.", domainName, isolationGroup)}
1✔
3101
        }
1✔
3102

3103
        history, err := wh.GetWorkflowExecutionHistory(ctx, &types.GetWorkflowExecutionHistoryRequest{
1✔
3104
                Domain: domainName,
1✔
3105
                Execution: &types.WorkflowExecution{
1✔
3106
                        WorkflowID: wfExecution.WorkflowID,
1✔
3107
                        RunID:      wfExecution.RunID,
1✔
3108
                },
1✔
3109
                SkipArchival: true,
1✔
3110
        })
1✔
3111
        if err != nil {
1✔
3112
                return nil, validate.ErrHistoryNotFound
×
3113
        }
×
3114
        startRequest := constructRestartWorkflowRequest(history.History.Events[0].WorkflowExecutionStartedEventAttributes,
1✔
3115
                domainName, request.Identity, wfExecution.WorkflowID)
1✔
3116
        req, err := common.CreateHistoryStartWorkflowRequest(domainID, startRequest, time.Now(), wh.getPartitionConfig(ctx, domainName))
1✔
3117
        if err != nil {
1✔
3118
                return nil, err
×
3119
        }
×
3120
        startResp, err := wh.GetHistoryClient().StartWorkflowExecution(ctx, req)
1✔
3121
        if err != nil {
1✔
3122
                return nil, wh.normalizeVersionedErrors(ctx, err)
×
3123
        }
×
3124
        resp = &types.RestartWorkflowExecutionResponse{
1✔
3125
                RunID: startResp.RunID,
1✔
3126
        }
1✔
3127

1✔
3128
        return resp, nil
1✔
3129
}
3130

3131
// ScanWorkflowExecutions - retrieves info for large amount of workflow executions in a domain without order
3132
func (wh *WorkflowHandler) ScanWorkflowExecutions(
3133
        ctx context.Context,
3134
        listRequest *types.ListWorkflowExecutionsRequest,
3135
) (resp *types.ListWorkflowExecutionsResponse, retError error) {
28✔
3136
        if wh.isShuttingDown() {
28✔
3137
                return nil, validate.ErrShuttingDown
×
3138
        }
×
3139

3140
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
28✔
3141
                return nil, err
×
3142
        }
×
3143

3144
        if listRequest == nil {
28✔
3145
                return nil, validate.ErrRequestNotSet
×
3146
        }
×
3147

3148
        if listRequest.GetDomain() == "" {
28✔
3149
                return nil, validate.ErrDomainNotSet
×
3150
        }
×
3151

3152
        if listRequest.GetPageSize() <= 0 {
28✔
3153
                listRequest.PageSize = int32(wh.config.VisibilityMaxPageSize(listRequest.GetDomain()))
×
3154
        }
×
3155

3156
        if wh.isListRequestPageSizeTooLarge(listRequest.GetPageSize(), listRequest.GetDomain()) {
28✔
3157
                return nil, &types.BadRequestError{
×
3158
                        Message: fmt.Sprintf("Pagesize is larger than allow %d", wh.config.ESIndexMaxResultWindow())}
×
3159
        }
×
3160

3161
        validatedQuery, err := wh.visibilityQueryValidator.ValidateQuery(listRequest.GetQuery())
28✔
3162
        if err != nil {
29✔
3163
                return nil, err
1✔
3164
        }
1✔
3165

3166
        domain := listRequest.GetDomain()
27✔
3167
        domainID, err := wh.GetDomainCache().GetDomainID(domain)
27✔
3168
        if err != nil {
27✔
3169
                return nil, err
×
3170
        }
×
3171

3172
        req := &persistence.ListWorkflowExecutionsByQueryRequest{
27✔
3173
                DomainUUID:    domainID,
27✔
3174
                Domain:        domain,
27✔
3175
                PageSize:      int(listRequest.GetPageSize()),
27✔
3176
                NextPageToken: listRequest.NextPageToken,
27✔
3177
                Query:         validatedQuery,
27✔
3178
        }
27✔
3179
        persistenceResp, err := wh.GetVisibilityManager().ScanWorkflowExecutions(ctx, req)
27✔
3180
        if err != nil {
27✔
3181
                return nil, err
×
3182
        }
×
3183

3184
        resp = &types.ListWorkflowExecutionsResponse{}
27✔
3185
        resp.Executions = persistenceResp.Executions
27✔
3186
        resp.NextPageToken = persistenceResp.NextPageToken
27✔
3187
        return resp, nil
27✔
3188
}
3189

3190
// CountWorkflowExecutions - count number of workflow executions in a domain
3191
func (wh *WorkflowHandler) CountWorkflowExecutions(
3192
        ctx context.Context,
3193
        countRequest *types.CountWorkflowExecutionsRequest,
3194
) (resp *types.CountWorkflowExecutionsResponse, retError error) {
14✔
3195
        if wh.isShuttingDown() {
14✔
3196
                return nil, validate.ErrShuttingDown
×
3197
        }
×
3198

3199
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
14✔
3200
                return nil, err
×
3201
        }
×
3202

3203
        if countRequest == nil {
14✔
3204
                return nil, validate.ErrRequestNotSet
×
3205
        }
×
3206

3207
        if countRequest.GetDomain() == "" {
14✔
3208
                return nil, validate.ErrDomainNotSet
×
3209
        }
×
3210

3211
        validatedQuery, err := wh.visibilityQueryValidator.ValidateQuery(countRequest.GetQuery())
14✔
3212
        if err != nil {
15✔
3213
                return nil, err
1✔
3214
        }
1✔
3215

3216
        domain := countRequest.GetDomain()
13✔
3217
        domainID, err := wh.GetDomainCache().GetDomainID(domain)
13✔
3218
        if err != nil {
13✔
3219
                return nil, err
×
3220
        }
×
3221

3222
        req := &persistence.CountWorkflowExecutionsRequest{
13✔
3223
                DomainUUID: domainID,
13✔
3224
                Domain:     domain,
13✔
3225
                Query:      validatedQuery,
13✔
3226
        }
13✔
3227
        persistenceResp, err := wh.GetVisibilityManager().CountWorkflowExecutions(ctx, req)
13✔
3228
        if err != nil {
13✔
3229
                return nil, err
×
3230
        }
×
3231

3232
        resp = &types.CountWorkflowExecutionsResponse{
13✔
3233
                Count: persistenceResp.Count,
13✔
3234
        }
13✔
3235
        return resp, nil
13✔
3236
}
3237

3238
// GetSearchAttributes return valid indexed keys
3239
func (wh *WorkflowHandler) GetSearchAttributes(ctx context.Context) (resp *types.GetSearchAttributesResponse, retError error) {
1✔
3240
        if wh.isShuttingDown() {
1✔
3241
                return nil, validate.ErrShuttingDown
×
3242
        }
×
3243

3244
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
1✔
3245
                return nil, err
×
3246
        }
×
3247

3248
        keys := wh.config.ValidSearchAttributes()
1✔
3249
        resp = &types.GetSearchAttributesResponse{
1✔
3250
                Keys: wh.convertIndexedKeyToThrift(keys),
1✔
3251
        }
1✔
3252
        return resp, nil
1✔
3253
}
3254

3255
// ResetStickyTaskList reset the volatile information in mutable state of a given workflow.
3256
func (wh *WorkflowHandler) ResetStickyTaskList(
3257
        ctx context.Context,
3258
        resetRequest *types.ResetStickyTaskListRequest,
3259
) (resp *types.ResetStickyTaskListResponse, retError error) {
3✔
3260
        if wh.isShuttingDown() {
3✔
3261
                return nil, validate.ErrShuttingDown
×
3262
        }
×
3263

3264
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
3✔
3265
                return nil, err
×
3266
        }
×
3267

3268
        if resetRequest == nil {
3✔
3269
                return nil, validate.ErrRequestNotSet
×
3270
        }
×
3271

3272
        domainName := resetRequest.GetDomain()
3✔
3273
        wfExecution := resetRequest.GetExecution()
3✔
3274

3✔
3275
        if domainName == "" {
3✔
3276
                return nil, validate.ErrDomainNotSet
×
3277
        }
×
3278

3279
        if err := validate.CheckExecution(wfExecution); err != nil {
3✔
3280
                return nil, err
×
3281
        }
×
3282

3283
        domainID, err := wh.GetDomainCache().GetDomainID(resetRequest.GetDomain())
3✔
3284
        if err != nil {
3✔
3285
                return nil, err
×
3286
        }
×
3287

3288
        _, err = wh.GetHistoryClient().ResetStickyTaskList(ctx, &types.HistoryResetStickyTaskListRequest{
3✔
3289
                DomainUUID: domainID,
3✔
3290
                Execution:  resetRequest.Execution,
3✔
3291
        })
3✔
3292
        if err != nil {
3✔
3293
                return nil, wh.normalizeVersionedErrors(ctx, err)
×
3294
        }
×
3295
        return &types.ResetStickyTaskListResponse{}, nil
3✔
3296
}
3297

3298
// QueryWorkflow returns query result for a specified workflow execution
3299
func (wh *WorkflowHandler) QueryWorkflow(
3300
        ctx context.Context,
3301
        queryRequest *types.QueryWorkflowRequest,
3302
) (resp *types.QueryWorkflowResponse, retError error) {
45✔
3303
        if wh.isShuttingDown() {
45✔
3304
                return nil, validate.ErrShuttingDown
×
3305
        }
×
3306

3307
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
45✔
3308
                return nil, err
×
3309
        }
×
3310

3311
        if queryRequest == nil {
45✔
3312
                return nil, validate.ErrRequestNotSet
×
3313
        }
×
3314

3315
        domainName := queryRequest.GetDomain()
45✔
3316
        wfExecution := queryRequest.GetExecution()
45✔
3317

45✔
3318
        if domainName == "" {
45✔
3319
                return nil, validate.ErrDomainNotSet
×
3320
        }
×
3321

3322
        if err := validate.CheckExecution(wfExecution); err != nil {
45✔
3323
                return nil, err
×
3324
        }
×
3325

3326
        if wh.config.DisallowQuery(domainName) {
45✔
3327
                return nil, validate.ErrQueryDisallowedForDomain
×
3328
        }
×
3329

3330
        if queryRequest.Query == nil {
45✔
3331
                return nil, validate.ErrQueryNotSet
×
3332
        }
×
3333

3334
        if queryRequest.Query.GetQueryType() == "" {
45✔
3335
                return nil, validate.ErrQueryTypeNotSet
×
3336
        }
×
3337

3338
        domainID, err := wh.GetDomainCache().GetDomainID(domainName)
45✔
3339
        if err != nil {
45✔
3340
                return nil, err
×
3341
        }
×
3342

3343
        sizeLimitError := wh.config.BlobSizeLimitError(domainName)
45✔
3344
        sizeLimitWarn := wh.config.BlobSizeLimitWarn(domainName)
45✔
3345

45✔
3346
        scope := getMetricsScopeWithDomain(metrics.FrontendQueryWorkflowScope, queryRequest, wh.GetMetricsClient()).Tagged(metrics.GetContextTags(ctx)...)
45✔
3347
        if err := common.CheckEventBlobSizeLimit(
45✔
3348
                len(queryRequest.GetQuery().GetQueryArgs()),
45✔
3349
                sizeLimitWarn,
45✔
3350
                sizeLimitError,
45✔
3351
                domainID,
45✔
3352
                queryRequest.GetExecution().GetWorkflowID(),
45✔
3353
                queryRequest.GetExecution().GetRunID(),
45✔
3354
                scope,
45✔
3355
                wh.GetThrottledLogger(),
45✔
3356
                tag.BlobSizeViolationOperation("QueryWorkflow")); err != nil {
45✔
3357
                return nil, err
×
3358
        }
×
3359

3360
        req := &types.HistoryQueryWorkflowRequest{
45✔
3361
                DomainUUID: domainID,
45✔
3362
                Request:    queryRequest,
45✔
3363
        }
45✔
3364
        hResponse, err := wh.GetHistoryClient().QueryWorkflow(ctx, req)
45✔
3365
        if err != nil {
57✔
3366
                return nil, err
12✔
3367
        }
12✔
3368
        return hResponse.GetResponse(), nil
33✔
3369
}
3370

3371
// DescribeWorkflowExecution returns information about the specified workflow execution.
3372
func (wh *WorkflowHandler) DescribeWorkflowExecution(
3373
        ctx context.Context,
3374
        request *types.DescribeWorkflowExecutionRequest,
3375
) (resp *types.DescribeWorkflowExecutionResponse, retError error) {
93✔
3376
        if wh.isShuttingDown() {
93✔
3377
                return nil, validate.ErrShuttingDown
×
3378
        }
×
3379

3380
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
93✔
3381
                return nil, err
×
3382
        }
×
3383

3384
        if request == nil {
93✔
3385
                return nil, validate.ErrRequestNotSet
×
3386
        }
×
3387

3388
        domainName := request.GetDomain()
93✔
3389
        wfExecution := request.GetExecution()
93✔
3390
        if domainName == "" {
93✔
3391
                return nil, validate.ErrDomainNotSet
×
3392
        }
×
3393

3394
        if err := validate.CheckExecution(wfExecution); err != nil {
93✔
3395
                return nil, err
×
3396
        }
×
3397

3398
        domainID, err := wh.GetDomainCache().GetDomainID(request.GetDomain())
93✔
3399
        if err != nil {
93✔
3400
                return nil, err
×
3401
        }
×
3402

3403
        response, err := wh.GetHistoryClient().DescribeWorkflowExecution(ctx, &types.HistoryDescribeWorkflowExecutionRequest{
93✔
3404
                DomainUUID: domainID,
93✔
3405
                Request:    request,
93✔
3406
        })
93✔
3407

93✔
3408
        if err != nil {
93✔
3409
                return nil, err
×
3410
        }
×
3411

3412
        return response, nil
93✔
3413
}
3414

3415
// DescribeTaskList returns information about the target tasklist, right now this API returns the
3416
// pollers which polled this tasklist in last few minutes. If includeTaskListStatus field is true,
3417
// it will also return status of tasklist's ackManager (readLevel, ackLevel, backlogCountHint and taskIDBlock).
3418
func (wh *WorkflowHandler) DescribeTaskList(
3419
        ctx context.Context,
3420
        request *types.DescribeTaskListRequest,
3421
) (resp *types.DescribeTaskListResponse, retError error) {
18✔
3422
        if wh.isShuttingDown() {
18✔
3423
                return nil, validate.ErrShuttingDown
×
3424
        }
×
3425

3426
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
18✔
3427
                return nil, err
×
3428
        }
×
3429

3430
        if request == nil {
18✔
3431
                return nil, validate.ErrRequestNotSet
×
3432
        }
×
3433

3434
        if request.GetDomain() == "" {
18✔
3435
                return nil, validate.ErrDomainNotSet
×
3436
        }
×
3437

3438
        domainID, err := wh.GetDomainCache().GetDomainID(request.GetDomain())
18✔
3439
        if err != nil {
18✔
3440
                return nil, err
×
3441
        }
×
3442

3443
        scope := getMetricsScopeWithDomain(metrics.FrontendDescribeTaskListScope, request, wh.GetMetricsClient()).Tagged(metrics.GetContextTags(ctx)...)
18✔
3444
        if err := wh.validateTaskList(request.TaskList, scope, request.GetDomain()); err != nil {
18✔
3445
                return nil, err
×
3446
        }
×
3447

3448
        if request.TaskListType == nil {
18✔
3449
                return nil, validate.ErrTaskListTypeNotSet
×
3450
        }
×
3451

3452
        response, err := wh.GetMatchingClient().DescribeTaskList(ctx, &types.MatchingDescribeTaskListRequest{
18✔
3453
                DomainUUID:  domainID,
18✔
3454
                DescRequest: request,
18✔
3455
        })
18✔
3456
        if err != nil {
18✔
3457
                return nil, err
×
3458
        }
×
3459

3460
        return response, nil
18✔
3461
}
3462

3463
// ListTaskListPartitions returns all the partition and host for a taskList
3464
func (wh *WorkflowHandler) ListTaskListPartitions(
3465
        ctx context.Context,
3466
        request *types.ListTaskListPartitionsRequest,
3467
) (resp *types.ListTaskListPartitionsResponse, retError error) {
×
3468
        if wh.isShuttingDown() {
×
3469
                return nil, validate.ErrShuttingDown
×
3470
        }
×
3471

3472
        if request == nil {
×
3473
                return nil, validate.ErrRequestNotSet
×
3474
        }
×
3475

3476
        if request.GetDomain() == "" {
×
3477
                return nil, validate.ErrDomainNotSet
×
3478
        }
×
3479

3480
        scope := getMetricsScopeWithDomain(metrics.FrontendListTaskListPartitionsScope, request, wh.GetMetricsClient()).Tagged(metrics.GetContextTags(ctx)...)
×
3481
        if err := wh.validateTaskList(request.TaskList, scope, request.GetDomain()); err != nil {
×
3482
                return nil, err
×
3483
        }
×
3484

3485
        resp, err := wh.GetMatchingClient().ListTaskListPartitions(ctx, &types.MatchingListTaskListPartitionsRequest{
×
3486
                Domain:   request.Domain,
×
3487
                TaskList: request.TaskList,
×
3488
        })
×
3489
        return resp, err
×
3490
}
3491

3492
// GetTaskListsByDomain returns all the partition and host for a taskList
3493
func (wh *WorkflowHandler) GetTaskListsByDomain(
3494
        ctx context.Context,
3495
        request *types.GetTaskListsByDomainRequest,
3496
) (resp *types.GetTaskListsByDomainResponse, retError error) {
×
3497
        if wh.isShuttingDown() {
×
3498
                return nil, validate.ErrShuttingDown
×
3499
        }
×
3500

3501
        if request == nil {
×
3502
                return nil, validate.ErrRequestNotSet
×
3503
        }
×
3504

3505
        if request.GetDomain() == "" {
×
3506
                return nil, validate.ErrDomainNotSet
×
3507
        }
×
3508

3509
        resp, err := wh.GetMatchingClient().GetTaskListsByDomain(ctx, &types.GetTaskListsByDomainRequest{
×
3510
                Domain: request.Domain,
×
3511
        })
×
3512
        return resp, err
×
3513
}
3514

3515
// RefreshWorkflowTasks re-generates the workflow tasks
3516
func (wh *WorkflowHandler) RefreshWorkflowTasks(
3517
        ctx context.Context,
3518
        request *types.RefreshWorkflowTasksRequest,
3519
) (err error) {
×
3520
        if request == nil {
×
3521
                return validate.ErrRequestNotSet
×
3522
        }
×
3523
        if err := validate.CheckExecution(request.Execution); err != nil {
×
3524
                return err
×
3525
        }
×
3526
        domainEntry, err := wh.GetDomainCache().GetDomain(request.GetDomain())
×
3527
        if err != nil {
×
3528
                return err
×
3529
        }
×
3530

3531
        err = wh.GetHistoryClient().RefreshWorkflowTasks(ctx, &types.HistoryRefreshWorkflowTasksRequest{
×
3532
                DomainUIID: domainEntry.GetInfo().ID,
×
3533
                Request:    request,
×
3534
        })
×
3535
        if err != nil {
×
3536
                return err
×
3537
        }
×
3538
        return nil
×
3539
}
3540

3541
func (wh *WorkflowHandler) getRawHistory(
3542
        ctx context.Context,
3543
        scope metrics.Scope,
3544
        domainID string,
3545
        domainName string,
3546
        execution types.WorkflowExecution,
3547
        firstEventID int64,
3548
        nextEventID int64,
3549
        pageSize int32,
3550
        nextPageToken []byte,
3551
        transientDecision *types.TransientDecisionInfo,
3552
        branchToken []byte,
3553
) ([]*types.DataBlob, []byte, error) {
2✔
3554
        rawHistory := []*types.DataBlob{}
2✔
3555
        shardID := common.WorkflowIDToHistoryShard(execution.WorkflowID, wh.config.NumHistoryShards)
2✔
3556

2✔
3557
        resp, err := wh.GetHistoryManager().ReadRawHistoryBranch(ctx, &persistence.ReadHistoryBranchRequest{
2✔
3558
                BranchToken:   branchToken,
2✔
3559
                MinEventID:    firstEventID,
2✔
3560
                MaxEventID:    nextEventID,
2✔
3561
                PageSize:      int(pageSize),
2✔
3562
                NextPageToken: nextPageToken,
2✔
3563
                ShardID:       common.IntPtr(shardID),
2✔
3564
                DomainName:    domainName,
2✔
3565
        })
2✔
3566
        if err != nil {
2✔
3567
                return nil, nil, err
×
3568
        }
×
3569

3570
        var encoding *types.EncodingType
2✔
3571
        for _, data := range resp.HistoryEventBlobs {
4✔
3572
                switch data.Encoding {
2✔
3573
                case common.EncodingTypeJSON:
×
3574
                        encoding = types.EncodingTypeJSON.Ptr()
×
3575
                case common.EncodingTypeThriftRW:
2✔
3576
                        encoding = types.EncodingTypeThriftRW.Ptr()
2✔
3577
                default:
×
3578
                        panic(fmt.Sprintf("Invalid encoding type for raw history, encoding type: %s", data.Encoding))
×
3579
                }
3580
                rawHistory = append(rawHistory, &types.DataBlob{
2✔
3581
                        EncodingType: encoding,
2✔
3582
                        Data:         data.Data,
2✔
3583
                })
2✔
3584
        }
3585

3586
        if len(resp.NextPageToken) == 0 && transientDecision != nil {
4✔
3587
                if err := wh.validateTransientDecisionEvents(nextEventID, transientDecision); err != nil {
2✔
3588
                        scope.IncCounter(metrics.CadenceErrIncompleteHistoryCounter)
×
3589
                        wh.GetLogger().Error("getHistory error",
×
3590
                                tag.WorkflowDomainID(domainID),
×
3591
                                tag.WorkflowID(execution.GetWorkflowID()),
×
3592
                                tag.WorkflowRunID(execution.GetRunID()),
×
3593
                                tag.Error(err))
×
3594
                }
×
3595
                blob, err := wh.GetPayloadSerializer().SerializeBatchEvents(
2✔
3596
                        []*types.HistoryEvent{transientDecision.ScheduledEvent, transientDecision.StartedEvent}, common.EncodingTypeThriftRW)
2✔
3597
                if err != nil {
2✔
3598
                        return nil, nil, err
×
3599
                }
×
3600
                rawHistory = append(rawHistory, &types.DataBlob{
2✔
3601
                        EncodingType: types.EncodingTypeThriftRW.Ptr(),
2✔
3602
                        Data:         blob.Data,
2✔
3603
                })
2✔
3604
        }
3605

3606
        return rawHistory, resp.NextPageToken, nil
2✔
3607
}
3608

3609
func (wh *WorkflowHandler) getHistory(
3610
        ctx context.Context,
3611
        scope metrics.Scope,
3612
        domainID string,
3613
        domainName string,
3614
        execution types.WorkflowExecution,
3615
        firstEventID, nextEventID int64,
3616
        pageSize int32,
3617
        nextPageToken []byte,
3618
        transientDecision *types.TransientDecisionInfo,
3619
        branchToken []byte,
3620
) (*types.History, []byte, error) {
1,590✔
3621

1,590✔
3622
        var size int
1,590✔
3623

1,590✔
3624
        isFirstPage := len(nextPageToken) == 0
1,590✔
3625
        shardID := common.WorkflowIDToHistoryShard(execution.WorkflowID, wh.config.NumHistoryShards)
1,590✔
3626
        var err error
1,590✔
3627
        historyEvents, size, nextPageToken, err := persistenceutils.ReadFullPageV2Events(ctx, wh.GetHistoryManager(), &persistence.ReadHistoryBranchRequest{
1,590✔
3628
                BranchToken:   branchToken,
1,590✔
3629
                MinEventID:    firstEventID,
1,590✔
3630
                MaxEventID:    nextEventID,
1,590✔
3631
                PageSize:      int(pageSize),
1,590✔
3632
                NextPageToken: nextPageToken,
1,590✔
3633
                ShardID:       common.IntPtr(shardID),
1,590✔
3634
                DomainName:    domainName,
1,590✔
3635
        })
1,590✔
3636

1,590✔
3637
        if err != nil {
1,590✔
3638
                return nil, nil, err
×
3639
        }
×
3640

3641
        scope.RecordTimer(metrics.HistorySize, time.Duration(size))
1,590✔
3642

1,590✔
3643
        isLastPage := len(nextPageToken) == 0
1,590✔
3644
        if err := verifyHistoryIsComplete(
1,590✔
3645
                historyEvents,
1,590✔
3646
                firstEventID,
1,590✔
3647
                nextEventID-1,
1,590✔
3648
                isFirstPage,
1,590✔
3649
                isLastPage,
1,590✔
3650
                int(pageSize)); err != nil {
1,590✔
3651
                scope.IncCounter(metrics.CadenceErrIncompleteHistoryCounter)
×
3652
                wh.GetLogger().Error("getHistory: incomplete history",
×
3653
                        tag.WorkflowDomainID(domainID),
×
3654
                        tag.WorkflowID(execution.GetWorkflowID()),
×
3655
                        tag.WorkflowRunID(execution.GetRunID()),
×
3656
                        tag.Error(err))
×
3657
                return nil, nil, err
×
3658
        }
×
3659

3660
        if len(nextPageToken) == 0 && transientDecision != nil {
1,761✔
3661
                if err := wh.validateTransientDecisionEvents(nextEventID, transientDecision); err != nil {
171✔
3662
                        scope.IncCounter(metrics.CadenceErrIncompleteHistoryCounter)
×
3663
                        wh.GetLogger().Error("getHistory error",
×
3664
                                tag.WorkflowDomainID(domainID),
×
3665
                                tag.WorkflowID(execution.GetWorkflowID()),
×
3666
                                tag.WorkflowRunID(execution.GetRunID()),
×
3667
                                tag.Error(err))
×
3668
                }
×
3669
                // Append the transient decision events once we are done enumerating everything from the events table
3670
                historyEvents = append(historyEvents, transientDecision.ScheduledEvent, transientDecision.StartedEvent)
171✔
3671
        }
3672

3673
        executionHistory := &types.History{}
1,590✔
3674
        executionHistory.Events = historyEvents
1,590✔
3675
        return executionHistory, nextPageToken, nil
1,590✔
3676
}
3677

3678
func (wh *WorkflowHandler) validateTransientDecisionEvents(
3679
        expectedNextEventID int64,
3680
        decision *types.TransientDecisionInfo,
3681
) error {
173✔
3682

173✔
3683
        if decision.ScheduledEvent.ID == expectedNextEventID &&
173✔
3684
                decision.StartedEvent.ID == expectedNextEventID+1 {
346✔
3685
                return nil
173✔
3686
        }
173✔
3687

3688
        return fmt.Errorf(
×
3689
                "invalid transient decision: "+
×
3690
                        "expectedScheduledEventID=%v expectedStartedEventID=%v but have scheduledEventID=%v startedEventID=%v",
×
3691
                expectedNextEventID,
×
3692
                expectedNextEventID+1,
×
3693
                decision.ScheduledEvent.ID,
×
3694
                decision.StartedEvent.ID)
×
3695
}
3696

3697
func (wh *WorkflowHandler) validateTaskList(t *types.TaskList, scope metrics.Scope, domain string) error {
2,683✔
3698
        if t == nil || t.GetName() == "" {
2,684✔
3699
                return validate.ErrTaskListNotSet
1✔
3700
        }
1✔
3701

3702
        if !common.IsValidIDLength(
2,682✔
3703
                t.GetName(),
2,682✔
3704
                scope,
2,682✔
3705
                wh.config.MaxIDLengthWarnLimit(),
2,682✔
3706
                wh.config.TaskListNameMaxLength(domain),
2,682✔
3707
                metrics.CadenceErrTaskListNameExceededWarnLimit,
2,682✔
3708
                domain,
2,682✔
3709
                wh.GetLogger(),
2,682✔
3710
                tag.IDTypeTaskListName) {
2,682✔
3711
                return validate.ErrTaskListTooLong
×
3712
        }
×
3713
        return nil
2,682✔
3714
}
3715

3716
func (wh *WorkflowHandler) createPollForDecisionTaskResponse(
3717
        ctx context.Context,
3718
        scope metrics.Scope,
3719
        domainID string,
3720
        matchingResp *types.MatchingPollForDecisionTaskResponse,
3721
        branchToken []byte,
3722
) (*types.PollForDecisionTaskResponse, error) {
1,210✔
3723

1,210✔
3724
        if matchingResp.WorkflowExecution == nil {
1,259✔
3725
                // this will happen if there is no decision task to be send to worker / caller
49✔
3726
                return &types.PollForDecisionTaskResponse{}, nil
49✔
3727
        }
49✔
3728

3729
        var history *types.History
1,161✔
3730
        var continuation []byte
1,161✔
3731
        var err error
1,161✔
3732

1,161✔
3733
        if matchingResp.GetStickyExecutionEnabled() && matchingResp.Query != nil {
1,167✔
3734
                // meaning sticky query, we should not return any events to worker
6✔
3735
                // since query task only check the current status
6✔
3736
                history = &types.History{
6✔
3737
                        Events: []*types.HistoryEvent{},
6✔
3738
                }
6✔
3739
        } else {
1,161✔
3740
                // here we have 3 cases:
1,155✔
3741
                // 1. sticky && non query task
1,155✔
3742
                // 2. non sticky &&  non query task
1,155✔
3743
                // 3. non sticky && query task
1,155✔
3744
                // for 1, partial history have to be send back
1,155✔
3745
                // for 2 and 3, full history have to be send back
1,155✔
3746

1,155✔
3747
                var persistenceToken []byte
1,155✔
3748

1,155✔
3749
                firstEventID := common.FirstEventID
1,155✔
3750
                nextEventID := matchingResp.GetNextEventID()
1,155✔
3751
                if matchingResp.GetStickyExecutionEnabled() {
1,251✔
3752
                        firstEventID = matchingResp.GetPreviousStartedEventID() + 1
96✔
3753
                }
96✔
3754
                domainName, dErr := wh.GetDomainCache().GetDomainName(domainID)
1,155✔
3755
                if dErr != nil {
1,155✔
3756
                        return nil, dErr
×
3757
                }
×
3758
                scope = scope.Tagged(metrics.DomainTag(domainName))
1,155✔
3759
                history, persistenceToken, err = wh.getHistory(
1,155✔
3760
                        ctx,
1,155✔
3761
                        scope,
1,155✔
3762
                        domainID,
1,155✔
3763
                        domainName,
1,155✔
3764
                        *matchingResp.WorkflowExecution,
1,155✔
3765
                        firstEventID,
1,155✔
3766
                        nextEventID,
1,155✔
3767
                        int32(wh.config.HistoryMaxPageSize(domainName)),
1,155✔
3768
                        nil,
1,155✔
3769
                        matchingResp.DecisionInfo,
1,155✔
3770
                        branchToken,
1,155✔
3771
                )
1,155✔
3772
                if err != nil {
1,155✔
3773
                        return nil, err
×
3774
                }
×
3775

3776
                if len(persistenceToken) != 0 {
1,155✔
3777
                        continuation, err = serializeHistoryToken(&getHistoryContinuationToken{
×
3778
                                RunID:             matchingResp.WorkflowExecution.GetRunID(),
×
3779
                                FirstEventID:      firstEventID,
×
3780
                                NextEventID:       nextEventID,
×
3781
                                PersistenceToken:  persistenceToken,
×
3782
                                TransientDecision: matchingResp.DecisionInfo,
×
3783
                                BranchToken:       branchToken,
×
3784
                        })
×
3785
                        if err != nil {
×
3786
                                return nil, err
×
3787
                        }
×
3788
                }
3789
        }
3790

3791
        resp := &types.PollForDecisionTaskResponse{
1,161✔
3792
                TaskToken:                 matchingResp.TaskToken,
1,161✔
3793
                WorkflowExecution:         matchingResp.WorkflowExecution,
1,161✔
3794
                WorkflowType:              matchingResp.WorkflowType,
1,161✔
3795
                PreviousStartedEventID:    matchingResp.PreviousStartedEventID,
1,161✔
3796
                StartedEventID:            matchingResp.StartedEventID, // this field is not set for query tasks as there's no decision task started event
1,161✔
3797
                Query:                     matchingResp.Query,
1,161✔
3798
                BacklogCountHint:          matchingResp.BacklogCountHint,
1,161✔
3799
                Attempt:                   matchingResp.Attempt,
1,161✔
3800
                History:                   history,
1,161✔
3801
                NextPageToken:             continuation,
1,161✔
3802
                WorkflowExecutionTaskList: matchingResp.WorkflowExecutionTaskList,
1,161✔
3803
                ScheduledTimestamp:        matchingResp.ScheduledTimestamp,
1,161✔
3804
                StartedTimestamp:          matchingResp.StartedTimestamp,
1,161✔
3805
                Queries:                   matchingResp.Queries,
1,161✔
3806
                NextEventID:               matchingResp.NextEventID,
1,161✔
3807
                TotalHistoryBytes:         matchingResp.TotalHistoryBytes,
1,161✔
3808
        }
1,161✔
3809

1,161✔
3810
        return resp, nil
1,161✔
3811
}
3812

3813
func verifyHistoryIsComplete(
3814
        events []*types.HistoryEvent,
3815
        expectedFirstEventID int64,
3816
        expectedLastEventID int64,
3817
        isFirstPage bool,
3818
        isLastPage bool,
3819
        pageSize int,
3820
) error {
1,609✔
3821

1,609✔
3822
        nEvents := len(events)
1,609✔
3823
        if nEvents == 0 {
1,621✔
3824
                if isLastPage {
24✔
3825
                        // we seem to be returning a non-nil pageToken on the lastPage which
12✔
3826
                        // in turn cases the client to call getHistory again - only to find
12✔
3827
                        // there are no more events to consume - bail out if this is the case here
12✔
3828
                        return nil
12✔
3829
                }
12✔
3830
                return fmt.Errorf("invalid history: contains zero events")
×
3831
        }
3832

3833
        firstEventID := events[0].ID
1,597✔
3834
        lastEventID := events[nEvents-1].ID
1,597✔
3835

1,597✔
3836
        if !isFirstPage { // atleast one page of history has been read previously
1,633✔
3837
                if firstEventID <= expectedFirstEventID {
36✔
3838
                        // not first page and no events have been read in the previous pages - not possible
×
3839
                        return &types.InternalServiceError{
×
3840
                                Message: fmt.Sprintf(
×
3841
                                        "invalid history: expected first eventID to be > %v but got %v", expectedFirstEventID, firstEventID),
×
3842
                        }
×
3843
                }
×
3844
                expectedFirstEventID = firstEventID
36✔
3845
        }
3846

3847
        if !isLastPage {
1,646✔
3848
                // estimate lastEventID based on pageSize. This is a lower bound
49✔
3849
                // since the persistence layer counts "batch of events" as a single page
49✔
3850
                expectedLastEventID = expectedFirstEventID + int64(pageSize) - 1
49✔
3851
        }
49✔
3852

3853
        nExpectedEvents := expectedLastEventID - expectedFirstEventID + 1
1,597✔
3854

1,597✔
3855
        if firstEventID == expectedFirstEventID &&
1,597✔
3856
                ((isLastPage && lastEventID == expectedLastEventID && int64(nEvents) == nExpectedEvents) ||
1,597✔
3857
                        (!isLastPage && lastEventID >= expectedLastEventID && int64(nEvents) >= nExpectedEvents)) {
3,182✔
3858
                return nil
1,585✔
3859
        }
1,585✔
3860

3861
        return &types.InternalServiceError{
12✔
3862
                Message: fmt.Sprintf(
12✔
3863
                        "incomplete history: "+
12✔
3864
                                "expected events [%v-%v] but got events [%v-%v] of length %v:"+
12✔
3865
                                "isFirstPage=%v,isLastPage=%v,pageSize=%v",
12✔
3866
                        expectedFirstEventID,
12✔
3867
                        expectedLastEventID,
12✔
3868
                        firstEventID,
12✔
3869
                        lastEventID,
12✔
3870
                        nEvents,
12✔
3871
                        isFirstPage,
12✔
3872
                        isLastPage,
12✔
3873
                        pageSize),
12✔
3874
        }
12✔
3875
}
3876

3877
func deserializeHistoryToken(bytes []byte) (*getHistoryContinuationToken, error) {
44✔
3878
        token := &getHistoryContinuationToken{}
44✔
3879
        err := json.Unmarshal(bytes, token)
44✔
3880
        return token, err
44✔
3881
}
44✔
3882

3883
func serializeHistoryToken(token *getHistoryContinuationToken) ([]byte, error) {
436✔
3884
        if token == nil {
830✔
3885
                return nil, nil
394✔
3886
        }
394✔
3887

3888
        bytes, err := json.Marshal(token)
42✔
3889
        return bytes, err
42✔
3890
}
3891

3892
func isFailoverRequest(updateRequest *types.UpdateDomainRequest) bool {
9✔
3893
        return updateRequest.ActiveClusterName != nil
9✔
3894
}
9✔
3895

3896
func isGraceFailoverRequest(updateRequest *types.UpdateDomainRequest) bool {
9✔
3897
        return updateRequest.FailoverTimeoutInSeconds != nil
9✔
3898
}
9✔
3899

3900
func (wh *WorkflowHandler) checkOngoingFailover(
3901
        ctx context.Context,
3902
        domainName *string,
3903
) error {
1✔
3904

1✔
3905
        enabledClusters := wh.GetClusterMetadata().GetEnabledClusterInfo()
1✔
3906
        respChan := make(chan *types.DescribeDomainResponse, len(enabledClusters))
1✔
3907

1✔
3908
        g := &errgroup.Group{}
1✔
3909
        for clusterName := range enabledClusters {
3✔
3910
                frontendClient := wh.GetRemoteFrontendClient(clusterName)
2✔
3911
                g.Go(func() (e error) {
4✔
3912
                        defer func() { log.CapturePanic(recover(), wh.GetLogger(), &e) }()
4✔
3913

3914
                        resp, _ := frontendClient.DescribeDomain(ctx, &types.DescribeDomainRequest{Name: domainName})
2✔
3915
                        respChan <- resp
2✔
3916
                        return nil
2✔
3917
                })
3918
        }
3919
        g.Wait()
1✔
3920
        close(respChan)
1✔
3921

1✔
3922
        var failoverVersion *int64
1✔
3923
        for resp := range respChan {
3✔
3924
                if resp == nil {
2✔
3925
                        return &types.InternalServiceError{
×
3926
                                Message: "Failed to verify failover version from all clusters",
×
3927
                        }
×
3928
                }
×
3929
                if failoverVersion == nil {
3✔
3930
                        failoverVersion = &resp.FailoverVersion
1✔
3931
                }
1✔
3932
                if *failoverVersion != resp.GetFailoverVersion() {
2✔
3933
                        return &types.BadRequestError{
×
3934
                                Message: "Concurrent failover is not allow.",
×
3935
                        }
×
3936
                }
×
3937
        }
3938
        return nil
1✔
3939
}
3940

3941
func (wh *WorkflowHandler) historyArchived(ctx context.Context, request *types.GetWorkflowExecutionHistoryRequest, domainID string) bool {
444✔
3942
        if request.GetExecution() == nil || request.GetExecution().GetRunID() == "" {
479✔
3943
                return false
35✔
3944
        }
35✔
3945
        getMutableStateRequest := &types.GetMutableStateRequest{
409✔
3946
                DomainUUID: domainID,
409✔
3947
                Execution:  request.Execution,
409✔
3948
        }
409✔
3949
        _, err := wh.GetHistoryClient().GetMutableState(ctx, getMutableStateRequest)
409✔
3950
        if err == nil {
795✔
3951
                return false
386✔
3952
        }
386✔
3953
        switch err.(type) {
23✔
3954
        case *types.EntityNotExistsError:
22✔
3955
                // the only case in which history is assumed to be archived is if getting mutable state returns entity not found error
22✔
3956
                return true
22✔
3957
        }
3958
        return false
1✔
3959
}
3960

3961
func (wh *WorkflowHandler) getArchivedHistory(
3962
        ctx context.Context,
3963
        request *types.GetWorkflowExecutionHistoryRequest,
3964
        domainID string,
3965
) (*types.GetWorkflowExecutionHistoryResponse, error) {
25✔
3966
        entry, err := wh.GetDomainCache().GetDomainByID(domainID)
25✔
3967
        if err != nil {
26✔
3968
                return nil, err
1✔
3969
        }
1✔
3970

3971
        URIString := entry.GetConfig().HistoryArchivalURI
24✔
3972
        if URIString == "" {
25✔
3973
                // if URI is empty, it means the domain has never enabled for archival.
1✔
3974
                // the error is not "workflow has passed retention period", because
1✔
3975
                // we have no way to tell if the requested workflow exists or not.
1✔
3976
                return nil, validate.ErrHistoryNotFound
1✔
3977
        }
1✔
3978

3979
        URI, err := archiver.NewURI(URIString)
23✔
3980
        if err != nil {
24✔
3981
                return nil, err
1✔
3982
        }
1✔
3983

3984
        historyArchiver, err := wh.GetArchiverProvider().GetHistoryArchiver(URI.Scheme(), service.Frontend)
22✔
3985
        if err != nil {
22✔
3986
                return nil, err
×
3987
        }
×
3988

3989
        resp, err := historyArchiver.Get(ctx, URI, &archiver.GetHistoryRequest{
22✔
3990
                DomainID:      domainID,
22✔
3991
                WorkflowID:    request.GetExecution().GetWorkflowID(),
22✔
3992
                RunID:         request.GetExecution().GetRunID(),
22✔
3993
                NextPageToken: request.GetNextPageToken(),
22✔
3994
                PageSize:      int(request.GetMaximumPageSize()),
22✔
3995
        })
22✔
3996
        if err != nil {
22✔
3997
                return nil, err
×
3998
        }
×
3999

4000
        history := &types.History{}
22✔
4001
        for _, batch := range resp.HistoryBatches {
279✔
4002
                history.Events = append(history.Events, batch.Events...)
257✔
4003
        }
257✔
4004
        return &types.GetWorkflowExecutionHistoryResponse{
22✔
4005
                History:       history,
22✔
4006
                NextPageToken: resp.NextPageToken,
22✔
4007
                Archived:      true,
22✔
4008
        }, nil
22✔
4009
}
4010

4011
func (wh *WorkflowHandler) convertIndexedKeyToThrift(keys map[string]interface{}) map[string]types.IndexedValueType {
3✔
4012
        converted := make(map[string]types.IndexedValueType)
3✔
4013
        for k, v := range keys {
51✔
4014
                converted[k] = common.ConvertIndexedValueTypeToInternalType(v, wh.GetLogger())
48✔
4015
        }
48✔
4016
        return converted
2✔
4017
}
4018

4019
func (wh *WorkflowHandler) isListRequestPageSizeTooLarge(pageSize int32, domain string) bool {
303✔
4020
        return common.IsAdvancedVisibilityReadingEnabled(wh.config.EnableReadVisibilityFromES(domain), wh.config.IsAdvancedVisConfigExist) &&
303✔
4021
                pageSize > int32(wh.config.ESIndexMaxResultWindow())
303✔
4022
}
303✔
4023

4024
// GetClusterInfo return information about cadence deployment
4025
func (wh *WorkflowHandler) GetClusterInfo(
4026
        ctx context.Context,
4027
) (resp *types.ClusterInfo, err error) {
×
4028
        return &types.ClusterInfo{
×
4029
                SupportedClientVersions: &types.SupportedClientVersions{
×
4030
                        GoSdk:   client.SupportedGoSDKVersion,
×
4031
                        JavaSdk: client.SupportedJavaSDKVersion,
×
4032
                },
×
4033
        }, nil
×
4034
}
×
4035

4036
func checkFailOverPermission(config *config.Config, domainName string) error {
2✔
4037
        if config.Lockdown(domainName) {
3✔
4038
                return validate.ErrDomainInLockdown
1✔
4039
        }
1✔
4040
        return nil
1✔
4041
}
4042

4043
type domainWrapper struct {
4044
        domain string
4045
}
4046

4047
func (d domainWrapper) GetDomain() string {
1,757✔
4048
        return d.domain
1,757✔
4049
}
1,757✔
4050

4051
func (hs HealthStatus) String() string {
2✔
4052
        switch hs {
2✔
4053
        case HealthStatusOK:
1✔
4054
                return "OK"
1✔
4055
        case HealthStatusWarmingUp:
1✔
4056
                return "WarmingUp"
1✔
4057
        case HealthStatusShuttingDown:
×
4058
                return "ShuttingDown"
×
4059
        default:
×
4060
                return "unknown"
×
4061
        }
4062
}
4063

4064
func getDomainWfIDRunIDTags(
4065
        domainName string,
4066
        wf *types.WorkflowExecution,
4067
) []tag.Tag {
1,480✔
4068
        tags := []tag.Tag{tag.WorkflowDomainName(domainName)}
1,480✔
4069
        if wf == nil {
2,960✔
4070
                return tags
1,480✔
4071
        }
1,480✔
4072
        return append(
×
4073
                tags,
×
4074
                tag.WorkflowID(wf.GetWorkflowID()),
×
4075
                tag.WorkflowRunID(wf.GetRunID()),
×
4076
        )
×
4077
}
4078

4079
func checkRequiredDomainDataKVs(requiredDomainDataKeys map[string]interface{}, domainData map[string]string) error {
45✔
4080
        // check requiredDomainDataKeys
45✔
4081
        for k := range requiredDomainDataKeys {
46✔
4082
                _, ok := domainData[k]
1✔
4083
                if !ok {
2✔
4084
                        return fmt.Errorf("domain data error, missing required key %v . All required keys: %v", k, requiredDomainDataKeys)
1✔
4085
                }
1✔
4086
        }
4087
        return nil
44✔
4088
}
4089

4090
// Some error types are introduced later that some clients might not support
4091
// To make them backward compatible, we continue returning the legacy error types
4092
// for older clients
4093
func (wh *WorkflowHandler) normalizeVersionedErrors(ctx context.Context, err error) error {
66✔
4094
        switch err.(type) {
66✔
4095
        case *types.WorkflowExecutionAlreadyCompletedError:
20✔
4096
                call := yarpc.CallFromContext(ctx)
20✔
4097
                clientFeatureVersion := call.Header(common.FeatureVersionHeaderName)
20✔
4098
                clientImpl := call.Header(common.ClientImplHeaderName)
20✔
4099
                featureFlags := client.GetFeatureFlagsFromHeader(call)
20✔
4100

20✔
4101
                vErr := wh.versionChecker.SupportsWorkflowAlreadyCompletedError(clientImpl, clientFeatureVersion, featureFlags)
20✔
4102
                if vErr == nil {
23✔
4103
                        return err
3✔
4104
                }
3✔
4105
                return &types.EntityNotExistsError{Message: "Workflow execution already completed."}
17✔
4106
        default:
46✔
4107
                return err
46✔
4108
        }
4109
}
4110
func constructRestartWorkflowRequest(w *types.WorkflowExecutionStartedEventAttributes, domain string, identity string, workflowID string) *types.StartWorkflowExecutionRequest {
1✔
4111

1✔
4112
        startRequest := &types.StartWorkflowExecutionRequest{
1✔
4113
                RequestID:  uuid.New().String(),
1✔
4114
                Domain:     domain,
1✔
4115
                WorkflowID: workflowID,
1✔
4116
                WorkflowType: &types.WorkflowType{
1✔
4117
                        Name: w.WorkflowType.Name,
1✔
4118
                },
1✔
4119
                TaskList: &types.TaskList{
1✔
4120
                        Name: w.TaskList.Name,
1✔
4121
                },
1✔
4122
                Input:                               w.Input,
1✔
4123
                ExecutionStartToCloseTimeoutSeconds: w.ExecutionStartToCloseTimeoutSeconds,
1✔
4124
                TaskStartToCloseTimeoutSeconds:      w.TaskStartToCloseTimeoutSeconds,
1✔
4125
                Identity:                            identity,
1✔
4126
                WorkflowIDReusePolicy:               types.WorkflowIDReusePolicyTerminateIfRunning.Ptr(),
1✔
4127
        }
1✔
4128
        startRequest.CronSchedule = w.CronSchedule
1✔
4129
        startRequest.RetryPolicy = w.RetryPolicy
1✔
4130
        startRequest.DelayStartSeconds = w.FirstDecisionTaskBackoffSeconds
1✔
4131
        startRequest.Header = w.Header
1✔
4132
        startRequest.Memo = w.Memo
1✔
4133
        startRequest.SearchAttributes = w.SearchAttributes
1✔
4134

1✔
4135
        return startRequest
1✔
4136
}
1✔
4137

4138
func getMetricsScopeWithDomain(
4139
        scope int,
4140
        d domainGetter,
4141
        metricsClient metrics.Client,
4142
) metrics.Scope {
5,752✔
4143
        var metricsScope metrics.Scope
5,752✔
4144
        if d != nil {
11,504✔
4145
                metricsScope = metricsClient.Scope(scope).Tagged(metrics.DomainTag(d.GetDomain()))
5,752✔
4146
        } else {
5,752✔
4147
                metricsScope = metricsClient.Scope(scope).Tagged(metrics.DomainUnknownTag())
×
4148
        }
×
4149
        return metricsScope
5,752✔
4150
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc