• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

uber / cadence / 019056dc-98d1-4fa6-b475-a7aef51f4b90

26 Jun 2024 11:23PM UTC coverage: 71.557% (-0.001%) from 71.558%
019056dc-98d1-4fa6-b475-a7aef51f4b90

push

buildkite

web-flow
Fix encoding bug to index context header in search attributes (#6148)

What changed?

JSON-marshal the raw string bytes before storing them in search attributes.

Why?

The context header stores raw string bytes, but search attributes must store JSON-encoded strings rather than raw bytes. Otherwise, unmarshalling fails when the visibility message is created.

How did you test it?

unit test

15 of 19 new or added lines in 3 files covered. (78.95%)

24 existing lines in 7 files now uncovered.

107133 of 149716 relevant lines covered (71.56%)

2587.42 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

75.74
/service/frontend/api/handler.go
1
// Copyright (c) 2017-2020 Uber Technologies, Inc.
2
//
3
// Permission is hereby granted, free of charge, to any person obtaining a copy
4
// of this software and associated documentation files (the "Software"), to deal
5
// in the Software without restriction, including without limitation the rights
6
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
7
// copies of the Software, and to permit persons to whom the Software is
8
// furnished to do so, subject to the following conditions:
9
//
10
// The above copyright notice and this permission notice shall be included in
11
// all copies or substantial portions of the Software.
12
//
13
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
14
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
18
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
19
// THE SOFTWARE.
20

21
package api
22

23
import (
24
        "context"
25
        "encoding/json"
26
        "fmt"
27
        "sync/atomic"
28
        "time"
29

30
        "github.com/google/uuid"
31
        "go.uber.org/yarpc"
32
        "golang.org/x/sync/errgroup"
33

34
        "github.com/uber/cadence/.gen/go/shared"
35
        "github.com/uber/cadence/.gen/go/sqlblobs"
36
        "github.com/uber/cadence/common"
37
        "github.com/uber/cadence/common/archiver"
38
        "github.com/uber/cadence/common/backoff"
39
        "github.com/uber/cadence/common/cache"
40
        "github.com/uber/cadence/common/client"
41
        "github.com/uber/cadence/common/codec"
42
        "github.com/uber/cadence/common/domain"
43
        "github.com/uber/cadence/common/elasticsearch/validator"
44
        "github.com/uber/cadence/common/log"
45
        "github.com/uber/cadence/common/log/tag"
46
        "github.com/uber/cadence/common/metrics"
47
        "github.com/uber/cadence/common/partition"
48
        "github.com/uber/cadence/common/persistence"
49
        persistenceutils "github.com/uber/cadence/common/persistence/persistence-utils"
50
        "github.com/uber/cadence/common/resource"
51
        "github.com/uber/cadence/common/service"
52
        "github.com/uber/cadence/common/types"
53
        "github.com/uber/cadence/common/types/mapper/thrift"
54
        "github.com/uber/cadence/service/frontend/config"
55
        "github.com/uber/cadence/service/frontend/validate"
56
)
57

58
const (
59
        // HealthStatusOK is used when this node is healthy and rpc requests are allowed
60
        HealthStatusOK HealthStatus = iota + 1
61
        // HealthStatusWarmingUp is used when the rpc handler is warming up
62
        HealthStatusWarmingUp
63
        // HealthStatusShuttingDown is used when the rpc handler is shutting down
64
        HealthStatusShuttingDown
65
)
66

67
var _ Handler = (*WorkflowHandler)(nil)
68

69
type (
70
        // WorkflowHandler - Thrift handler interface for workflow service
71
        WorkflowHandler struct {
72
                resource.Resource
73

74
                shuttingDown              int32
75
                healthStatus              int32
76
                tokenSerializer           common.TaskTokenSerializer
77
                config                    *config.Config
78
                versionChecker            client.VersionChecker
79
                domainHandler             domain.Handler
80
                visibilityQueryValidator  *validator.VisibilityQueryValidator
81
                searchAttributesValidator *validator.SearchAttributesValidator
82
                throttleRetry             *backoff.ThrottleRetry
83
                producerManager           ProducerManager
84
                thriftrwEncoder           codec.BinaryEncoder
85
        }
86

87
        getHistoryContinuationToken struct {
88
                RunID             string
89
                FirstEventID      int64
90
                NextEventID       int64
91
                IsWorkflowRunning bool
92
                PersistenceToken  []byte
93
                TransientDecision *types.TransientDecisionInfo
94
                BranchToken       []byte
95
        }
96

97
        domainGetter interface {
98
                GetDomain() string
99
        }
100

101
        // HealthStatus is an enum that refers to the rpc handler health status
102
        HealthStatus int32
103
)
104

105
var (
106
        frontendServiceRetryPolicy = common.CreateFrontendServiceRetryPolicy()
107
)
108

109
// NewWorkflowHandler creates a thrift handler for the cadence service
110
func NewWorkflowHandler(
111
        resource resource.Resource,
112
        config *config.Config,
113
        versionChecker client.VersionChecker,
114
        domainHandler domain.Handler,
115
) *WorkflowHandler {
141✔
116
        return &WorkflowHandler{
141✔
117
                Resource:        resource,
141✔
118
                config:          config,
141✔
119
                healthStatus:    int32(HealthStatusWarmingUp),
141✔
120
                tokenSerializer: common.NewJSONTaskTokenSerializer(),
141✔
121
                versionChecker:  versionChecker,
141✔
122
                domainHandler:   domainHandler,
141✔
123
                visibilityQueryValidator: validator.NewQueryValidator(
141✔
124
                        config.ValidSearchAttributes,
141✔
125
                        config.EnableQueryAttributeValidation,
141✔
126
                ),
141✔
127
                searchAttributesValidator: validator.NewSearchAttributesValidator(
141✔
128
                        resource.GetLogger(),
141✔
129
                        config.EnableQueryAttributeValidation,
141✔
130
                        config.ValidSearchAttributes,
141✔
131
                        config.SearchAttributesNumberOfKeysLimit,
141✔
132
                        config.SearchAttributesSizeOfValueLimit,
141✔
133
                        config.SearchAttributesTotalSizeLimit,
141✔
134
                ),
141✔
135
                throttleRetry: backoff.NewThrottleRetry(
141✔
136
                        backoff.WithRetryPolicy(frontendServiceRetryPolicy),
141✔
137
                        backoff.WithRetryableError(common.IsServiceTransientError),
141✔
138
                ),
141✔
139
                producerManager: NewProducerManager(
141✔
140
                        resource.GetDomainCache(),
141✔
141
                        resource.GetAsyncWorkflowQueueProvider(),
141✔
142
                        resource.GetLogger(),
141✔
143
                        resource.GetMetricsClient(),
141✔
144
                ),
141✔
145
                thriftrwEncoder: codec.NewThriftRWEncoder(),
141✔
146
        }
141✔
147
}
141✔
148

149
// Start starts the handler
150
func (wh *WorkflowHandler) Start() {
21✔
151
        // TODO: Get warmup duration from config. Even better, run proactive checks such as probing downstream connections.
21✔
152
        const warmUpDuration = 30 * time.Second
21✔
153

21✔
154
        warmupTimer := time.NewTimer(warmUpDuration)
21✔
155
        go func() {
42✔
156
                <-warmupTimer.C
21✔
157
                wh.GetLogger().Warn("Service warmup duration has elapsed.")
21✔
158
                if atomic.CompareAndSwapInt32(&wh.healthStatus, int32(HealthStatusWarmingUp), int32(HealthStatusOK)) {
33✔
159
                        wh.GetLogger().Warn("Warmup time has elapsed. Service is healthy.")
12✔
160
                } else {
13✔
161
                        status := HealthStatus(atomic.LoadInt32(&wh.healthStatus))
1✔
162
                        wh.GetLogger().Warn(fmt.Sprintf("Warmup time has elapsed. Service status is: %v", status.String()))
1✔
163
                }
1✔
164
        }()
165
}
166

167
// Stop stops the handler
168
func (wh *WorkflowHandler) Stop() {
21✔
169
        atomic.StoreInt32(&wh.shuttingDown, 1)
21✔
170
}
21✔
171

172
// UpdateHealthStatus sets the health status for this rpc handler.
173
// This health status will be used within the rpc health check handler
174
func (wh *WorkflowHandler) UpdateHealthStatus(status HealthStatus) {
22✔
175
        atomic.StoreInt32(&wh.healthStatus, int32(status))
22✔
176
}
22✔
177

178
func (wh *WorkflowHandler) isShuttingDown() bool {
6,740✔
179
        return atomic.LoadInt32(&wh.shuttingDown) != 0
6,740✔
180
}
6,740✔
181

182
// Health is for health check
183
func (wh *WorkflowHandler) Health(ctx context.Context) (*types.HealthStatus, error) {
2✔
184
        status := HealthStatus(atomic.LoadInt32(&wh.healthStatus))
2✔
185
        msg := status.String()
2✔
186

2✔
187
        if status != HealthStatusOK {
3✔
188
                wh.GetLogger().Warn(fmt.Sprintf("Service status is: %v", msg))
1✔
189
        }
1✔
190

191
        return &types.HealthStatus{
2✔
192
                Ok:  status == HealthStatusOK,
2✔
193
                Msg: msg,
2✔
194
        }, nil
2✔
195
}
196

197
// RegisterDomain creates a new domain which can be used as a container for all resources.  Domain is a top level
198
// entity within Cadence, used as a container for all resources like workflow executions, tasklists, etc.  Domain
199
// acts as a sandbox and provides isolation for all resources within the domain.  All resources belongs to exactly one
200
// domain.
201
func (wh *WorkflowHandler) RegisterDomain(ctx context.Context, registerRequest *types.RegisterDomainRequest) (retError error) {
51✔
202
        if wh.isShuttingDown() {
51✔
203
                return validate.ErrShuttingDown
×
204
        }
×
205

206
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
51✔
207
                return err
×
208
        }
×
209

210
        if registerRequest == nil {
51✔
211
                return validate.ErrRequestNotSet
×
212
        }
×
213

214
        if registerRequest.GetWorkflowExecutionRetentionPeriodInDays() > int32(wh.config.DomainConfig.MaxRetentionDays()) {
51✔
215
                return validate.ErrInvalidRetention
×
216
        }
×
217

218
        if err := validate.CheckPermission(wh.config, registerRequest.SecurityToken); err != nil {
51✔
219
                return err
×
220
        }
×
221

222
        if err := checkRequiredDomainDataKVs(wh.config.DomainConfig.RequiredDomainDataKeys(), registerRequest.GetData()); err != nil {
52✔
223
                return err
1✔
224
        }
1✔
225

226
        if registerRequest.GetName() == "" {
50✔
227
                return validate.ErrDomainNotSet
×
228
        }
×
229

230
        return wh.domainHandler.RegisterDomain(ctx, registerRequest)
50✔
231
}
232

233
// ListDomains returns the information and configuration for a registered domain.
234
func (wh *WorkflowHandler) ListDomains(
235
        ctx context.Context,
236
        listRequest *types.ListDomainsRequest,
237
) (response *types.ListDomainsResponse, retError error) {
2✔
238
        if wh.isShuttingDown() {
2✔
239
                return nil, validate.ErrShuttingDown
×
240
        }
×
241

242
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
2✔
243
                return nil, err
×
244
        }
×
245

246
        if listRequest == nil {
3✔
247
                return nil, validate.ErrRequestNotSet
1✔
248
        }
1✔
249

250
        return wh.domainHandler.ListDomains(ctx, listRequest)
1✔
251
}
252

253
// DescribeDomain returns the information and configuration for a registered domain.
254
func (wh *WorkflowHandler) DescribeDomain(
255
        ctx context.Context,
256
        describeRequest *types.DescribeDomainRequest,
257
) (response *types.DescribeDomainResponse, retError error) {
134✔
258
        if wh.isShuttingDown() {
134✔
259
                return nil, validate.ErrShuttingDown
×
260
        }
×
261

262
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
134✔
263
                return nil, err
×
264
        }
×
265

266
        if describeRequest == nil {
134✔
267
                return nil, validate.ErrRequestNotSet
×
268
        }
×
269

270
        if describeRequest.GetName() == "" && describeRequest.GetUUID() == "" {
134✔
271
                return nil, validate.ErrDomainNotSet
×
272
        }
×
273

274
        resp, err := wh.domainHandler.DescribeDomain(ctx, describeRequest)
134✔
275
        if err != nil {
134✔
276
                return nil, err
×
277
        }
×
278

279
        if resp.GetFailoverInfo() != nil && resp.GetFailoverInfo().GetFailoverExpireTimestamp() > 0 {
134✔
280
                // fetch ongoing failover info from history service
×
281
                failoverResp, err := wh.GetHistoryClient().GetFailoverInfo(ctx, &types.GetFailoverInfoRequest{
×
282
                        DomainID: resp.GetDomainInfo().UUID,
×
283
                })
×
284
                if err != nil {
×
285
                        // despite the error from history, return describe domain response
×
286
                        wh.GetLogger().Error(
×
287
                                fmt.Sprintf("Failed to get failover info for domain %s", resp.DomainInfo.GetName()),
×
288
                                tag.Error(err),
×
289
                        )
×
290
                        return resp, nil
×
291
                }
×
292
                resp.FailoverInfo.CompletedShardCount = failoverResp.GetCompletedShardCount()
×
293
                resp.FailoverInfo.PendingShards = failoverResp.GetPendingShards()
×
294
        }
295
        return resp, nil
134✔
296
}
297

298
// UpdateDomain is used to update the information and configuration for a registered domain.
299
func (wh *WorkflowHandler) UpdateDomain(
300
        ctx context.Context,
301
        updateRequest *types.UpdateDomainRequest,
302
) (resp *types.UpdateDomainResponse, retError error) {
9✔
303
        domainName := ""
9✔
304
        if updateRequest != nil {
18✔
305
                domainName = updateRequest.GetName()
9✔
306
        }
9✔
307

308
        logger := wh.GetLogger().WithTags(
9✔
309
                tag.WorkflowDomainName(domainName),
9✔
310
                tag.OperationName("DomainUpdate"))
9✔
311

9✔
312
        if updateRequest == nil {
9✔
313
                logger.Error("Nil domain update request.",
×
314
                        tag.Error(validate.ErrRequestNotSet))
×
315
                return nil, validate.ErrRequestNotSet
×
316
        }
×
317

318
        isFailover := isFailoverRequest(updateRequest)
9✔
319
        isGraceFailover := isGraceFailoverRequest(updateRequest)
9✔
320
        logger.Info(fmt.Sprintf(
9✔
321
                "Domain Update requested. isFailover: %v, isGraceFailover: %v, Request: %#v.",
9✔
322
                isFailover,
9✔
323
                isGraceFailover,
9✔
324
                updateRequest))
9✔
325

9✔
326
        if wh.isShuttingDown() {
9✔
327
                logger.Error("Won't apply the domain update since workflowHandler is shutting down.",
×
328
                        tag.Error(validate.ErrShuttingDown))
×
329
                return nil, validate.ErrShuttingDown
×
330
        }
×
331

332
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
9✔
333
                logger.Error("Won't apply the domain update since client version is not supported.",
×
334
                        tag.Error(err))
×
335
                return nil, err
×
336
        }
×
337

338
        // don't require permission for failover request
339
        if isFailover {
11✔
340
                // reject the failover if the cluster is in lockdown
2✔
341
                if err := checkFailOverPermission(wh.config, updateRequest.GetName()); err != nil {
3✔
342
                        logger.Error("Domain failover request rejected since domain is in lockdown.",
1✔
343
                                tag.Error(err))
1✔
344
                        return nil, err
1✔
345
                }
1✔
346
        } else {
7✔
347
                if err := validate.CheckPermission(wh.config, updateRequest.SecurityToken); err != nil {
7✔
348
                        logger.Error("Domain update request rejected due to failing permissions.",
×
349
                                tag.Error(err))
×
350
                        return nil, err
×
351
                }
×
352
        }
353

354
        if isGraceFailover {
9✔
355
                if err := wh.checkOngoingFailover(
1✔
356
                        ctx,
1✔
357
                        &updateRequest.Name,
1✔
358
                ); err != nil {
1✔
359
                        logger.Error("Graceful domain failover request failed. Not able to check ongoing failovers.",
×
360
                                tag.Error(err))
×
361
                        return nil, err
×
362
                }
×
363
        }
364

365
        if updateRequest.GetName() == "" {
8✔
366
                logger.Error("Domain not set on request.",
×
367
                        tag.Error(validate.ErrDomainNotSet))
×
368
                return nil, validate.ErrDomainNotSet
×
369
        }
×
370
        // TODO: call remote clusters to verify domain data
371
        resp, err := wh.domainHandler.UpdateDomain(ctx, updateRequest)
8✔
372
        if err != nil {
10✔
373
                logger.Error("Domain update operation failed.",
2✔
374
                        tag.Error(err))
2✔
375
                return nil, err
2✔
376
        }
2✔
377
        logger.Info("Domain update operation succeeded.")
6✔
378
        return resp, nil
6✔
379
}
380

381
// DeprecateDomain us used to update status of a registered domain to DEPRECATED. Once the domain is deprecated
382
// it cannot be used to start new workflow executions.  Existing workflow executions will continue to run on
383
// deprecated domains.
384
func (wh *WorkflowHandler) DeprecateDomain(ctx context.Context, deprecateRequest *types.DeprecateDomainRequest) (retError error) {
×
385
        if wh.isShuttingDown() {
×
386
                return validate.ErrShuttingDown
×
387
        }
×
388

389
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
×
390
                return err
×
391
        }
×
392

393
        if deprecateRequest == nil {
×
394
                return validate.ErrRequestNotSet
×
395
        }
×
396

397
        if err := validate.CheckPermission(wh.config, deprecateRequest.SecurityToken); err != nil {
×
398
                return err
×
399
        }
×
400

401
        if deprecateRequest.GetName() == "" {
×
402
                return validate.ErrDomainNotSet
×
403
        }
×
404

405
        return wh.domainHandler.DeprecateDomain(ctx, deprecateRequest)
×
406
}
407

408
// PollForActivityTask - Poll for an activity task.
409
func (wh *WorkflowHandler) PollForActivityTask(
410
        ctx context.Context,
411
        pollRequest *types.PollForActivityTaskRequest,
412
) (resp *types.PollForActivityTaskResponse, retError error) {
727✔
413
        callTime := time.Now()
727✔
414

727✔
415
        if wh.isShuttingDown() {
727✔
416
                return nil, validate.ErrShuttingDown
×
417
        }
×
418

419
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
727✔
420
                return nil, err
×
421
        }
×
422

423
        if pollRequest == nil {
727✔
424
                return nil, validate.ErrRequestNotSet
×
425
        }
×
426

427
        domainName := pollRequest.GetDomain()
727✔
428
        if domainName == "" {
727✔
429
                return nil, validate.ErrDomainNotSet
×
430
        }
×
431

432
        scope := getMetricsScopeWithDomain(metrics.FrontendPollForActivityTaskScope, pollRequest, wh.GetMetricsClient()).Tagged(metrics.GetContextTags(ctx)...)
727✔
433
        wh.GetLogger().Debug("Received PollForActivityTask")
727✔
434
        if err := common.ValidateLongPollContextTimeout(
727✔
435
                ctx,
727✔
436
                "PollForActivityTask",
727✔
437
                wh.GetThrottledLogger().WithTags(tag.WorkflowDomainName(domainName), tag.WorkflowTaskListName(pollRequest.GetTaskList().GetName())),
727✔
438
        ); err != nil {
729✔
439
                return nil, err
2✔
440
        }
2✔
441

442
        idLengthWarnLimit := wh.config.MaxIDLengthWarnLimit()
725✔
443
        if !common.IsValidIDLength(
725✔
444
                domainName,
725✔
445
                scope,
725✔
446
                idLengthWarnLimit,
725✔
447
                wh.config.DomainNameMaxLength(domainName),
725✔
448
                metrics.CadenceErrDomainNameExceededWarnLimit,
725✔
449
                domainName,
725✔
450
                wh.GetLogger(),
725✔
451
                tag.IDTypeDomainName) {
725✔
452
                return nil, validate.ErrDomainTooLong
×
453
        }
×
454

455
        if err := wh.validateTaskList(pollRequest.TaskList, scope, domainName); err != nil {
725✔
456
                return nil, err
×
457
        }
×
458

459
        if !common.IsValidIDLength(
725✔
460
                pollRequest.GetIdentity(),
725✔
461
                scope,
725✔
462
                idLengthWarnLimit,
725✔
463
                wh.config.IdentityMaxLength(domainName),
725✔
464
                metrics.CadenceErrIdentityExceededWarnLimit,
725✔
465
                domainName,
725✔
466
                wh.GetLogger(),
725✔
467
                tag.IDTypeIdentity) {
725✔
468
                return nil, validate.ErrIdentityTooLong
×
469
        }
×
470

471
        domainID, err := wh.GetDomainCache().GetDomainID(domainName)
725✔
472
        if err != nil {
992✔
473
                return nil, err
267✔
474
        }
267✔
475

476
        isolationGroup := wh.getIsolationGroup(ctx, domainName)
458✔
477
        if !wh.waitUntilIsolationGroupHealthy(ctx, domainName, isolationGroup) {
459✔
478
                return &types.PollForActivityTaskResponse{}, nil
1✔
479
        }
1✔
480
        // it is possible that we wait for a very long time and the remaining time is not long enough for a long poll
481
        // in this case, return an empty response
482
        if err := common.ValidateLongPollContextTimeout(
457✔
483
                ctx,
457✔
484
                "PollForActivityTask",
457✔
485
                wh.GetThrottledLogger().WithTags(tag.WorkflowDomainName(domainName), tag.WorkflowTaskListName(pollRequest.GetTaskList().GetName())),
457✔
486
        ); err != nil {
457✔
487
                return &types.PollForActivityTaskResponse{}, nil
×
488
        }
×
489
        pollerID := uuid.New().String()
457✔
490
        op := func() error {
914✔
491
                resp, err = wh.GetMatchingClient().PollForActivityTask(ctx, &types.MatchingPollForActivityTaskRequest{
457✔
492
                        DomainUUID:     domainID,
457✔
493
                        PollerID:       pollerID,
457✔
494
                        PollRequest:    pollRequest,
457✔
495
                        IsolationGroup: isolationGroup,
457✔
496
                })
457✔
497
                return err
457✔
498
        }
457✔
499

500
        err = wh.throttleRetry.Do(ctx, op)
457✔
501
        if err != nil {
523✔
502
                err = wh.cancelOutstandingPoll(ctx, err, domainID, persistence.TaskListTypeActivity, pollRequest.TaskList, pollerID)
66✔
503
                if err != nil {
66✔
504
                        // For all other errors log an error and return it back to client.
×
505
                        ctxTimeout := "not-set"
×
506
                        ctxDeadline, ok := ctx.Deadline()
×
507
                        if ok {
×
508
                                ctxTimeout = ctxDeadline.Sub(callTime).String()
×
509
                        }
×
510
                        wh.GetLogger().Error("PollForActivityTask failed.",
×
511
                                tag.WorkflowTaskListName(pollRequest.GetTaskList().GetName()),
×
512
                                tag.Value(ctxTimeout),
×
513
                                tag.Error(err))
×
514
                        return nil, err
×
515
                }
516
        }
517
        return resp, nil
457✔
518
}
519

520
// PollForDecisionTask - Poll for a decision task.
521
func (wh *WorkflowHandler) PollForDecisionTask(
522
        ctx context.Context,
523
        pollRequest *types.PollForDecisionTaskRequest,
524
) (resp *types.PollForDecisionTaskResponse, retError error) {
1,512✔
525
        callTime := time.Now()
1,512✔
526

1,512✔
527
        if wh.isShuttingDown() {
1,512✔
528
                return nil, validate.ErrShuttingDown
×
529
        }
×
530

531
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
1,512✔
532
                return nil, err
×
533
        }
×
534

535
        if pollRequest == nil {
1,512✔
536
                return nil, validate.ErrRequestNotSet
×
537
        }
×
538

539
        domainName := pollRequest.GetDomain()
1,512✔
540
        tags := getDomainWfIDRunIDTags(domainName, nil)
1,512✔
541

1,512✔
542
        if domainName == "" {
1,512✔
543
                return nil, validate.ErrDomainNotSet
×
544
        }
×
545

546
        scope := getMetricsScopeWithDomain(metrics.FrontendPollForDecisionTaskScope, pollRequest, wh.GetMetricsClient()).Tagged(metrics.GetContextTags(ctx)...)
1,512✔
547
        wh.GetLogger().Debug("Received PollForDecisionTask")
1,512✔
548
        if err := common.ValidateLongPollContextTimeout(
1,512✔
549
                ctx,
1,512✔
550
                "PollForDecisionTask",
1,512✔
551
                wh.GetThrottledLogger().WithTags(tag.WorkflowDomainName(domainName), tag.WorkflowTaskListName(pollRequest.GetTaskList().GetName())),
1,512✔
552
        ); err != nil {
1,514✔
553
                return nil, err
2✔
554
        }
2✔
555

556
        idLengthWarnLimit := wh.config.MaxIDLengthWarnLimit()
1,510✔
557
        if !common.IsValidIDLength(
1,510✔
558
                domainName,
1,510✔
559
                scope,
1,510✔
560
                idLengthWarnLimit,
1,510✔
561
                wh.config.DomainNameMaxLength(domainName),
1,510✔
562
                metrics.CadenceErrDomainNameExceededWarnLimit,
1,510✔
563
                domainName,
1,510✔
564
                wh.GetLogger(),
1,510✔
565
                tag.IDTypeDomainName) {
1,510✔
566
                return nil, validate.ErrDomainTooLong
×
567
        }
×
568

569
        if !common.IsValidIDLength(
1,510✔
570
                pollRequest.GetIdentity(),
1,510✔
571
                scope,
1,510✔
572
                idLengthWarnLimit,
1,510✔
573
                wh.config.IdentityMaxLength(domainName),
1,510✔
574
                metrics.CadenceErrIdentityExceededWarnLimit,
1,510✔
575
                domainName,
1,510✔
576
                wh.GetLogger(),
1,510✔
577
                tag.IDTypeIdentity) {
1,510✔
578
                return nil, validate.ErrIdentityTooLong
×
579
        }
×
580

581
        if err := wh.validateTaskList(pollRequest.TaskList, scope, domainName); err != nil {
1,510✔
582
                return nil, err
×
583
        }
×
584

585
        domainEntry, err := wh.GetDomainCache().GetDomain(domainName)
1,510✔
586
        if err != nil {
1,774✔
587
                return nil, err
264✔
588
        }
264✔
589
        domainID := domainEntry.GetInfo().ID
1,246✔
590

1,246✔
591
        wh.GetLogger().Debug("Poll for decision.", tag.WorkflowDomainName(domainName), tag.WorkflowDomainID(domainID))
1,246✔
592
        if err := wh.checkBadBinary(domainEntry, pollRequest.GetBinaryChecksum()); err != nil {
1,246✔
593
                return nil, err
×
594
        }
×
595

596
        isolationGroup := wh.getIsolationGroup(ctx, domainName)
1,246✔
597
        if !wh.waitUntilIsolationGroupHealthy(ctx, domainName, isolationGroup) {
1,247✔
598
                return &types.PollForDecisionTaskResponse{}, nil
1✔
599
        }
1✔
600
        // it is possible that we wait for a very long time and the remaining time is not long enough for a long poll
601
        // in this case, return an empty response
602
        if err := common.ValidateLongPollContextTimeout(
1,245✔
603
                ctx,
1,245✔
604
                "PollForDecisionTask",
1,245✔
605
                wh.GetThrottledLogger().WithTags(tag.WorkflowDomainName(domainName), tag.WorkflowTaskListName(pollRequest.GetTaskList().GetName())),
1,245✔
606
        ); err != nil {
1,245✔
607
                return &types.PollForDecisionTaskResponse{}, nil
×
608
        }
×
609

610
        pollerID := uuid.New().String()
1,245✔
611
        var matchingResp *types.MatchingPollForDecisionTaskResponse
1,245✔
612
        op := func() error {
2,490✔
613
                matchingResp, err = wh.GetMatchingClient().PollForDecisionTask(ctx, &types.MatchingPollForDecisionTaskRequest{
1,245✔
614
                        DomainUUID:     domainID,
1,245✔
615
                        PollerID:       pollerID,
1,245✔
616
                        PollRequest:    pollRequest,
1,245✔
617
                        IsolationGroup: isolationGroup,
1,245✔
618
                })
1,245✔
619
                return err
1,245✔
620
        }
1,245✔
621

622
        err = wh.throttleRetry.Do(ctx, op)
1,245✔
623
        if err != nil {
1,311✔
624
                err = wh.cancelOutstandingPoll(ctx, err, domainID, persistence.TaskListTypeDecision, pollRequest.TaskList, pollerID)
66✔
625
                if err != nil {
66✔
626
                        // For all other errors log an error and return it back to client.
×
627
                        ctxTimeout := "not-set"
×
628
                        ctxDeadline, ok := ctx.Deadline()
×
629
                        if ok {
×
630
                                ctxTimeout = ctxDeadline.Sub(callTime).String()
×
631
                        }
×
632
                        wh.GetLogger().Error("PollForDecisionTask failed.",
×
633
                                tag.WorkflowTaskListName(pollRequest.GetTaskList().GetName()),
×
634
                                tag.Value(ctxTimeout),
×
635
                                tag.Error(err))
×
636
                        return nil, err
×
637
                }
638

639
                // Must be cancellation error.  Does'nt matter what we return here.  Client already went away.
640
                return nil, nil
66✔
641
        }
642

643
        tags = append(tags, []tag.Tag{tag.WorkflowID(
1,179✔
644
                matchingResp.GetWorkflowExecution().GetWorkflowID()),
1,179✔
645
                tag.WorkflowRunID(matchingResp.GetWorkflowExecution().GetRunID())}...)
1,179✔
646
        resp, err = wh.createPollForDecisionTaskResponse(ctx, scope, domainID, matchingResp, matchingResp.GetBranchToken())
1,179✔
647
        if err != nil {
1,179✔
648
                return nil, err
×
649
        }
×
650
        return resp, nil
1,179✔
651
}
652

653
func (wh *WorkflowHandler) getIsolationGroup(ctx context.Context, domainName string) string {
2,957✔
654
        return partition.IsolationGroupFromContext(ctx)
2,957✔
655
}
2,957✔
656

657
func (wh *WorkflowHandler) getPartitionConfig(ctx context.Context, domainName string) map[string]string {
518✔
658
        return partition.ConfigFromContext(ctx)
518✔
659
}
518✔
660

661
func (wh *WorkflowHandler) isIsolationGroupHealthy(ctx context.Context, domainName, isolationGroup string) bool {
1,253✔
662
        if wh.GetIsolationGroupState() != nil && wh.config.EnableTasklistIsolation(domainName) {
1,255✔
663
                isDrained, err := wh.GetIsolationGroupState().IsDrained(ctx, domainName, isolationGroup)
2✔
664
                if err != nil {
2✔
665
                        wh.GetLogger().Error("Failed to check if an isolation group is drained, assume it's healthy", tag.Error(err))
×
666
                        return true
×
667
                }
×
668
                return !isDrained
2✔
669
        }
670
        return true
1,251✔
671
}
672

673
func (wh *WorkflowHandler) waitUntilIsolationGroupHealthy(ctx context.Context, domainName, isolationGroup string) bool {
1,704✔
674
        if wh.GetIsolationGroupState() != nil && wh.config.EnableTasklistIsolation(domainName) {
1,706✔
675
                ticker := time.NewTicker(time.Second * 30)
2✔
676
                defer ticker.Stop()
2✔
677
                childCtx, cancel := common.CreateChildContext(ctx, 0.05)
2✔
678
                defer cancel()
2✔
679
                for {
4✔
680
                        isDrained, err := wh.GetIsolationGroupState().IsDrained(childCtx, domainName, isolationGroup)
2✔
681
                        if err != nil {
2✔
682
                                wh.GetLogger().Error("Failed to check if an isolation group is drained, assume it's healthy", tag.Error(err))
×
683
                                return true
×
684
                        }
×
685
                        if !isDrained {
2✔
686
                                break
×
687
                        }
688
                        select {
2✔
689
                        case <-childCtx.Done():
2✔
690
                                return false
2✔
691
                        case <-ticker.C:
×
692
                        }
693
                }
694
        }
695
        return true
1,702✔
696
}
697

698
func (wh *WorkflowHandler) checkBadBinary(domainEntry *cache.DomainCacheEntry, binaryChecksum string) error {
1,246✔
699
        if domainEntry.GetConfig().BadBinaries.Binaries != nil {
2,491✔
700
                badBinaries := domainEntry.GetConfig().BadBinaries.Binaries
1,245✔
701
                _, ok := badBinaries[binaryChecksum]
1,245✔
702
                if ok {
1,245✔
703
                        wh.GetMetricsClient().IncCounter(metrics.FrontendPollForDecisionTaskScope, metrics.CadenceErrBadBinaryCounter)
×
704
                        return &types.BadRequestError{
×
705
                                Message: fmt.Sprintf("binary %v already marked as bad deployment", binaryChecksum),
×
706
                        }
×
707
                }
×
708
        }
709
        return nil
1,246✔
710
}
711

712
func (wh *WorkflowHandler) cancelOutstandingPoll(ctx context.Context, err error, domainID string, taskListType int32,
713
        taskList *types.TaskList, pollerID string) error {
132✔
714
        // First check if this err is due to context cancellation.  This means client connection to frontend is closed.
132✔
715
        if ctx.Err() == context.Canceled {
264✔
716
                // Our rpc stack does not propagates context cancellation to the other service.  Lets make an explicit
132✔
717
                // call to matching to notify this poller is gone to prevent any tasks being dispatched to zombie pollers.
132✔
718
                err = wh.GetMatchingClient().CancelOutstandingPoll(context.Background(), &types.CancelOutstandingPollRequest{
132✔
719
                        DomainUUID:   domainID,
132✔
720
                        TaskListType: common.Int32Ptr(taskListType),
132✔
721
                        TaskList:     taskList,
132✔
722
                        PollerID:     pollerID,
132✔
723
                })
132✔
724
                // We can not do much if this call fails.  Just log the error and move on
132✔
725
                if err != nil {
132✔
726
                        wh.GetLogger().Warn("Failed to cancel outstanding poller.",
×
727
                                tag.WorkflowTaskListName(taskList.GetName()), tag.Error(err))
×
728
                }
×
729

730
                // Clear error as we don't want to report context cancellation error to count against our SLA
731
                return nil
132✔
732
        }
733

734
        return err
×
735
}
736

737
// RecordActivityTaskHeartbeat - Record Activity Task Heart beat.
738
func (wh *WorkflowHandler) RecordActivityTaskHeartbeat(
739
        ctx context.Context,
740
        heartbeatRequest *types.RecordActivityTaskHeartbeatRequest,
741
) (resp *types.RecordActivityTaskHeartbeatResponse, retError error) {
384✔
742
        if wh.isShuttingDown() {
384✔
743
                return nil, validate.ErrShuttingDown
×
744
        }
×
745

746
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
384✔
747
                return nil, err
×
748
        }
×
749

750
        if heartbeatRequest == nil {
385✔
751
                return nil, validate.ErrRequestNotSet
1✔
752
        }
1✔
753

754
        wh.GetLogger().Debug("Received RecordActivityTaskHeartbeat")
383✔
755
        if heartbeatRequest.TaskToken == nil {
384✔
756
                return nil, validate.ErrTaskTokenNotSet
1✔
757
        }
1✔
758
        taskToken, err := wh.tokenSerializer.Deserialize(heartbeatRequest.TaskToken)
382✔
759
        if err != nil {
382✔
760
                return nil, err
×
761
        }
×
762
        if taskToken.DomainID == "" {
382✔
763
                return nil, validate.ErrDomainNotSet
×
764
        }
×
765

766
        domainName, err := wh.GetDomainCache().GetDomainName(taskToken.DomainID)
382✔
767
        if err != nil {
382✔
768
                return nil, err
×
769
        }
×
770

771
        dw := domainWrapper{
382✔
772
                domain: domainName,
382✔
773
        }
382✔
774
        scope := getMetricsScopeWithDomain(metrics.FrontendRecordActivityTaskHeartbeatScope, dw, wh.GetMetricsClient()).Tagged(metrics.GetContextTags(ctx)...)
382✔
775
        sizeLimitError := wh.config.BlobSizeLimitError(domainName)
382✔
776
        sizeLimitWarn := wh.config.BlobSizeLimitWarn(domainName)
382✔
777

382✔
778
        if err := common.CheckEventBlobSizeLimit(
382✔
779
                len(heartbeatRequest.Details),
382✔
780
                sizeLimitWarn,
382✔
781
                sizeLimitError,
382✔
782
                taskToken.DomainID,
382✔
783
                taskToken.WorkflowID,
382✔
784
                taskToken.RunID,
382✔
785
                scope,
382✔
786
                wh.GetThrottledLogger(),
382✔
787
                tag.BlobSizeViolationOperation("RecordActivityTaskHeartbeat"),
382✔
788
        ); err != nil {
382✔
789
                // heartbeat details exceed size limit, we would fail the activity immediately with explicit error reason
×
790
                failRequest := &types.RespondActivityTaskFailedRequest{
×
791
                        TaskToken: heartbeatRequest.TaskToken,
×
792
                        Reason:    common.StringPtr(common.FailureReasonHeartbeatExceedsLimit),
×
793
                        Details:   heartbeatRequest.Details[0:sizeLimitError],
×
794
                        Identity:  heartbeatRequest.Identity,
×
795
                }
×
796
                err = wh.GetHistoryClient().RespondActivityTaskFailed(ctx, &types.HistoryRespondActivityTaskFailedRequest{
×
797
                        DomainUUID:    taskToken.DomainID,
×
798
                        FailedRequest: failRequest,
×
799
                })
×
800
                if err != nil {
×
801
                        return nil, wh.normalizeVersionedErrors(ctx, err)
×
802
                }
×
803
                resp = &types.RecordActivityTaskHeartbeatResponse{CancelRequested: true}
×
804
        } else {
382✔
805
                resp, err = wh.GetHistoryClient().RecordActivityTaskHeartbeat(ctx, &types.HistoryRecordActivityTaskHeartbeatRequest{
382✔
806
                        DomainUUID:       taskToken.DomainID,
382✔
807
                        HeartbeatRequest: heartbeatRequest,
382✔
808
                })
382✔
809
                if err != nil {
382✔
810
                        return nil, wh.normalizeVersionedErrors(ctx, err)
×
811
                }
×
812
        }
813

814
        return resp, nil
382✔
815
}
816

817
// RecordActivityTaskHeartbeatByID - Record Activity Task Heart beat.
818
func (wh *WorkflowHandler) RecordActivityTaskHeartbeatByID(
819
        ctx context.Context,
820
        heartbeatRequest *types.RecordActivityTaskHeartbeatByIDRequest,
821
) (resp *types.RecordActivityTaskHeartbeatResponse, retError error) {
3✔
822
        if wh.isShuttingDown() {
3✔
823
                return nil, validate.ErrShuttingDown
×
824
        }
×
825

826
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
3✔
827
                return nil, err
×
828
        }
×
829

830
        if heartbeatRequest == nil {
4✔
831
                return nil, validate.ErrRequestNotSet
1✔
832
        }
1✔
833

834
        domainName := heartbeatRequest.GetDomain()
2✔
835
        if domainName == "" {
3✔
836
                return nil, validate.ErrDomainNotSet
1✔
837
        }
1✔
838

839
        wh.GetLogger().Debug("Received RecordActivityTaskHeartbeatByID")
1✔
840
        domainID, err := wh.GetDomainCache().GetDomainID(domainName)
1✔
841
        if err != nil {
1✔
842
                return nil, err
×
843
        }
×
844
        workflowID := heartbeatRequest.GetWorkflowID()
1✔
845
        runID := heartbeatRequest.GetRunID() // runID is optional so can be empty
1✔
846
        activityID := heartbeatRequest.GetActivityID()
1✔
847

1✔
848
        if domainID == "" {
1✔
849
                return nil, validate.ErrDomainNotSet
×
850
        }
×
851
        if workflowID == "" {
1✔
852
                return nil, validate.ErrWorkflowIDNotSet
×
853
        }
×
854
        if activityID == "" {
1✔
855
                return nil, validate.ErrActivityIDNotSet
×
856
        }
×
857

858
        taskToken := &common.TaskToken{
1✔
859
                DomainID:   domainID,
1✔
860
                RunID:      runID,
1✔
861
                WorkflowID: workflowID,
1✔
862
                ScheduleID: common.EmptyEventID,
1✔
863
                ActivityID: activityID,
1✔
864
        }
1✔
865
        token, err := wh.tokenSerializer.Serialize(taskToken)
1✔
866
        if err != nil {
1✔
867
                return nil, err
×
868
        }
×
869

870
        scope := getMetricsScopeWithDomain(metrics.FrontendRecordActivityTaskHeartbeatByIDScope, heartbeatRequest, wh.GetMetricsClient()).Tagged(metrics.GetContextTags(ctx)...)
1✔
871
        sizeLimitError := wh.config.BlobSizeLimitError(domainName)
1✔
872
        sizeLimitWarn := wh.config.BlobSizeLimitWarn(domainName)
1✔
873

1✔
874
        if err := common.CheckEventBlobSizeLimit(
1✔
875
                len(heartbeatRequest.Details),
1✔
876
                sizeLimitWarn,
1✔
877
                sizeLimitError,
1✔
878
                taskToken.DomainID,
1✔
879
                taskToken.WorkflowID,
1✔
880
                taskToken.RunID,
1✔
881
                scope,
1✔
882
                wh.GetThrottledLogger(),
1✔
883
                tag.BlobSizeViolationOperation("RecordActivityTaskHeartbeatByID"),
1✔
884
        ); err != nil {
1✔
885
                // heartbeat details exceed size limit, we would fail the activity immediately with explicit error reason
×
886
                failRequest := &types.RespondActivityTaskFailedRequest{
×
887
                        TaskToken: token,
×
888
                        Reason:    common.StringPtr(common.FailureReasonHeartbeatExceedsLimit),
×
889
                        Details:   heartbeatRequest.Details[0:sizeLimitError],
×
890
                        Identity:  heartbeatRequest.Identity,
×
891
                }
×
892
                err = wh.GetHistoryClient().RespondActivityTaskFailed(ctx, &types.HistoryRespondActivityTaskFailedRequest{
×
893
                        DomainUUID:    taskToken.DomainID,
×
894
                        FailedRequest: failRequest,
×
895
                })
×
896
                if err != nil {
×
897
                        return nil, wh.normalizeVersionedErrors(ctx, err)
×
898
                }
×
899
                resp = &types.RecordActivityTaskHeartbeatResponse{CancelRequested: true}
×
900
        } else {
1✔
901
                req := &types.RecordActivityTaskHeartbeatRequest{
1✔
902
                        TaskToken: token,
1✔
903
                        Details:   heartbeatRequest.Details,
1✔
904
                        Identity:  heartbeatRequest.Identity,
1✔
905
                }
1✔
906

1✔
907
                resp, err = wh.GetHistoryClient().RecordActivityTaskHeartbeat(ctx, &types.HistoryRecordActivityTaskHeartbeatRequest{
1✔
908
                        DomainUUID:       taskToken.DomainID,
1✔
909
                        HeartbeatRequest: req,
1✔
910
                })
1✔
911
                if err != nil {
1✔
912
                        return nil, wh.normalizeVersionedErrors(ctx, err)
×
913
                }
×
914
        }
915

916
        return resp, nil
1✔
917
}
918

919
// RespondActivityTaskCompleted - response to an activity task
920
func (wh *WorkflowHandler) RespondActivityTaskCompleted(
921
        ctx context.Context,
922
        completeRequest *types.RespondActivityTaskCompletedRequest,
923
) (retError error) {
262✔
924
        if wh.isShuttingDown() {
262✔
925
                return validate.ErrShuttingDown
×
926
        }
×
927

928
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
262✔
929
                return err
×
930
        }
×
931

932
        if completeRequest == nil {
262✔
933
                return validate.ErrRequestNotSet
×
934
        }
×
935

936
        if completeRequest.TaskToken == nil {
262✔
937
                return validate.ErrTaskTokenNotSet
×
938
        }
×
939
        taskToken, err := wh.tokenSerializer.Deserialize(completeRequest.TaskToken)
262✔
940
        if err != nil {
262✔
941
                return err
×
942
        }
×
943
        if taskToken.DomainID == "" {
262✔
944
                return validate.ErrDomainNotSet
×
945
        }
×
946

947
        domainName, err := wh.GetDomainCache().GetDomainName(taskToken.DomainID)
262✔
948
        if err != nil {
262✔
949
                return err
×
950
        }
×
951

952
        dw := domainWrapper{
262✔
953
                domain: domainName,
262✔
954
        }
262✔
955
        scope := getMetricsScopeWithDomain(metrics.FrontendRespondActivityTaskCompletedScope, dw, wh.GetMetricsClient()).Tagged(metrics.GetContextTags(ctx)...)
262✔
956
        if !common.IsValidIDLength(
262✔
957
                completeRequest.GetIdentity(),
262✔
958
                scope,
262✔
959
                wh.config.MaxIDLengthWarnLimit(),
262✔
960
                wh.config.IdentityMaxLength(domainName),
262✔
961
                metrics.CadenceErrIdentityExceededWarnLimit,
262✔
962
                domainName,
262✔
963
                wh.GetLogger(),
262✔
964
                tag.IDTypeIdentity) {
262✔
965
                return validate.ErrIdentityTooLong
×
966
        }
×
967

968
        sizeLimitError := wh.config.BlobSizeLimitError(domainName)
262✔
969
        sizeLimitWarn := wh.config.BlobSizeLimitWarn(domainName)
262✔
970

262✔
971
        if err := common.CheckEventBlobSizeLimit(
262✔
972
                len(completeRequest.Result),
262✔
973
                sizeLimitWarn,
262✔
974
                sizeLimitError,
262✔
975
                taskToken.DomainID,
262✔
976
                taskToken.WorkflowID,
262✔
977
                taskToken.RunID,
262✔
978
                scope,
262✔
979
                wh.GetThrottledLogger(),
262✔
980
                tag.BlobSizeViolationOperation("RespondActivityTaskCompleted"),
262✔
981
        ); err != nil {
262✔
982
                // result exceeds blob size limit, we would record it as failure
×
983
                failRequest := &types.RespondActivityTaskFailedRequest{
×
984
                        TaskToken: completeRequest.TaskToken,
×
985
                        Reason:    common.StringPtr(common.FailureReasonCompleteResultExceedsLimit),
×
986
                        Details:   completeRequest.Result[0:sizeLimitError],
×
987
                        Identity:  completeRequest.Identity,
×
988
                }
×
989
                err = wh.GetHistoryClient().RespondActivityTaskFailed(ctx, &types.HistoryRespondActivityTaskFailedRequest{
×
990
                        DomainUUID:    taskToken.DomainID,
×
991
                        FailedRequest: failRequest,
×
992
                })
×
993
                if err != nil {
×
994
                        return wh.normalizeVersionedErrors(ctx, err)
×
995
                }
×
996
        } else {
262✔
997
                err = wh.GetHistoryClient().RespondActivityTaskCompleted(ctx, &types.HistoryRespondActivityTaskCompletedRequest{
262✔
998
                        DomainUUID:      taskToken.DomainID,
262✔
999
                        CompleteRequest: completeRequest,
262✔
1000
                })
262✔
1001
                if err != nil {
307✔
1002
                        return wh.normalizeVersionedErrors(ctx, err)
45✔
1003
                }
45✔
1004
        }
1005

1006
        return nil
217✔
1007
}
1008

1009
// RespondActivityTaskCompletedByID - response to an activity task
1010
func (wh *WorkflowHandler) RespondActivityTaskCompletedByID(
1011
        ctx context.Context,
1012
        completeRequest *types.RespondActivityTaskCompletedByIDRequest,
1013
) (retError error) {
76✔
1014
        if wh.isShuttingDown() {
76✔
1015
                return validate.ErrShuttingDown
×
1016
        }
×
1017

1018
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
76✔
1019
                return err
×
1020
        }
×
1021

1022
        if completeRequest == nil {
76✔
1023
                return validate.ErrRequestNotSet
×
1024
        }
×
1025

1026
        domainName := completeRequest.GetDomain()
76✔
1027
        if domainName == "" {
76✔
1028
                return validate.ErrDomainNotSet
×
1029
        }
×
1030
        domainID, err := wh.GetDomainCache().GetDomainID(domainName)
76✔
1031
        if err != nil {
76✔
1032
                return err
×
1033
        }
×
1034
        workflowID := completeRequest.GetWorkflowID()
76✔
1035
        runID := completeRequest.GetRunID() // runID is optional so can be empty
76✔
1036
        activityID := completeRequest.GetActivityID()
76✔
1037

76✔
1038
        if domainID == "" {
76✔
1039
                return validate.ErrDomainNotSet
×
1040
        }
×
1041
        if workflowID == "" {
76✔
1042
                return validate.ErrWorkflowIDNotSet
×
1043
        }
×
1044
        if activityID == "" {
76✔
1045
                return validate.ErrActivityIDNotSet
×
1046
        }
×
1047

1048
        scope := getMetricsScopeWithDomain(metrics.FrontendRespondActivityTaskCompletedByIDScope, completeRequest, wh.GetMetricsClient()).Tagged(metrics.GetContextTags(ctx)...)
76✔
1049
        if !common.IsValidIDLength(
76✔
1050
                completeRequest.GetIdentity(),
76✔
1051
                scope,
76✔
1052
                wh.config.MaxIDLengthWarnLimit(),
76✔
1053
                wh.config.IdentityMaxLength(domainName),
76✔
1054
                metrics.CadenceErrIdentityExceededWarnLimit,
76✔
1055
                domainName,
76✔
1056
                wh.GetLogger(),
76✔
1057
                tag.IDTypeIdentity) {
76✔
1058
                return validate.ErrIdentityTooLong
×
1059
        }
×
1060

1061
        taskToken := &common.TaskToken{
76✔
1062
                DomainID:   domainID,
76✔
1063
                RunID:      runID,
76✔
1064
                WorkflowID: workflowID,
76✔
1065
                ScheduleID: common.EmptyEventID,
76✔
1066
                ActivityID: activityID,
76✔
1067
        }
76✔
1068
        token, err := wh.tokenSerializer.Serialize(taskToken)
76✔
1069
        if err != nil {
76✔
1070
                return err
×
1071
        }
×
1072

1073
        sizeLimitError := wh.config.BlobSizeLimitError(domainName)
76✔
1074
        sizeLimitWarn := wh.config.BlobSizeLimitWarn(domainName)
76✔
1075

76✔
1076
        if err := common.CheckEventBlobSizeLimit(
76✔
1077
                len(completeRequest.Result),
76✔
1078
                sizeLimitWarn,
76✔
1079
                sizeLimitError,
76✔
1080
                taskToken.DomainID,
76✔
1081
                taskToken.WorkflowID,
76✔
1082
                taskToken.RunID,
76✔
1083
                scope,
76✔
1084
                wh.GetThrottledLogger(),
76✔
1085
                tag.BlobSizeViolationOperation("RespondActivityTaskCompletedByID"),
76✔
1086
        ); err != nil {
76✔
1087
                // result exceeds blob size limit, we would record it as failure
×
1088
                failRequest := &types.RespondActivityTaskFailedRequest{
×
1089
                        TaskToken: token,
×
1090
                        Reason:    common.StringPtr(common.FailureReasonCompleteResultExceedsLimit),
×
1091
                        Details:   completeRequest.Result[0:sizeLimitError],
×
1092
                        Identity:  completeRequest.Identity,
×
1093
                }
×
1094
                err = wh.GetHistoryClient().RespondActivityTaskFailed(ctx, &types.HistoryRespondActivityTaskFailedRequest{
×
1095
                        DomainUUID:    taskToken.DomainID,
×
1096
                        FailedRequest: failRequest,
×
1097
                })
×
1098
                if err != nil {
×
1099
                        return wh.normalizeVersionedErrors(ctx, err)
×
1100
                }
×
1101
        } else {
76✔
1102
                req := &types.RespondActivityTaskCompletedRequest{
76✔
1103
                        TaskToken: token,
76✔
1104
                        Result:    completeRequest.Result,
76✔
1105
                        Identity:  completeRequest.Identity,
76✔
1106
                }
76✔
1107

76✔
1108
                err = wh.GetHistoryClient().RespondActivityTaskCompleted(ctx, &types.HistoryRespondActivityTaskCompletedRequest{
76✔
1109
                        DomainUUID:      taskToken.DomainID,
76✔
1110
                        CompleteRequest: req,
76✔
1111
                })
76✔
1112
                if err != nil {
76✔
1113
                        return wh.normalizeVersionedErrors(ctx, err)
×
1114
                }
×
1115
        }
1116

1117
        return nil
76✔
1118
}
1119

1120
// RespondActivityTaskFailed - response to an activity task failure
1121
func (wh *WorkflowHandler) RespondActivityTaskFailed(
1122
        ctx context.Context,
1123
        failedRequest *types.RespondActivityTaskFailedRequest,
1124
) (retError error) {
14✔
1125
        if wh.isShuttingDown() {
15✔
1126
                return validate.ErrShuttingDown
1✔
1127
        }
1✔
1128

1129
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
13✔
1130
                return err
×
1131
        }
×
1132

1133
        if failedRequest == nil {
13✔
1134
                return validate.ErrRequestNotSet
×
1135
        }
×
1136

1137
        if failedRequest.TaskToken == nil {
13✔
1138
                return validate.ErrTaskTokenNotSet
×
1139
        }
×
1140
        taskToken, err := wh.tokenSerializer.Deserialize(failedRequest.TaskToken)
13✔
1141
        if err != nil {
13✔
1142
                return err
×
1143
        }
×
1144
        if taskToken.DomainID == "" {
13✔
1145
                return validate.ErrDomainNotSet
×
1146
        }
×
1147

1148
        domainName, err := wh.GetDomainCache().GetDomainName(taskToken.DomainID)
13✔
1149
        if err != nil {
13✔
1150
                return err
×
1151
        }
×
1152

1153
        dw := domainWrapper{
13✔
1154
                domain: domainName,
13✔
1155
        }
13✔
1156
        scope := getMetricsScopeWithDomain(metrics.FrontendRespondActivityTaskFailedScope, dw, wh.GetMetricsClient()).Tagged(metrics.GetContextTags(ctx)...)
13✔
1157
        if !common.IsValidIDLength(
13✔
1158
                failedRequest.GetIdentity(),
13✔
1159
                scope,
13✔
1160
                wh.config.MaxIDLengthWarnLimit(),
13✔
1161
                wh.config.IdentityMaxLength(domainName),
13✔
1162
                metrics.CadenceErrIdentityExceededWarnLimit,
13✔
1163
                domainName,
13✔
1164
                wh.GetLogger(),
13✔
1165
                tag.IDTypeIdentity) {
13✔
1166
                return validate.ErrIdentityTooLong
×
1167
        }
×
1168

1169
        sizeLimitError := wh.config.BlobSizeLimitError(domainName)
13✔
1170
        sizeLimitWarn := wh.config.BlobSizeLimitWarn(domainName)
13✔
1171

13✔
1172
        if err := common.CheckEventBlobSizeLimit(
13✔
1173
                len(failedRequest.Details),
13✔
1174
                sizeLimitWarn,
13✔
1175
                sizeLimitError,
13✔
1176
                taskToken.DomainID,
13✔
1177
                taskToken.WorkflowID,
13✔
1178
                taskToken.RunID,
13✔
1179
                scope,
13✔
1180
                wh.GetThrottledLogger(),
13✔
1181
                tag.BlobSizeViolationOperation("RespondActivityTaskFailed"),
13✔
1182
        ); err != nil {
13✔
1183
                // details exceeds blob size limit, we would truncate the details and put a specific error reason
×
1184
                failedRequest.Reason = common.StringPtr(common.FailureReasonFailureDetailsExceedsLimit)
×
1185
                failedRequest.Details = failedRequest.Details[0:sizeLimitError]
×
1186
        }
×
1187

1188
        err = wh.GetHistoryClient().RespondActivityTaskFailed(ctx, &types.HistoryRespondActivityTaskFailedRequest{
13✔
1189
                DomainUUID:    taskToken.DomainID,
13✔
1190
                FailedRequest: failedRequest,
13✔
1191
        })
13✔
1192
        if err != nil {
13✔
1193
                return wh.normalizeVersionedErrors(ctx, err)
×
1194
        }
×
1195
        return nil
13✔
1196
}
1197

1198
// RespondActivityTaskFailedByID - response to an activity task failure
1199
func (wh *WorkflowHandler) RespondActivityTaskFailedByID(
1200
        ctx context.Context,
1201
        failedRequest *types.RespondActivityTaskFailedByIDRequest,
1202
) (retError error) {
12✔
1203
        if wh.isShuttingDown() {
13✔
1204
                return validate.ErrShuttingDown
1✔
1205
        }
1✔
1206

1207
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
12✔
1208
                return err
1✔
1209
        }
1✔
1210

1211
        if failedRequest == nil {
11✔
1212
                return validate.ErrRequestNotSet
1✔
1213
        }
1✔
1214

1215
        domainName := failedRequest.GetDomain()
9✔
1216

9✔
1217
        if domainName == "" {
10✔
1218
                return validate.ErrDomainNotSet
1✔
1219
        }
1✔
1220
        domainID, err := wh.GetDomainCache().GetDomainID(domainName)
8✔
1221
        if err != nil {
9✔
1222
                return err
1✔
1223
        }
1✔
1224
        workflowID := failedRequest.GetWorkflowID()
7✔
1225
        runID := failedRequest.GetRunID() // runID is optional so can be empty
7✔
1226
        activityID := failedRequest.GetActivityID()
7✔
1227

7✔
1228
        if domainID == "" {
8✔
1229
                return validate.ErrDomainNotSet
1✔
1230
        }
1✔
1231
        if workflowID == "" {
7✔
1232
                return validate.ErrWorkflowIDNotSet
1✔
1233
        }
1✔
1234
        if activityID == "" {
6✔
1235
                return validate.ErrActivityIDNotSet
1✔
1236
        }
1✔
1237

1238
        scope := getMetricsScopeWithDomain(metrics.FrontendRespondActivityTaskFailedByIDScope, failedRequest, wh.GetMetricsClient()).Tagged(metrics.GetContextTags(ctx)...)
4✔
1239
        if !common.IsValidIDLength(
4✔
1240
                failedRequest.GetIdentity(),
4✔
1241
                scope,
4✔
1242
                wh.config.MaxIDLengthWarnLimit(),
4✔
1243
                wh.config.IdentityMaxLength(failedRequest.GetDomain()),
4✔
1244
                metrics.CadenceErrIdentityExceededWarnLimit,
4✔
1245
                domainName,
4✔
1246
                wh.GetLogger(),
4✔
1247
                tag.IDTypeIdentity) {
5✔
1248
                return validate.ErrIdentityTooLong
1✔
1249
        }
1✔
1250

1251
        taskToken := &common.TaskToken{
3✔
1252
                DomainID:   domainID,
3✔
1253
                RunID:      runID,
3✔
1254
                WorkflowID: workflowID,
3✔
1255
                ScheduleID: common.EmptyEventID,
3✔
1256
                ActivityID: activityID,
3✔
1257
        }
3✔
1258
        token, err := wh.tokenSerializer.Serialize(taskToken)
3✔
1259
        if err != nil {
4✔
1260
                return err
1✔
1261
        }
1✔
1262

1263
        sizeLimitError := wh.config.BlobSizeLimitError(domainName)
2✔
1264
        sizeLimitWarn := wh.config.BlobSizeLimitWarn(domainName)
2✔
1265

2✔
1266
        if err := common.CheckEventBlobSizeLimit(
2✔
1267
                len(failedRequest.Details),
2✔
1268
                sizeLimitWarn,
2✔
1269
                sizeLimitError,
2✔
1270
                taskToken.DomainID,
2✔
1271
                taskToken.WorkflowID,
2✔
1272
                taskToken.RunID,
2✔
1273
                scope,
2✔
1274
                wh.GetThrottledLogger(),
2✔
1275
                tag.BlobSizeViolationOperation("RespondActivityTaskFailedByID"),
2✔
1276
        ); err != nil {
3✔
1277
                // details exceeds blob size limit, we would truncate the details and put a specific error reason
1✔
1278
                failedRequest.Reason = common.StringPtr(common.FailureReasonFailureDetailsExceedsLimit)
1✔
1279
                failedRequest.Details = failedRequest.Details[0:sizeLimitError]
1✔
1280
        }
1✔
1281

1282
        req := &types.RespondActivityTaskFailedRequest{
2✔
1283
                TaskToken: token,
2✔
1284
                Reason:    failedRequest.Reason,
2✔
1285
                Details:   failedRequest.Details,
2✔
1286
                Identity:  failedRequest.Identity,
2✔
1287
        }
2✔
1288

2✔
1289
        err = wh.GetHistoryClient().RespondActivityTaskFailed(ctx, &types.HistoryRespondActivityTaskFailedRequest{
2✔
1290
                DomainUUID:    taskToken.DomainID,
2✔
1291
                FailedRequest: req,
2✔
1292
        })
2✔
1293
        if err != nil {
3✔
1294
                return wh.normalizeVersionedErrors(ctx, err)
1✔
1295
        }
1✔
1296
        return nil
1✔
1297
}
1298

1299
// RespondActivityTaskCanceled - called to cancel an activity task
1300
func (wh *WorkflowHandler) RespondActivityTaskCanceled(
1301
        ctx context.Context,
1302
        cancelRequest *types.RespondActivityTaskCanceledRequest,
1303
) (retError error) {
11✔
1304
        if wh.isShuttingDown() {
12✔
1305
                return validate.ErrShuttingDown
1✔
1306
        }
1✔
1307

1308
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
11✔
1309
                return err
1✔
1310
        }
1✔
1311

1312
        if cancelRequest == nil {
10✔
1313
                return validate.ErrRequestNotSet
1✔
1314
        }
1✔
1315

1316
        if cancelRequest.TaskToken == nil {
9✔
1317
                return validate.ErrTaskTokenNotSet
1✔
1318
        }
1✔
1319

1320
        taskToken, err := wh.tokenSerializer.Deserialize(cancelRequest.TaskToken)
7✔
1321
        if err != nil {
8✔
1322
                return err
1✔
1323
        }
1✔
1324

1325
        if taskToken.DomainID == "" {
7✔
1326
                return validate.ErrDomainNotSet
1✔
1327
        }
1✔
1328

1329
        domainName, err := wh.GetDomainCache().GetDomainName(taskToken.DomainID)
5✔
1330
        if err != nil {
6✔
1331
                return err
1✔
1332
        }
1✔
1333

1334
        dw := domainWrapper{
4✔
1335
                domain: domainName,
4✔
1336
        }
4✔
1337
        scope := getMetricsScopeWithDomain(metrics.FrontendRespondActivityTaskCanceledScope, dw, wh.GetMetricsClient()).Tagged(metrics.GetContextTags(ctx)...)
4✔
1338
        if !common.IsValidIDLength(
4✔
1339
                cancelRequest.GetIdentity(),
4✔
1340
                scope,
4✔
1341
                wh.config.MaxIDLengthWarnLimit(),
4✔
1342
                wh.config.IdentityMaxLength(domainName),
4✔
1343
                metrics.CadenceErrIdentityExceededWarnLimit,
4✔
1344
                domainName,
4✔
1345
                wh.GetLogger(),
4✔
1346
                tag.IDTypeIdentity) {
5✔
1347
                return validate.ErrIdentityTooLong
1✔
1348
        }
1✔
1349

1350
        sizeLimitError := wh.config.BlobSizeLimitError(domainName)
3✔
1351
        sizeLimitWarn := wh.config.BlobSizeLimitWarn(domainName)
3✔
1352

3✔
1353
        if err := common.CheckEventBlobSizeLimit(
3✔
1354
                len(cancelRequest.Details),
3✔
1355
                sizeLimitWarn,
3✔
1356
                sizeLimitError,
3✔
1357
                taskToken.DomainID,
3✔
1358
                taskToken.WorkflowID,
3✔
1359
                taskToken.RunID,
3✔
1360
                scope,
3✔
1361
                wh.GetThrottledLogger(),
3✔
1362
                tag.BlobSizeViolationOperation("RespondActivityTaskCanceled"),
3✔
1363
        ); err != nil {
4✔
1364
                // details exceeds blob size limit, we would record it as failure
1✔
1365
                failRequest := &types.RespondActivityTaskFailedRequest{
1✔
1366
                        TaskToken: cancelRequest.TaskToken,
1✔
1367
                        Reason:    common.StringPtr(common.FailureReasonCancelDetailsExceedsLimit),
1✔
1368
                        Details:   cancelRequest.Details[0:sizeLimitError],
1✔
1369
                        Identity:  cancelRequest.Identity,
1✔
1370
                }
1✔
1371
                err = wh.GetHistoryClient().RespondActivityTaskFailed(ctx, &types.HistoryRespondActivityTaskFailedRequest{
1✔
1372
                        DomainUUID:    taskToken.DomainID,
1✔
1373
                        FailedRequest: failRequest,
1✔
1374
                })
1✔
1375
                if err != nil {
2✔
1376
                        return wh.normalizeVersionedErrors(ctx, err)
1✔
1377
                }
1✔
1378
        } else {
2✔
1379
                err = wh.GetHistoryClient().RespondActivityTaskCanceled(ctx, &types.HistoryRespondActivityTaskCanceledRequest{
2✔
1380
                        DomainUUID:    taskToken.DomainID,
2✔
1381
                        CancelRequest: cancelRequest,
2✔
1382
                })
2✔
1383
                if err != nil {
3✔
1384
                        return wh.normalizeVersionedErrors(ctx, err)
1✔
1385
                }
1✔
1386
        }
1387

1388
        return nil
1✔
1389
}
1390

1391
// RespondActivityTaskCanceledByID - called to cancel an activity task
1392
func (wh *WorkflowHandler) RespondActivityTaskCanceledByID(
1393
        ctx context.Context,
1394
        cancelRequest *types.RespondActivityTaskCanceledByIDRequest,
1395
) (retError error) {
13✔
1396
        if wh.isShuttingDown() {
14✔
1397
                return validate.ErrShuttingDown
1✔
1398
        }
1✔
1399

1400
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
13✔
1401
                return err
1✔
1402
        }
1✔
1403

1404
        if cancelRequest == nil {
12✔
1405
                return validate.ErrRequestNotSet
1✔
1406
        }
1✔
1407

1408
        domainName := cancelRequest.GetDomain()
10✔
1409
        if domainName == "" {
11✔
1410
                return validate.ErrDomainNotSet
1✔
1411
        }
1✔
1412
        domainID, err := wh.GetDomainCache().GetDomainID(domainName)
9✔
1413
        if err != nil {
10✔
1414
                return err
1✔
1415
        }
1✔
1416
        workflowID := cancelRequest.GetWorkflowID()
8✔
1417
        runID := cancelRequest.GetRunID() // runID is optional so can be empty
8✔
1418
        activityID := cancelRequest.GetActivityID()
8✔
1419

8✔
1420
        if domainID == "" {
9✔
1421
                return validate.ErrDomainNotSet
1✔
1422
        }
1✔
1423
        if workflowID == "" {
8✔
1424
                return validate.ErrWorkflowIDNotSet
1✔
1425
        }
1✔
1426
        if activityID == "" {
7✔
1427
                return validate.ErrActivityIDNotSet
1✔
1428
        }
1✔
1429

1430
        scope := getMetricsScopeWithDomain(metrics.FrontendRespondActivityTaskCanceledByIDScope, cancelRequest, wh.GetMetricsClient()).Tagged(metrics.GetContextTags(ctx)...)
5✔
1431
        if !common.IsValidIDLength(
5✔
1432
                cancelRequest.GetIdentity(),
5✔
1433
                scope,
5✔
1434
                wh.config.MaxIDLengthWarnLimit(),
5✔
1435
                wh.config.IdentityMaxLength(cancelRequest.GetDomain()),
5✔
1436
                metrics.CadenceErrIdentityExceededWarnLimit,
5✔
1437
                domainName,
5✔
1438
                wh.GetLogger(),
5✔
1439
                tag.IDTypeIdentity) {
6✔
1440
                return validate.ErrIdentityTooLong
1✔
1441
        }
1✔
1442

1443
        taskToken := &common.TaskToken{
4✔
1444
                DomainID:   domainID,
4✔
1445
                RunID:      runID,
4✔
1446
                WorkflowID: workflowID,
4✔
1447
                ScheduleID: common.EmptyEventID,
4✔
1448
                ActivityID: activityID,
4✔
1449
        }
4✔
1450
        token, err := wh.tokenSerializer.Serialize(taskToken)
4✔
1451
        if err != nil {
5✔
1452
                return err
1✔
1453
        }
1✔
1454

1455
        sizeLimitError := wh.config.BlobSizeLimitError(domainName)
3✔
1456
        sizeLimitWarn := wh.config.BlobSizeLimitWarn(domainName)
3✔
1457

3✔
1458
        if err := common.CheckEventBlobSizeLimit(
3✔
1459
                len(cancelRequest.Details),
3✔
1460
                sizeLimitWarn,
3✔
1461
                sizeLimitError,
3✔
1462
                taskToken.DomainID,
3✔
1463
                taskToken.WorkflowID,
3✔
1464
                taskToken.RunID,
3✔
1465
                scope,
3✔
1466
                wh.GetThrottledLogger(),
3✔
1467
                tag.BlobSizeViolationOperation("RespondActivityTaskCanceledByID"),
3✔
1468
        ); err != nil {
4✔
1469
                // details exceeds blob size limit, we would record it as failure
1✔
1470
                failRequest := &types.RespondActivityTaskFailedRequest{
1✔
1471
                        TaskToken: token,
1✔
1472
                        Reason:    common.StringPtr(common.FailureReasonCancelDetailsExceedsLimit),
1✔
1473
                        Details:   cancelRequest.Details[0:sizeLimitError],
1✔
1474
                        Identity:  cancelRequest.Identity,
1✔
1475
                }
1✔
1476
                err = wh.GetHistoryClient().RespondActivityTaskFailed(ctx, &types.HistoryRespondActivityTaskFailedRequest{
1✔
1477
                        DomainUUID:    taskToken.DomainID,
1✔
1478
                        FailedRequest: failRequest,
1✔
1479
                })
1✔
1480
                if err != nil {
2✔
1481
                        return wh.normalizeVersionedErrors(ctx, err)
1✔
1482
                }
1✔
1483
        } else {
2✔
1484
                req := &types.RespondActivityTaskCanceledRequest{
2✔
1485
                        TaskToken: token,
2✔
1486
                        Details:   cancelRequest.Details,
2✔
1487
                        Identity:  cancelRequest.Identity,
2✔
1488
                }
2✔
1489

2✔
1490
                err = wh.GetHistoryClient().RespondActivityTaskCanceled(ctx, &types.HistoryRespondActivityTaskCanceledRequest{
2✔
1491
                        DomainUUID:    taskToken.DomainID,
2✔
1492
                        CancelRequest: req,
2✔
1493
                })
2✔
1494
                if err != nil {
3✔
1495
                        return wh.normalizeVersionedErrors(ctx, err)
1✔
1496
                }
1✔
1497
        }
1498

1499
        return nil
1✔
1500
}
1501

1502
// RespondDecisionTaskCompleted - response to a decision task
1503
func (wh *WorkflowHandler) RespondDecisionTaskCompleted(
1504
        ctx context.Context,
1505
        completeRequest *types.RespondDecisionTaskCompletedRequest,
1506
) (resp *types.RespondDecisionTaskCompletedResponse, retError error) {
964✔
1507
        if wh.isShuttingDown() {
965✔
1508
                return nil, validate.ErrShuttingDown
1✔
1509
        }
1✔
1510

1511
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
964✔
1512
                return nil, err
1✔
1513
        }
1✔
1514

1515
        if completeRequest == nil {
963✔
1516
                return nil, validate.ErrRequestNotSet
1✔
1517
        }
1✔
1518

1519
        if completeRequest.TaskToken == nil {
962✔
1520
                return nil, validate.ErrTaskTokenNotSet
1✔
1521
        }
1✔
1522
        taskToken, err := wh.tokenSerializer.Deserialize(completeRequest.TaskToken)
960✔
1523
        if err != nil {
961✔
1524
                return nil, err
1✔
1525
        }
1✔
1526
        if taskToken.DomainID == "" {
960✔
1527
                return nil, validate.ErrDomainNotSet
1✔
1528
        }
1✔
1529

1530
        domainName, err := wh.GetDomainCache().GetDomainName(taskToken.DomainID)
958✔
1531
        if err != nil {
959✔
1532
                return nil, err
1✔
1533
        }
1✔
1534

1535
        dw := domainWrapper{
957✔
1536
                domain: domainName,
957✔
1537
        }
957✔
1538
        scope := getMetricsScopeWithDomain(metrics.FrontendRespondDecisionTaskCompletedScope, dw, wh.GetMetricsClient()).Tagged(metrics.GetContextTags(ctx)...)
957✔
1539
        if !common.IsValidIDLength(
957✔
1540
                completeRequest.GetIdentity(),
957✔
1541
                scope,
957✔
1542
                wh.config.MaxIDLengthWarnLimit(),
957✔
1543
                wh.config.IdentityMaxLength(domainName),
957✔
1544
                metrics.CadenceErrIdentityExceededWarnLimit,
957✔
1545
                domainName,
957✔
1546
                wh.GetLogger(),
957✔
1547
                tag.IDTypeIdentity) {
958✔
1548
                return nil, validate.ErrIdentityTooLong
1✔
1549
        }
1✔
1550

1551
        if err := common.CheckDecisionResultLimit(
956✔
1552
                len(completeRequest.Decisions),
956✔
1553
                wh.config.DecisionResultCountLimit(domainName),
956✔
1554
                scope); err != nil {
957✔
1555
                return nil, err
1✔
1556
        }
1✔
1557

1558
        histResp, err := wh.GetHistoryClient().RespondDecisionTaskCompleted(ctx, &types.HistoryRespondDecisionTaskCompletedRequest{
955✔
1559
                DomainUUID:      taskToken.DomainID,
955✔
1560
                CompleteRequest: completeRequest},
955✔
1561
        )
955✔
1562
        if err != nil {
965✔
1563
                return nil, wh.normalizeVersionedErrors(ctx, err)
10✔
1564
        }
10✔
1565

1566
        completedResp := &types.RespondDecisionTaskCompletedResponse{}
945✔
1567
        completedResp.ActivitiesToDispatchLocally = histResp.ActivitiesToDispatchLocally
945✔
1568
        if completeRequest.GetReturnNewDecisionTask() && histResp != nil && histResp.StartedResponse != nil {
1,006✔
1569
                taskToken := &common.TaskToken{
61✔
1570
                        DomainID:        taskToken.DomainID,
61✔
1571
                        WorkflowID:      taskToken.WorkflowID,
61✔
1572
                        RunID:           taskToken.RunID,
61✔
1573
                        ScheduleID:      histResp.StartedResponse.GetScheduledEventID(),
61✔
1574
                        ScheduleAttempt: histResp.StartedResponse.GetAttempt(),
61✔
1575
                }
61✔
1576
                token, _ := wh.tokenSerializer.Serialize(taskToken)
61✔
1577
                workflowExecution := &types.WorkflowExecution{
61✔
1578
                        WorkflowID: taskToken.WorkflowID,
61✔
1579
                        RunID:      taskToken.RunID,
61✔
1580
                }
61✔
1581
                matchingResp := common.CreateMatchingPollForDecisionTaskResponse(histResp.StartedResponse, workflowExecution, token)
61✔
1582

61✔
1583
                newDecisionTask, err := wh.createPollForDecisionTaskResponse(ctx, scope, taskToken.DomainID, matchingResp, matchingResp.GetBranchToken())
61✔
1584
                if err != nil {
61✔
1585
                        return nil, err
×
1586
                }
×
1587
                completedResp.DecisionTask = newDecisionTask
61✔
1588
        }
1589

1590
        return completedResp, nil
945✔
1591
}
1592

1593
// RespondDecisionTaskFailed - failed response to a decision task
1594
func (wh *WorkflowHandler) RespondDecisionTaskFailed(
1595
        ctx context.Context,
1596
        failedRequest *types.RespondDecisionTaskFailedRequest,
1597
) (retError error) {
169✔
1598
        if wh.isShuttingDown() {
170✔
1599
                return validate.ErrShuttingDown
1✔
1600
        }
1✔
1601

1602
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
169✔
1603
                return err
1✔
1604
        }
1✔
1605

1606
        if failedRequest == nil {
168✔
1607
                return validate.ErrRequestNotSet
1✔
1608
        }
1✔
1609

1610
        if failedRequest.TaskToken == nil {
167✔
1611
                return validate.ErrTaskTokenNotSet
1✔
1612
        }
1✔
1613
        taskToken, err := wh.tokenSerializer.Deserialize(failedRequest.TaskToken)
165✔
1614
        if err != nil {
166✔
1615
                return err
1✔
1616
        }
1✔
1617
        if taskToken.DomainID == "" {
165✔
1618
                return validate.ErrDomainNotSet
1✔
1619
        }
1✔
1620

1621
        domainName, err := wh.GetDomainCache().GetDomainName(taskToken.DomainID)
163✔
1622
        if err != nil {
164✔
1623
                return err
1✔
1624
        }
1✔
1625

1626
        dw := domainWrapper{
162✔
1627
                domain: domainName,
162✔
1628
        }
162✔
1629
        scope := getMetricsScopeWithDomain(metrics.FrontendRespondDecisionTaskFailedScope, dw, wh.GetMetricsClient()).Tagged(metrics.GetContextTags(ctx)...)
162✔
1630
        if !common.IsValidIDLength(
162✔
1631
                failedRequest.GetIdentity(),
162✔
1632
                scope,
162✔
1633
                wh.config.MaxIDLengthWarnLimit(),
162✔
1634
                wh.config.IdentityMaxLength(domainName),
162✔
1635
                metrics.CadenceErrIdentityExceededWarnLimit,
162✔
1636
                domainName,
162✔
1637
                wh.GetLogger(),
162✔
1638
                tag.IDTypeIdentity) {
163✔
1639
                return validate.ErrIdentityTooLong
1✔
1640
        }
1✔
1641

1642
        sizeLimitError := wh.config.BlobSizeLimitError(domainName)
161✔
1643
        sizeLimitWarn := wh.config.BlobSizeLimitWarn(domainName)
161✔
1644

161✔
1645
        if err := common.CheckEventBlobSizeLimit(
161✔
1646
                len(failedRequest.Details),
161✔
1647
                sizeLimitWarn,
161✔
1648
                sizeLimitError,
161✔
1649
                taskToken.DomainID,
161✔
1650
                taskToken.WorkflowID,
161✔
1651
                taskToken.RunID,
161✔
1652
                scope,
161✔
1653
                wh.GetThrottledLogger(),
161✔
1654
                tag.BlobSizeViolationOperation("RespondDecisionTaskFailed"),
161✔
1655
        ); err != nil {
162✔
1656
                // details exceed, we would just truncate the size for decision task failed as the details is not used anywhere by client code
1✔
1657
                failedRequest.Details = failedRequest.Details[0:sizeLimitError]
1✔
1658
        }
1✔
1659

1660
        err = wh.GetHistoryClient().RespondDecisionTaskFailed(ctx, &types.HistoryRespondDecisionTaskFailedRequest{
161✔
1661
                DomainUUID:    taskToken.DomainID,
161✔
1662
                FailedRequest: failedRequest,
161✔
1663
        })
161✔
1664
        if err != nil {
162✔
1665
                return wh.normalizeVersionedErrors(ctx, err)
1✔
1666
        }
1✔
1667
        return nil
160✔
1668
}
1669

1670
// RespondQueryTaskCompleted - response to a query task
1671
func (wh *WorkflowHandler) RespondQueryTaskCompleted(
1672
        ctx context.Context,
1673
        completeRequest *types.RespondQueryTaskCompletedRequest,
1674
) (retError error) {
39✔
1675
        if wh.isShuttingDown() {
40✔
1676
                return validate.ErrShuttingDown
1✔
1677
        }
1✔
1678

1679
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
39✔
1680
                return err
1✔
1681
        }
1✔
1682

1683
        if completeRequest == nil {
38✔
1684
                return validate.ErrRequestNotSet
1✔
1685
        }
1✔
1686

1687
        if completeRequest.TaskToken == nil {
37✔
1688
                return validate.ErrTaskTokenNotSet
1✔
1689
        }
1✔
1690
        queryTaskToken, err := wh.tokenSerializer.DeserializeQueryTaskToken(completeRequest.TaskToken)
35✔
1691
        if err != nil {
36✔
1692
                return err
1✔
1693
        }
1✔
1694
        if queryTaskToken.DomainID == "" || queryTaskToken.TaskList == "" || queryTaskToken.TaskID == "" {
35✔
1695
                return validate.ErrInvalidTaskToken
1✔
1696
        }
1✔
1697

1698
        domainName, err := wh.GetDomainCache().GetDomainName(queryTaskToken.DomainID)
33✔
1699
        if err != nil {
34✔
1700
                return err
1✔
1701
        }
1✔
1702

1703
        dw := domainWrapper{
32✔
1704
                domain: domainName,
32✔
1705
        }
32✔
1706
        sizeLimitError := wh.config.BlobSizeLimitError(domainName)
32✔
1707
        sizeLimitWarn := wh.config.BlobSizeLimitWarn(domainName)
32✔
1708

32✔
1709
        scope := getMetricsScopeWithDomain(metrics.FrontendRespondQueryTaskCompletedScope, dw, wh.GetMetricsClient()).Tagged(metrics.GetContextTags(ctx)...)
32✔
1710
        if err := common.CheckEventBlobSizeLimit(
32✔
1711
                len(completeRequest.GetQueryResult()),
32✔
1712
                sizeLimitWarn,
32✔
1713
                sizeLimitError,
32✔
1714
                queryTaskToken.DomainID,
32✔
1715
                "",
32✔
1716
                "",
32✔
1717
                scope,
32✔
1718
                wh.GetThrottledLogger(),
32✔
1719
                tag.BlobSizeViolationOperation("RespondQueryTaskCompleted"),
32✔
1720
        ); err != nil {
33✔
1721
                completeRequest = &types.RespondQueryTaskCompletedRequest{
1✔
1722
                        TaskToken:     completeRequest.TaskToken,
1✔
1723
                        CompletedType: types.QueryTaskCompletedTypeFailed.Ptr(),
1✔
1724
                        QueryResult:   nil,
1✔
1725
                        ErrorMessage:  err.Error(),
1✔
1726
                }
1✔
1727
        }
1✔
1728

1729
        call := yarpc.CallFromContext(ctx)
32✔
1730

32✔
1731
        completeRequest.WorkerVersionInfo = &types.WorkerVersionInfo{
32✔
1732
                Impl:           call.Header(common.ClientImplHeaderName),
32✔
1733
                FeatureVersion: call.Header(common.FeatureVersionHeaderName),
32✔
1734
        }
32✔
1735
        matchingRequest := &types.MatchingRespondQueryTaskCompletedRequest{
32✔
1736
                DomainUUID:       queryTaskToken.DomainID,
32✔
1737
                TaskList:         &types.TaskList{Name: queryTaskToken.TaskList},
32✔
1738
                TaskID:           queryTaskToken.TaskID,
32✔
1739
                CompletedRequest: completeRequest,
32✔
1740
        }
32✔
1741

32✔
1742
        err = wh.GetMatchingClient().RespondQueryTaskCompleted(ctx, matchingRequest)
32✔
1743
        if err != nil {
33✔
1744
                return err
1✔
1745
        }
1✔
1746
        return nil
31✔
1747
}
1748

1749
func (wh *WorkflowHandler) StartWorkflowExecutionAsync(
1750
        ctx context.Context,
1751
        startRequest *types.StartWorkflowExecutionAsyncRequest,
1752
) (resp *types.StartWorkflowExecutionAsyncResponse, retError error) {
3✔
1753
        if wh.isShuttingDown() {
3✔
1754
                return nil, validate.ErrShuttingDown
×
1755
        }
×
1756
        scope := getMetricsScopeWithDomain(metrics.FrontendStartWorkflowExecutionAsyncScope, startRequest, wh.GetMetricsClient()).Tagged(metrics.GetContextTags(ctx)...)
3✔
1757
        // validate request before pushing to queue
3✔
1758
        err := wh.validateStartWorkflowExecutionRequest(ctx, startRequest.StartWorkflowExecutionRequest, scope)
3✔
1759
        if err != nil {
3✔
1760
                return nil, err
×
1761
        }
×
1762

1763
        producer, err := wh.producerManager.GetProducerByDomain(startRequest.GetDomain())
3✔
1764
        if err != nil {
4✔
1765
                return nil, err
1✔
1766
        }
1✔
1767

1768
        // Serialize the message to be sent to the queue.
1769
        // Avoid JSON because json encoding of requests excludes PII fields such as input. JSON encoded request are logged by acccess controlled api layer for audit purposes.
1770
        payload, err := wh.thriftrwEncoder.Encode(thrift.FromStartWorkflowExecutionAsyncRequest(startRequest))
2✔
1771
        if err != nil {
2✔
1772
                return nil, fmt.Errorf("failed to encode StartWorkflowExecutionAsyncRequest: %v", err)
×
1773
        }
×
1774
        scope.RecordTimer(metrics.AsyncRequestPayloadSize, time.Duration(len(payload)))
2✔
1775

2✔
1776
        // propagate the headers from the context to the message
2✔
1777
        header := &shared.Header{
2✔
1778
                Fields: make(map[string][]byte),
2✔
1779
        }
2✔
1780
        for k, v := range yarpc.CallFromContext(ctx).OriginalHeaders() {
2✔
1781
                header.Fields[k] = []byte(v)
×
1782
        }
×
1783
        messageType := sqlblobs.AsyncRequestTypeStartWorkflowExecutionAsyncRequest
2✔
1784
        message := &sqlblobs.AsyncRequestMessage{
2✔
1785
                PartitionKey: common.StringPtr(startRequest.GetWorkflowID()),
2✔
1786
                Type:         &messageType,
2✔
1787
                Header:       header,
2✔
1788
                Encoding:     common.StringPtr(string(common.EncodingTypeThriftRW)),
2✔
1789
                Payload:      payload,
2✔
1790
        }
2✔
1791
        err = producer.Publish(ctx, message)
2✔
1792
        if err != nil {
3✔
1793
                return nil, err
1✔
1794
        }
1✔
1795
        return &types.StartWorkflowExecutionAsyncResponse{}, nil
1✔
1796
}
1797

1798
// StartWorkflowExecution - Creates a new workflow execution
1799
func (wh *WorkflowHandler) StartWorkflowExecution(
1800
        ctx context.Context,
1801
        startRequest *types.StartWorkflowExecutionRequest,
1802
) (resp *types.StartWorkflowExecutionResponse, retError error) {
495✔
1803
        if wh.isShuttingDown() {
496✔
1804
                return nil, validate.ErrShuttingDown
1✔
1805
        }
1✔
1806
        scope := getMetricsScopeWithDomain(metrics.FrontendStartWorkflowExecutionScope, startRequest, wh.GetMetricsClient()).Tagged(metrics.GetContextTags(ctx)...)
494✔
1807
        err := wh.validateStartWorkflowExecutionRequest(ctx, startRequest, scope)
494✔
1808
        if err != nil {
505✔
1809
                return nil, err
11✔
1810
        }
11✔
1811
        domainName := startRequest.GetDomain()
483✔
1812
        domainID, err := wh.GetDomainCache().GetDomainID(domainName)
483✔
1813
        if err != nil {
484✔
1814
                return nil, err
1✔
1815
        }
1✔
1816
        wh.GetLogger().Debug("Start workflow execution request domainID", tag.WorkflowDomainID(domainID))
482✔
1817
        historyRequest, err := common.CreateHistoryStartWorkflowRequest(
482✔
1818
                domainID, startRequest, time.Now(), wh.getPartitionConfig(ctx, domainName))
482✔
1819
        if err != nil {
482✔
1820
                return nil, err
×
1821
        }
×
1822

1823
        resp, err = wh.GetHistoryClient().StartWorkflowExecution(ctx, historyRequest)
482✔
1824
        if err != nil {
516✔
1825
                return nil, err
34✔
1826
        }
34✔
1827
        return resp, nil
448✔
1828
}
1829

1830
func (wh *WorkflowHandler) validateStartWorkflowExecutionRequest(ctx context.Context, startRequest *types.StartWorkflowExecutionRequest, scope metrics.Scope) error {
497✔
1831
        if startRequest == nil {
498✔
1832
                return validate.ErrRequestNotSet
1✔
1833
        }
1✔
1834
        domainName := startRequest.GetDomain()
496✔
1835
        if domainName == "" {
497✔
1836
                return validate.ErrDomainNotSet
1✔
1837
        }
1✔
1838
        if startRequest.GetWorkflowID() == "" {
496✔
1839
                return validate.ErrWorkflowIDNotSet
1✔
1840
        }
1✔
1841
        if _, err := uuid.Parse(startRequest.RequestID); err != nil {
496✔
1842
                return &types.BadRequestError{Message: fmt.Sprintf("requestId %q is not a valid UUID", startRequest.RequestID)}
2✔
1843
        }
2✔
1844
        if startRequest.WorkflowType == nil || startRequest.WorkflowType.GetName() == "" {
493✔
1845
                return validate.ErrWorkflowTypeNotSet
1✔
1846
        }
1✔
1847
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
491✔
1848
                return err
×
1849
        }
×
1850
        idLengthWarnLimit := wh.config.MaxIDLengthWarnLimit()
491✔
1851
        if !common.IsValidIDLength(
491✔
1852
                domainName,
491✔
1853
                scope,
491✔
1854
                idLengthWarnLimit,
491✔
1855
                wh.config.DomainNameMaxLength(domainName),
491✔
1856
                metrics.CadenceErrDomainNameExceededWarnLimit,
491✔
1857
                domainName,
491✔
1858
                wh.GetLogger(),
491✔
1859
                tag.IDTypeDomainName) {
491✔
1860
                return validate.ErrDomainTooLong
×
1861
        }
×
1862
        if !common.IsValidIDLength(
491✔
1863
                startRequest.GetWorkflowID(),
491✔
1864
                scope,
491✔
1865
                idLengthWarnLimit,
491✔
1866
                wh.config.WorkflowIDMaxLength(domainName),
491✔
1867
                metrics.CadenceErrWorkflowIDExceededWarnLimit,
491✔
1868
                domainName,
491✔
1869
                wh.GetLogger(),
491✔
1870
                tag.IDTypeWorkflowID) {
491✔
1871
                return validate.ErrWorkflowIDTooLong
×
1872
        }
×
1873
        if err := common.ValidateRetryPolicy(startRequest.RetryPolicy); err != nil {
491✔
1874
                return err
×
1875
        }
×
1876
        wh.GetLogger().Debug(
491✔
1877
                "Received StartWorkflowExecution. WorkflowID",
491✔
1878
                tag.WorkflowID(startRequest.GetWorkflowID()))
491✔
1879
        if !common.IsValidIDLength(
491✔
1880
                startRequest.WorkflowType.GetName(),
491✔
1881
                scope,
491✔
1882
                idLengthWarnLimit,
491✔
1883
                wh.config.WorkflowTypeMaxLength(domainName),
491✔
1884
                metrics.CadenceErrWorkflowTypeExceededWarnLimit,
491✔
1885
                domainName,
491✔
1886
                wh.GetLogger(),
491✔
1887
                tag.IDTypeWorkflowType) {
491✔
1888
                return validate.ErrWorkflowTypeTooLong
×
1889
        }
×
1890
        if err := wh.validateTaskList(startRequest.TaskList, scope, domainName); err != nil {
492✔
1891
                return err
1✔
1892
        }
1✔
1893
        if startRequest.GetExecutionStartToCloseTimeoutSeconds() <= 0 {
491✔
1894
                return validate.ErrInvalidExecutionStartToCloseTimeoutSeconds
1✔
1895
        }
1✔
1896
        if startRequest.GetTaskStartToCloseTimeoutSeconds() <= 0 {
490✔
1897
                return validate.ErrInvalidTaskStartToCloseTimeoutSeconds
1✔
1898
        }
1✔
1899
        if startRequest.GetDelayStartSeconds() < 0 {
489✔
1900
                return validate.ErrInvalidDelayStartSeconds
1✔
1901
        }
1✔
1902
        if startRequest.GetJitterStartSeconds() < 0 {
487✔
1903
                return validate.ErrInvalidJitterStartSeconds
×
1904
        }
×
1905
        jitter := startRequest.GetJitterStartSeconds()
487✔
1906
        cron := startRequest.GetCronSchedule()
487✔
1907
        if cron != "" {
505✔
1908
                if _, err := backoff.ValidateSchedule(startRequest.GetCronSchedule()); err != nil {
18✔
1909
                        return err
×
1910
                }
×
1911
        }
1912
        if jitter > 0 && cron != "" {
487✔
1913
                // Calculate the cron duration and ensure that jitter is not greater than the cron duration,
×
1914
                // because that would be confusing to users.
×
1915

×
1916
                // Request using start/end time zero value, which will get us an exact answer (i.e. its not in the
×
1917
                // middle of a minute)
×
1918
                backoffSeconds, err := backoff.GetBackoffForNextScheduleInSeconds(cron, time.Time{}, time.Time{}, jitter)
×
1919
                if err != nil {
×
1920
                        return err
×
1921
                }
×
1922
                if jitter > backoffSeconds {
×
1923
                        return validate.ErrInvalidJitterStartSeconds2
×
1924
                }
×
1925
        }
1926
        if !common.IsValidIDLength(
487✔
1927
                startRequest.GetRequestID(),
487✔
1928
                scope,
487✔
1929
                idLengthWarnLimit,
487✔
1930
                wh.config.RequestIDMaxLength(domainName),
487✔
1931
                metrics.CadenceErrRequestIDExceededWarnLimit,
487✔
1932
                domainName,
487✔
1933
                wh.GetLogger(),
487✔
1934
                tag.IDTypeRequestID) {
487✔
1935
                return validate.ErrRequestIDTooLong
×
1936
        }
×
1937
        if err := wh.searchAttributesValidator.ValidateSearchAttributes(startRequest.SearchAttributes, domainName); err != nil {
487✔
1938
                return err
×
1939
        }
×
1940
        wh.GetLogger().Debug("Start workflow execution request domain", tag.WorkflowDomainName(domainName))
487✔
1941
        domainID, err := wh.GetDomainCache().GetDomainID(domainName)
487✔
1942
        if err != nil {
487✔
1943
                return err
×
1944
        }
×
1945
        sizeLimitError := wh.config.BlobSizeLimitError(domainName)
487✔
1946
        sizeLimitWarn := wh.config.BlobSizeLimitWarn(domainName)
487✔
1947
        actualSize := len(startRequest.Input)
487✔
1948
        if startRequest.Memo != nil {
499✔
1949
                actualSize += common.GetSizeOfMapStringToByteArray(startRequest.Memo.GetFields())
12✔
1950
        }
12✔
1951
        if err := common.CheckEventBlobSizeLimit(
487✔
1952
                actualSize,
487✔
1953
                sizeLimitWarn,
487✔
1954
                sizeLimitError,
487✔
1955
                domainID,
487✔
1956
                startRequest.GetWorkflowID(),
487✔
1957
                "",
487✔
1958
                scope,
487✔
1959
                wh.GetThrottledLogger(),
487✔
1960
                tag.BlobSizeViolationOperation("StartWorkflowExecution"),
487✔
1961
        ); err != nil {
487✔
1962
                return err
×
1963
        }
×
1964
        isolationGroup := wh.getIsolationGroup(ctx, domainName)
487✔
1965
        if !wh.isIsolationGroupHealthy(ctx, domainName, isolationGroup) {
488✔
1966
                return &types.BadRequestError{fmt.Sprintf("Domain %s is drained from isolation group %s.", domainName, isolationGroup)}
1✔
1967
        }
1✔
1968
        return nil
486✔
1969
}
1970

1971
// GetWorkflowExecutionHistory - retrieves the history of workflow execution
1972
func (wh *WorkflowHandler) GetWorkflowExecutionHistory(
1973
        ctx context.Context,
1974
        getRequest *types.GetWorkflowExecutionHistoryRequest,
1975
) (resp *types.GetWorkflowExecutionHistoryResponse, retError error) {
464✔
1976
        if wh.isShuttingDown() {
464✔
1977
                return nil, validate.ErrShuttingDown
×
1978
        }
×
1979

1980
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
464✔
1981
                return nil, err
×
1982
        }
×
1983

1984
        if getRequest == nil {
464✔
1985
                return nil, validate.ErrRequestNotSet
×
1986
        }
×
1987

1988
        domainName := getRequest.GetDomain()
464✔
1989
        wfExecution := getRequest.GetExecution()
464✔
1990

464✔
1991
        if domainName == "" {
464✔
1992
                return nil, validate.ErrDomainNotSet
×
1993
        }
×
1994

1995
        if err := validate.CheckExecution(wfExecution); err != nil {
464✔
1996
                return nil, err
×
1997
        }
×
1998

1999
        domainID, err := wh.GetDomainCache().GetDomainID(domainName)
464✔
2000
        if err != nil {
464✔
2001
                return nil, err
×
2002
        }
×
2003

2004
        if getRequest.GetMaximumPageSize() <= 0 {
796✔
2005
                getRequest.MaximumPageSize = int32(wh.config.HistoryMaxPageSize(getRequest.GetDomain()))
332✔
2006
        }
332✔
2007
        // force limit page size if exceed
2008
        if getRequest.GetMaximumPageSize() > common.GetHistoryMaxPageSize {
464✔
2009
                wh.GetThrottledLogger().Warn("GetHistory page size is larger than threshold",
×
2010
                        tag.WorkflowID(getRequest.Execution.GetWorkflowID()),
×
2011
                        tag.WorkflowRunID(getRequest.Execution.GetRunID()),
×
2012
                        tag.WorkflowDomainID(domainID),
×
2013
                        tag.WorkflowSize(int64(getRequest.GetMaximumPageSize())))
×
2014
                getRequest.MaximumPageSize = common.GetHistoryMaxPageSize
×
2015
        }
×
2016

2017
        scope := getMetricsScopeWithDomain(metrics.FrontendGetWorkflowExecutionHistoryScope, getRequest, wh.GetMetricsClient()).Tagged(metrics.GetContextTags(ctx)...)
464✔
2018
        if !getRequest.GetSkipArchival() {
910✔
2019
                enableArchivalRead := wh.GetArchivalMetadata().GetHistoryConfig().ReadEnabled()
446✔
2020
                historyArchived := wh.historyArchived(ctx, getRequest, domainID)
446✔
2021
                if enableArchivalRead && historyArchived {
467✔
2022
                        return wh.getArchivedHistory(ctx, getRequest, domainID)
21✔
2023
                }
21✔
2024
        }
2025

2026
        // this function return the following 6 things,
2027
        // 1. branch token
2028
        // 2. the workflow run ID
2029
        // 3. the last first event ID (the event ID of the last batch of events in the history)
2030
        // 4. the next event ID
2031
        // 5. whether the workflow is closed
2032
        // 6. error if any
2033
        queryHistory := func(
443✔
2034
                domainUUID string,
443✔
2035
                execution *types.WorkflowExecution,
443✔
2036
                expectedNextEventID int64,
443✔
2037
                currentBranchToken []byte,
443✔
2038
        ) ([]byte, string, int64, int64, bool, error) {
842✔
2039
                response, err := wh.GetHistoryClient().PollMutableState(ctx, &types.PollMutableStateRequest{
399✔
2040
                        DomainUUID:          domainUUID,
399✔
2041
                        Execution:           execution,
399✔
2042
                        ExpectedNextEventID: expectedNextEventID,
399✔
2043
                        CurrentBranchToken:  currentBranchToken,
399✔
2044
                })
399✔
2045

399✔
2046
                if err != nil {
400✔
2047
                        return nil, "", 0, 0, false, err
1✔
2048
                }
1✔
2049
                isWorkflowRunning := response.GetWorkflowCloseState() == persistence.WorkflowCloseStatusNone
398✔
2050

398✔
2051
                return response.CurrentBranchToken,
398✔
2052
                        response.Execution.GetRunID(),
398✔
2053
                        response.GetLastFirstEventID(),
398✔
2054
                        response.GetNextEventID(),
398✔
2055
                        isWorkflowRunning,
398✔
2056
                        nil
398✔
2057
        }
2058

2059
        isLongPoll := getRequest.GetWaitForNewEvent()
443✔
2060
        isCloseEventOnly := getRequest.GetHistoryEventFilterType() == types.HistoryEventFilterTypeCloseEvent
443✔
2061
        execution := getRequest.Execution
443✔
2062
        token := &getHistoryContinuationToken{}
443✔
2063

443✔
2064
        var runID string
443✔
2065
        lastFirstEventID := common.FirstEventID
443✔
2066
        var nextEventID int64
443✔
2067
        var isWorkflowRunning bool
443✔
2068

443✔
2069
        // process the token for paging
443✔
2070
        queryNextEventID := common.EndEventID
443✔
2071
        if getRequest.NextPageToken != nil {
487✔
2072
                token, err = deserializeHistoryToken(getRequest.NextPageToken)
44✔
2073
                if err != nil {
44✔
2074
                        return nil, validate.ErrInvalidNextPageToken
×
2075
                }
×
2076
                if execution.RunID != "" && execution.GetRunID() != token.RunID {
44✔
2077
                        return nil, validate.ErrNextPageTokenRunIDMismatch
×
2078
                }
×
2079

2080
                execution.RunID = token.RunID
44✔
2081

44✔
2082
                // we need to update the current next event ID and whether workflow is running
44✔
2083
                if len(token.PersistenceToken) == 0 && isLongPoll && token.IsWorkflowRunning {
44✔
2084
                        logger := wh.GetLogger().WithTags(
×
2085
                                tag.WorkflowDomainName(getRequest.GetDomain()),
×
2086
                                tag.WorkflowID(getRequest.Execution.GetWorkflowID()),
×
2087
                                tag.WorkflowRunID(getRequest.Execution.GetRunID()),
×
2088
                        )
×
2089
                        // TODO: for now we only log the invalid timeout (this is done inside the helper function) in case
×
2090
                        // this change breaks existing customers. Once we are sure no one is calling this API with very short timeout
×
2091
                        // we can return the error.
×
2092
                        _ = common.ValidateLongPollContextTimeout(ctx, "GetWorkflowExecutionHistory", logger)
×
2093

×
2094
                        if !isCloseEventOnly {
×
2095
                                queryNextEventID = token.NextEventID
×
2096
                        }
×
2097
                        token.BranchToken, _, lastFirstEventID, nextEventID, isWorkflowRunning, err =
×
2098
                                queryHistory(domainID, execution, queryNextEventID, token.BranchToken)
×
2099
                        if err != nil {
×
2100
                                return nil, err
×
2101
                        }
×
2102
                        token.FirstEventID = token.NextEventID
×
2103
                        token.NextEventID = nextEventID
×
2104
                        token.IsWorkflowRunning = isWorkflowRunning
×
2105
                }
2106
        } else {
399✔
2107
                if !isCloseEventOnly {
780✔
2108
                        queryNextEventID = common.FirstEventID
381✔
2109
                }
381✔
2110
                token.BranchToken, runID, lastFirstEventID, nextEventID, isWorkflowRunning, err =
399✔
2111
                        queryHistory(domainID, execution, queryNextEventID, nil)
399✔
2112
                if err != nil {
400✔
2113
                        return nil, err
1✔
2114
                }
1✔
2115

2116
                execution.RunID = runID
398✔
2117

398✔
2118
                token.RunID = runID
398✔
2119
                token.FirstEventID = common.FirstEventID
398✔
2120
                token.NextEventID = nextEventID
398✔
2121
                token.IsWorkflowRunning = isWorkflowRunning
398✔
2122
                token.PersistenceToken = nil
398✔
2123
        }
2124

2125
        call := yarpc.CallFromContext(ctx)
442✔
2126
        clientFeatureVersion := call.Header(common.FeatureVersionHeaderName)
442✔
2127
        clientImpl := call.Header(common.ClientImplHeaderName)
442✔
2128
        supportsRawHistoryQuery := wh.versionChecker.SupportsRawHistoryQuery(clientImpl, clientFeatureVersion) == nil
442✔
2129
        isRawHistoryEnabled := wh.config.SendRawWorkflowHistory(domainName) && supportsRawHistoryQuery
442✔
2130

442✔
2131
        history := &types.History{}
442✔
2132
        history.Events = []*types.HistoryEvent{}
442✔
2133
        var historyBlob []*types.DataBlob
442✔
2134

442✔
2135
        // helper function to just getHistory
442✔
2136
        getHistory := func(firstEventID, nextEventID int64, nextPageToken []byte) error {
884✔
2137
                if isRawHistoryEnabled {
444✔
2138
                        historyBlob, token.PersistenceToken, err = wh.getRawHistory(
2✔
2139
                                ctx,
2✔
2140
                                scope,
2✔
2141
                                domainID,
2✔
2142
                                domainName,
2✔
2143
                                *execution,
2✔
2144
                                firstEventID,
2✔
2145
                                nextEventID,
2✔
2146
                                getRequest.GetMaximumPageSize(),
2✔
2147
                                nextPageToken,
2✔
2148
                                token.TransientDecision,
2✔
2149
                                token.BranchToken,
2✔
2150
                        )
2✔
2151
                } else {
442✔
2152
                        history, token.PersistenceToken, err = wh.getHistory(
440✔
2153
                                ctx,
440✔
2154
                                scope,
440✔
2155
                                domainID,
440✔
2156
                                domainName,
440✔
2157
                                *execution,
440✔
2158
                                firstEventID,
440✔
2159
                                nextEventID,
440✔
2160
                                getRequest.GetMaximumPageSize(),
440✔
2161
                                nextPageToken,
440✔
2162
                                token.TransientDecision,
440✔
2163
                                token.BranchToken,
440✔
2164
                        )
440✔
2165
                }
440✔
2166
                if err != nil {
442✔
2167
                        return err
×
2168
                }
×
2169
                return nil
442✔
2170
        }
2171

2172
        if isCloseEventOnly {
460✔
2173
                if !isWorkflowRunning {
36✔
2174
                        if err := getHistory(lastFirstEventID, nextEventID, nil); err != nil {
18✔
2175
                                return nil, err
×
2176
                        }
×
2177
                        if isRawHistoryEnabled {
18✔
2178
                                // since getHistory func will not return empty history, so the below is safe
×
2179
                                historyBlob = historyBlob[len(historyBlob)-1:]
×
2180
                        } else {
18✔
2181
                                // since getHistory func will not return empty history, so the below is safe
18✔
2182
                                history.Events = history.Events[len(history.Events)-1:]
18✔
2183
                        }
18✔
2184
                        token = nil
18✔
2185
                } else if isLongPoll {
×
2186
                        // set the persistence token to be nil so next time we will query history for updates
×
2187
                        token.PersistenceToken = nil
×
2188
                } else {
×
2189
                        token = nil
×
2190
                }
×
2191
        } else {
424✔
2192
                // return all events
424✔
2193
                if token.FirstEventID >= token.NextEventID {
424✔
2194
                        // currently there is no new event
×
2195
                        history.Events = []*types.HistoryEvent{}
×
2196
                        if !isWorkflowRunning {
×
2197
                                token = nil
×
2198
                        }
×
2199
                } else {
424✔
2200
                        if err := getHistory(token.FirstEventID, token.NextEventID, token.PersistenceToken); err != nil {
424✔
2201
                                return nil, err
×
2202
                        }
×
2203
                        // here, for long pull on history events, we need to intercept the paging token from cassandra
2204
                        // and do something clever
2205
                        if len(token.PersistenceToken) == 0 && (!token.IsWorkflowRunning || !isLongPoll) {
806✔
2206
                                // meaning, there is no more history to be returned
382✔
2207
                                token = nil
382✔
2208
                        }
382✔
2209
                }
2210
        }
2211

2212
        nextToken, err := serializeHistoryToken(token)
442✔
2213
        if err != nil {
442✔
2214
                return nil, err
×
2215
        }
×
2216
        return &types.GetWorkflowExecutionHistoryResponse{
442✔
2217
                History:       history,
442✔
2218
                RawHistory:    historyBlob,
442✔
2219
                NextPageToken: nextToken,
442✔
2220
                Archived:      false,
442✔
2221
        }, nil
442✔
2222
}
2223

2224
// SignalWorkflowExecution is used to send a signal event to running workflow execution.  This results in
2225
// WorkflowExecutionSignaled event recorded in the history and a decision task being created for the execution.
2226
func (wh *WorkflowHandler) SignalWorkflowExecution(
2227
        ctx context.Context,
2228
        signalRequest *types.SignalWorkflowExecutionRequest,
2229
) (retError error) {
736✔
2230
        if wh.isShuttingDown() {
737✔
2231
                return validate.ErrShuttingDown
1✔
2232
        }
1✔
2233

2234
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
736✔
2235
                return err
1✔
2236
        }
1✔
2237

2238
        if signalRequest == nil {
735✔
2239
                return validate.ErrRequestNotSet
1✔
2240
        }
1✔
2241

2242
        domainName := signalRequest.GetDomain()
733✔
2243
        wfExecution := signalRequest.GetWorkflowExecution()
733✔
2244

733✔
2245
        if domainName == "" {
734✔
2246
                return validate.ErrDomainNotSet
1✔
2247
        }
1✔
2248
        if err := validate.CheckExecution(wfExecution); err != nil {
733✔
2249
                return err
1✔
2250
        }
1✔
2251

2252
        scope := getMetricsScopeWithDomain(metrics.FrontendSignalWorkflowExecutionScope, signalRequest, wh.GetMetricsClient()).Tagged(metrics.GetContextTags(ctx)...)
731✔
2253
        idLengthWarnLimit := wh.config.MaxIDLengthWarnLimit()
731✔
2254
        if !common.IsValidIDLength(
731✔
2255
                domainName,
731✔
2256
                scope,
731✔
2257
                idLengthWarnLimit,
731✔
2258
                wh.config.DomainNameMaxLength(domainName),
731✔
2259
                metrics.CadenceErrDomainNameExceededWarnLimit,
731✔
2260
                domainName,
731✔
2261
                wh.GetLogger(),
731✔
2262
                tag.IDTypeDomainName) {
732✔
2263
                return validate.ErrDomainTooLong
1✔
2264
        }
1✔
2265

2266
        if signalRequest.GetSignalName() == "" {
731✔
2267
                return validate.ErrSignalNameNotSet
1✔
2268
        }
1✔
2269

2270
        if !common.IsValidIDLength(
729✔
2271
                signalRequest.GetSignalName(),
729✔
2272
                scope,
729✔
2273
                idLengthWarnLimit,
729✔
2274
                wh.config.SignalNameMaxLength(domainName),
729✔
2275
                metrics.CadenceErrSignalNameExceededWarnLimit,
729✔
2276
                domainName,
729✔
2277
                wh.GetLogger(),
729✔
2278
                tag.IDTypeSignalName) {
730✔
2279
                return validate.ErrSignalNameTooLong
1✔
2280
        }
1✔
2281

2282
        if !common.IsValidIDLength(
728✔
2283
                signalRequest.GetRequestID(),
728✔
2284
                scope,
728✔
2285
                idLengthWarnLimit,
728✔
2286
                wh.config.RequestIDMaxLength(domainName),
728✔
2287
                metrics.CadenceErrRequestIDExceededWarnLimit,
728✔
2288
                domainName,
728✔
2289
                wh.GetLogger(),
728✔
2290
                tag.IDTypeRequestID) {
729✔
2291
                return validate.ErrRequestIDTooLong
1✔
2292
        }
1✔
2293

2294
        domainID, err := wh.GetDomainCache().GetDomainID(domainName)
727✔
2295
        if err != nil {
728✔
2296
                return err
1✔
2297
        }
1✔
2298

2299
        sizeLimitError := wh.config.BlobSizeLimitError(domainName)
726✔
2300
        sizeLimitWarn := wh.config.BlobSizeLimitWarn(domainName)
726✔
2301
        if err := common.CheckEventBlobSizeLimit(
726✔
2302
                len(signalRequest.Input),
726✔
2303
                sizeLimitWarn,
726✔
2304
                sizeLimitError,
726✔
2305
                domainID,
726✔
2306
                signalRequest.GetWorkflowExecution().GetWorkflowID(),
726✔
2307
                signalRequest.GetWorkflowExecution().GetRunID(),
726✔
2308
                scope,
726✔
2309
                wh.GetThrottledLogger(),
726✔
2310
                tag.BlobSizeViolationOperation("SignalWorkflowExecution"),
726✔
2311
        ); err != nil {
727✔
2312
                return err
1✔
2313
        }
1✔
2314

2315
        isolationGroup := wh.getIsolationGroup(ctx, domainName)
725✔
2316
        if !wh.isIsolationGroupHealthy(ctx, domainName, isolationGroup) {
725✔
2317
                return &types.BadRequestError{fmt.Sprintf("Domain %s is drained from isolation group %s.", domainName, isolationGroup)}
×
2318
        }
×
2319

2320
        err = wh.GetHistoryClient().SignalWorkflowExecution(ctx, &types.HistorySignalWorkflowExecutionRequest{
725✔
2321
                DomainUUID:    domainID,
725✔
2322
                SignalRequest: signalRequest,
725✔
2323
        })
725✔
2324
        if err != nil {
735✔
2325
                return wh.normalizeVersionedErrors(ctx, err)
10✔
2326
        }
10✔
2327

2328
        return nil
715✔
2329
}
2330

2331
func (wh *WorkflowHandler) SignalWithStartWorkflowExecutionAsync(
2332
        ctx context.Context,
2333
        signalWithStartRequest *types.SignalWithStartWorkflowExecutionAsyncRequest,
2334
) (resp *types.SignalWithStartWorkflowExecutionAsyncResponse, retError error) {
3✔
2335
        if wh.isShuttingDown() {
3✔
2336
                return nil, validate.ErrShuttingDown
×
2337
        }
×
2338
        scope := getMetricsScopeWithDomain(metrics.FrontendSignalWithStartWorkflowExecutionAsyncScope, signalWithStartRequest, wh.GetMetricsClient()).Tagged(metrics.GetContextTags(ctx)...)
3✔
2339
        // validate request before pushing to queue
3✔
2340
        err := wh.validateSignalWithStartWorkflowExecutionRequest(ctx, signalWithStartRequest.SignalWithStartWorkflowExecutionRequest, scope)
3✔
2341
        if err != nil {
3✔
2342
                return nil, err
×
2343
        }
×
2344
        producer, err := wh.producerManager.GetProducerByDomain(signalWithStartRequest.GetDomain())
3✔
2345
        if err != nil {
4✔
2346
                return nil, err
1✔
2347
        }
1✔
2348

2349
        // Serialize the message to be sent to the queue.
2350
        // Avoid JSON because json encoding of requests excludes PII fields such as input. JSON encoded request are logged by acccess controlled api layer for audit purposes.
2351
        payload, err := wh.thriftrwEncoder.Encode(thrift.FromSignalWithStartWorkflowExecutionAsyncRequest(signalWithStartRequest))
2✔
2352
        if err != nil {
2✔
2353
                return nil, fmt.Errorf("failed to encode SignalWithStartWorkflowExecutionAsyncRequest: %v", err)
×
2354
        }
×
2355
        scope.RecordTimer(metrics.AsyncRequestPayloadSize, time.Duration(len(payload)))
2✔
2356

2✔
2357
        // propagate the headers from the context to the message
2✔
2358
        header := &shared.Header{
2✔
2359
                Fields: map[string][]byte{},
2✔
2360
        }
2✔
2361
        for k, v := range yarpc.CallFromContext(ctx).OriginalHeaders() {
2✔
2362
                header.Fields[k] = []byte(v)
×
2363
        }
×
2364
        messageType := sqlblobs.AsyncRequestTypeSignalWithStartWorkflowExecutionAsyncRequest
2✔
2365
        message := &sqlblobs.AsyncRequestMessage{
2✔
2366
                PartitionKey: common.StringPtr(signalWithStartRequest.GetWorkflowID()),
2✔
2367
                Type:         &messageType,
2✔
2368
                Header:       header,
2✔
2369
                Encoding:     common.StringPtr(string(common.EncodingTypeThriftRW)),
2✔
2370
                Payload:      payload,
2✔
2371
        }
2✔
2372
        err = producer.Publish(ctx, message)
2✔
2373
        if err != nil {
3✔
2374
                return nil, err
1✔
2375
        }
1✔
2376
        return &types.SignalWithStartWorkflowExecutionAsyncResponse{}, nil
1✔
2377
}
2378

2379
// SignalWithStartWorkflowExecution is used to ensure sending a signal event to a workflow execution.
2380
// If workflow is running, this results in WorkflowExecutionSignaled event recorded in the history
2381
// and a decision task being created for the execution.
2382
// If workflow is not running or not found, this results in WorkflowExecutionStarted and WorkflowExecutionSignaled
2383
// event recorded in history, and a decision task being created for the execution
2384
func (wh *WorkflowHandler) SignalWithStartWorkflowExecution(
2385
        ctx context.Context,
2386
        signalWithStartRequest *types.SignalWithStartWorkflowExecutionRequest,
2387
) (resp *types.StartWorkflowExecutionResponse, retError error) {
41✔
2388
        if wh.isShuttingDown() {
42✔
2389
                return nil, validate.ErrShuttingDown
1✔
2390
        }
1✔
2391

2392
        scope := getMetricsScopeWithDomain(metrics.FrontendSignalWithStartWorkflowExecutionScope, signalWithStartRequest, wh.GetMetricsClient()).Tagged(metrics.GetContextTags(ctx)...)
40✔
2393
        err := wh.validateSignalWithStartWorkflowExecutionRequest(ctx, signalWithStartRequest, scope)
40✔
2394
        if err != nil {
44✔
2395
                return nil, err
4✔
2396
        }
4✔
2397

2398
        domainName := signalWithStartRequest.GetDomain()
36✔
2399
        domainID, err := wh.GetDomainCache().GetDomainID(domainName)
36✔
2400
        if err != nil {
37✔
2401
                return nil, err
1✔
2402
        }
1✔
2403
        resp, err = wh.GetHistoryClient().SignalWithStartWorkflowExecution(ctx, &types.HistorySignalWithStartWorkflowExecutionRequest{
35✔
2404
                DomainUUID:             domainID,
35✔
2405
                SignalWithStartRequest: signalWithStartRequest,
35✔
2406
                PartitionConfig:        wh.getPartitionConfig(ctx, domainName),
35✔
2407
        })
35✔
2408
        if err != nil {
42✔
2409
                return nil, err
7✔
2410
        }
7✔
2411

2412
        return resp, nil
28✔
2413
}
2414

2415
func (wh *WorkflowHandler) validateSignalWithStartWorkflowExecutionRequest(ctx context.Context, signalWithStartRequest *types.SignalWithStartWorkflowExecutionRequest, scope metrics.Scope) error {
43✔
2416
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
43✔
2417
                return err
×
2418
        }
×
2419

2420
        if signalWithStartRequest == nil {
44✔
2421
                return validate.ErrRequestNotSet
1✔
2422
        }
1✔
2423

2424
        domainName := signalWithStartRequest.GetDomain()
42✔
2425
        if domainName == "" {
43✔
2426
                return validate.ErrDomainNotSet
1✔
2427
        }
1✔
2428
        if signalWithStartRequest.GetWorkflowID() == "" {
42✔
2429
                return validate.ErrWorkflowIDNotSet
1✔
2430
        }
1✔
2431

2432
        idLengthWarnLimit := wh.config.MaxIDLengthWarnLimit()
40✔
2433
        if !common.IsValidIDLength(
40✔
2434
                domainName,
40✔
2435
                scope,
40✔
2436
                idLengthWarnLimit,
40✔
2437
                wh.config.DomainNameMaxLength(domainName),
40✔
2438
                metrics.CadenceErrDomainNameExceededWarnLimit,
40✔
2439
                domainName,
40✔
2440
                wh.GetLogger(),
40✔
2441
                tag.IDTypeDomainName) {
40✔
2442
                return validate.ErrDomainTooLong
×
2443
        }
×
2444

2445
        if !common.IsValidIDLength(
40✔
2446
                signalWithStartRequest.GetWorkflowID(),
40✔
2447
                scope,
40✔
2448
                idLengthWarnLimit,
40✔
2449
                wh.config.WorkflowIDMaxLength(domainName),
40✔
2450
                metrics.CadenceErrWorkflowIDExceededWarnLimit,
40✔
2451
                domainName,
40✔
2452
                wh.GetLogger(),
40✔
2453
                tag.IDTypeWorkflowID) {
40✔
2454
                return validate.ErrWorkflowIDTooLong
×
2455
        }
×
2456

2457
        if signalWithStartRequest.GetSignalName() == "" {
41✔
2458
                return validate.ErrSignalNameNotSet
1✔
2459
        }
1✔
2460

2461
        if !common.IsValidIDLength(
39✔
2462
                signalWithStartRequest.GetSignalName(),
39✔
2463
                scope,
39✔
2464
                idLengthWarnLimit,
39✔
2465
                wh.config.SignalNameMaxLength(domainName),
39✔
2466
                metrics.CadenceErrSignalNameExceededWarnLimit,
39✔
2467
                domainName,
39✔
2468
                wh.GetLogger(),
39✔
2469
                tag.IDTypeSignalName) {
39✔
2470
                return validate.ErrSignalNameTooLong
×
2471
        }
×
2472

2473
        if signalWithStartRequest.WorkflowType == nil || signalWithStartRequest.WorkflowType.GetName() == "" {
39✔
2474
                return validate.ErrWorkflowTypeNotSet
×
2475
        }
×
2476

2477
        if !common.IsValidIDLength(
39✔
2478
                signalWithStartRequest.WorkflowType.GetName(),
39✔
2479
                scope,
39✔
2480
                idLengthWarnLimit,
39✔
2481
                wh.config.WorkflowTypeMaxLength(domainName),
39✔
2482
                metrics.CadenceErrWorkflowTypeExceededWarnLimit,
39✔
2483
                domainName,
39✔
2484
                wh.GetLogger(),
39✔
2485
                tag.IDTypeWorkflowType) {
39✔
2486
                return validate.ErrWorkflowTypeTooLong
×
2487
        }
×
2488

2489
        if err := wh.validateTaskList(signalWithStartRequest.TaskList, scope, domainName); err != nil {
39✔
2490
                return err
×
2491
        }
×
2492

2493
        if !common.IsValidIDLength(
39✔
2494
                signalWithStartRequest.GetRequestID(),
39✔
2495
                scope,
39✔
2496
                idLengthWarnLimit,
39✔
2497
                wh.config.RequestIDMaxLength(domainName),
39✔
2498
                metrics.CadenceErrRequestIDExceededWarnLimit,
39✔
2499
                domainName,
39✔
2500
                wh.GetLogger(),
39✔
2501
                tag.IDTypeRequestID) {
39✔
2502
                return validate.ErrRequestIDTooLong
×
2503
        }
×
2504

2505
        if signalWithStartRequest.GetExecutionStartToCloseTimeoutSeconds() <= 0 {
39✔
2506
                return validate.ErrInvalidExecutionStartToCloseTimeoutSeconds
×
2507
        }
×
2508

2509
        if signalWithStartRequest.GetTaskStartToCloseTimeoutSeconds() <= 0 {
39✔
2510
                return validate.ErrInvalidTaskStartToCloseTimeoutSeconds
×
2511
        }
×
2512

2513
        if err := common.ValidateRetryPolicy(signalWithStartRequest.RetryPolicy); err != nil {
39✔
2514
                return err
×
2515
        }
×
2516

2517
        if signalWithStartRequest.GetCronSchedule() != "" {
39✔
2518
                if _, err := backoff.ValidateSchedule(signalWithStartRequest.GetCronSchedule()); err != nil {
×
2519
                        return err
×
2520
                }
×
2521
        }
2522

2523
        if err := wh.searchAttributesValidator.ValidateSearchAttributes(signalWithStartRequest.SearchAttributes, domainName); err != nil {
39✔
2524
                return err
×
2525
        }
×
2526

2527
        domainID, err := wh.GetDomainCache().GetDomainID(domainName)
39✔
2528
        if err != nil {
39✔
2529
                return err
×
2530
        }
×
2531

2532
        sizeLimitError := wh.config.BlobSizeLimitError(domainName)
39✔
2533
        sizeLimitWarn := wh.config.BlobSizeLimitWarn(domainName)
39✔
2534
        if err := common.CheckEventBlobSizeLimit(
39✔
2535
                len(signalWithStartRequest.SignalInput),
39✔
2536
                sizeLimitWarn,
39✔
2537
                sizeLimitError,
39✔
2538
                domainID,
39✔
2539
                signalWithStartRequest.GetWorkflowID(),
39✔
2540
                "",
39✔
2541
                scope,
39✔
2542
                wh.GetThrottledLogger(),
39✔
2543
                tag.BlobSizeViolationOperation("SignalWithStartWorkflowExecution"),
39✔
2544
        ); err != nil {
39✔
2545
                return err
×
2546
        }
×
2547
        actualSize := len(signalWithStartRequest.Input) + common.GetSizeOfMapStringToByteArray(signalWithStartRequest.Memo.GetFields())
39✔
2548
        if err := common.CheckEventBlobSizeLimit(
39✔
2549
                actualSize,
39✔
2550
                sizeLimitWarn,
39✔
2551
                sizeLimitError,
39✔
2552
                domainID,
39✔
2553
                signalWithStartRequest.GetWorkflowID(),
39✔
2554
                "",
39✔
2555
                scope,
39✔
2556
                wh.GetThrottledLogger(),
39✔
2557
                tag.BlobSizeViolationOperation("SignalWithStartWorkflowExecution"),
39✔
2558
        ); err != nil {
39✔
2559
                return err
×
2560
        }
×
2561

2562
        isolationGroup := wh.getIsolationGroup(ctx, domainName)
39✔
2563
        if !wh.isIsolationGroupHealthy(ctx, domainName, isolationGroup) {
39✔
2564
                return &types.BadRequestError{fmt.Sprintf("Domain %s is drained from isolation group %s.", domainName, isolationGroup)}
×
2565
        }
×
2566
        return nil
39✔
2567
}
2568

2569
// TerminateWorkflowExecution terminates an existing workflow execution by recording WorkflowExecutionTerminated event
2570
// in the history and immediately terminating the execution instance.
2571
func (wh *WorkflowHandler) TerminateWorkflowExecution(
2572
        ctx context.Context,
2573
        terminateRequest *types.TerminateWorkflowExecutionRequest,
2574
) (retError error) {
57✔
2575
        if wh.isShuttingDown() {
58✔
2576
                return validate.ErrShuttingDown
1✔
2577
        }
1✔
2578

2579
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
57✔
2580
                return err
1✔
2581
        }
1✔
2582

2583
        if terminateRequest == nil {
56✔
2584
                return validate.ErrRequestNotSet
1✔
2585
        }
1✔
2586

2587
        domainName := terminateRequest.GetDomain()
54✔
2588
        wfExecution := terminateRequest.GetWorkflowExecution()
54✔
2589
        if terminateRequest.GetDomain() == "" {
55✔
2590
                return validate.ErrDomainNotSet
1✔
2591
        }
1✔
2592
        if err := validate.CheckExecution(wfExecution); err != nil {
55✔
2593
                return err
2✔
2594
        }
2✔
2595

2596
        domainID, err := wh.GetDomainCache().GetDomainID(domainName)
51✔
2597
        if err != nil {
52✔
2598
                return err
1✔
2599
        }
1✔
2600

2601
        err = wh.GetHistoryClient().TerminateWorkflowExecution(ctx, &types.HistoryTerminateWorkflowExecutionRequest{
50✔
2602
                DomainUUID:       domainID,
50✔
2603
                TerminateRequest: terminateRequest,
50✔
2604
        })
50✔
2605
        if err != nil {
51✔
2606
                return wh.normalizeVersionedErrors(ctx, err)
1✔
2607
        }
1✔
2608

2609
        return nil
49✔
2610
}
2611

2612
// ResetWorkflowExecution reset an existing workflow execution to the nextFirstEventID
2613
// in the history and immediately terminating the current execution instance.
2614
func (wh *WorkflowHandler) ResetWorkflowExecution(
2615
        ctx context.Context,
2616
        resetRequest *types.ResetWorkflowExecutionRequest,
2617
) (resp *types.ResetWorkflowExecutionResponse, retError error) {
24✔
2618
        if wh.isShuttingDown() {
25✔
2619
                return nil, validate.ErrShuttingDown
1✔
2620
        }
1✔
2621

2622
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
24✔
2623
                return nil, err
1✔
2624
        }
1✔
2625

2626
        if resetRequest == nil {
23✔
2627
                return nil, validate.ErrRequestNotSet
1✔
2628
        }
1✔
2629

2630
        domainName := resetRequest.GetDomain()
21✔
2631
        wfExecution := resetRequest.GetWorkflowExecution()
21✔
2632
        if domainName == "" {
22✔
2633
                return nil, validate.ErrDomainNotSet
1✔
2634
        }
1✔
2635
        if err := validate.CheckExecution(wfExecution); err != nil {
22✔
2636
                return nil, err
2✔
2637
        }
2✔
2638

2639
        domainID, err := wh.GetDomainCache().GetDomainID(resetRequest.GetDomain())
18✔
2640
        if err != nil {
19✔
2641
                return nil, err
1✔
2642
        }
1✔
2643

2644
        resp, err = wh.GetHistoryClient().ResetWorkflowExecution(ctx, &types.HistoryResetWorkflowExecutionRequest{
17✔
2645
                DomainUUID:   domainID,
17✔
2646
                ResetRequest: resetRequest,
17✔
2647
        })
17✔
2648
        if err != nil {
18✔
2649
                return nil, err
1✔
2650
        }
1✔
2651

2652
        return resp, nil
16✔
2653
}
2654

2655
// RequestCancelWorkflowExecution - requests to cancel a workflow execution
2656
func (wh *WorkflowHandler) RequestCancelWorkflowExecution(
2657
        ctx context.Context,
2658
        cancelRequest *types.RequestCancelWorkflowExecutionRequest,
2659
) (retError error) {
14✔
2660
        if wh.isShuttingDown() {
15✔
2661
                return validate.ErrShuttingDown
1✔
2662
        }
1✔
2663

2664
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
14✔
2665
                return err
1✔
2666
        }
1✔
2667

2668
        if cancelRequest == nil {
13✔
2669
                return validate.ErrRequestNotSet
1✔
2670
        }
1✔
2671

2672
        domainName := cancelRequest.GetDomain()
11✔
2673
        wfExecution := cancelRequest.GetWorkflowExecution()
11✔
2674
        if domainName == "" {
12✔
2675
                return validate.ErrDomainNotSet
1✔
2676
        }
1✔
2677
        if err := validate.CheckExecution(wfExecution); err != nil {
11✔
2678
                return err
1✔
2679
        }
1✔
2680

2681
        domainID, err := wh.GetDomainCache().GetDomainID(cancelRequest.GetDomain())
9✔
2682
        if err != nil {
10✔
2683
                return err
1✔
2684
        }
1✔
2685

2686
        err = wh.GetHistoryClient().RequestCancelWorkflowExecution(ctx, &types.HistoryRequestCancelWorkflowExecutionRequest{
8✔
2687
                DomainUUID:    domainID,
8✔
2688
                CancelRequest: cancelRequest,
8✔
2689
        })
8✔
2690
        if err != nil {
12✔
2691
                return wh.normalizeVersionedErrors(ctx, err)
4✔
2692
        }
4✔
2693

2694
        return nil
4✔
2695
}
2696

2697
// ListOpenWorkflowExecutions - retrieves info for open workflow executions in a domain
2698
func (wh *WorkflowHandler) ListOpenWorkflowExecutions(
2699
        ctx context.Context,
2700
        listRequest *types.ListOpenWorkflowExecutionsRequest,
2701
) (resp *types.ListOpenWorkflowExecutionsResponse, retError error) {
106✔
2702
        if wh.isShuttingDown() {
106✔
2703
                return nil, validate.ErrShuttingDown
×
2704
        }
×
2705

2706
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
106✔
2707
                return nil, err
×
2708
        }
×
2709

2710
        if listRequest == nil {
106✔
2711
                return nil, validate.ErrRequestNotSet
×
2712
        }
×
2713

2714
        if listRequest.GetDomain() == "" {
106✔
2715
                return nil, validate.ErrDomainNotSet
×
2716
        }
×
2717

2718
        if listRequest.StartTimeFilter == nil {
106✔
2719
                return nil, &types.BadRequestError{Message: "StartTimeFilter is required"}
×
2720
        }
×
2721

2722
        if listRequest.StartTimeFilter.EarliestTime == nil {
106✔
2723
                return nil, &types.BadRequestError{Message: "EarliestTime in StartTimeFilter is required"}
×
2724
        }
×
2725

2726
        if listRequest.StartTimeFilter.LatestTime == nil {
106✔
2727
                return nil, &types.BadRequestError{Message: "LatestTime in StartTimeFilter is required"}
×
2728
        }
×
2729

2730
        if listRequest.StartTimeFilter.GetEarliestTime() > listRequest.StartTimeFilter.GetLatestTime() {
106✔
2731
                return nil, &types.BadRequestError{Message: "EarliestTime in StartTimeFilter should not be larger than LatestTime"}
×
2732
        }
×
2733

2734
        if listRequest.ExecutionFilter != nil && listRequest.TypeFilter != nil {
106✔
2735
                return nil, &types.BadRequestError{
×
2736
                        Message: "Only one of ExecutionFilter or TypeFilter is allowed"}
×
2737
        }
×
2738

2739
        if listRequest.GetMaximumPageSize() <= 0 {
167✔
2740
                listRequest.MaximumPageSize = int32(wh.config.VisibilityMaxPageSize(listRequest.GetDomain()))
61✔
2741
        }
61✔
2742

2743
        if wh.isListRequestPageSizeTooLarge(listRequest.GetMaximumPageSize(), listRequest.GetDomain()) {
106✔
2744
                return nil, &types.BadRequestError{
×
2745
                        Message: fmt.Sprintf("Pagesize is larger than allow %d", wh.config.ESIndexMaxResultWindow())}
×
2746
        }
×
2747

2748
        domain := listRequest.GetDomain()
106✔
2749
        domainID, err := wh.GetDomainCache().GetDomainID(domain)
106✔
2750
        if err != nil {
106✔
2751
                return nil, err
×
2752
        }
×
2753

2754
        baseReq := persistence.ListWorkflowExecutionsRequest{
106✔
2755
                DomainUUID:    domainID,
106✔
2756
                Domain:        domain,
106✔
2757
                PageSize:      int(listRequest.GetMaximumPageSize()),
106✔
2758
                NextPageToken: listRequest.NextPageToken,
106✔
2759
                EarliestTime:  listRequest.StartTimeFilter.GetEarliestTime(),
106✔
2760
                LatestTime:    listRequest.StartTimeFilter.GetLatestTime(),
106✔
2761
        }
106✔
2762

106✔
2763
        var persistenceResp *persistence.ListWorkflowExecutionsResponse
106✔
2764
        if listRequest.ExecutionFilter != nil {
206✔
2765
                if wh.config.DisableListVisibilityByFilter(domain) {
101✔
2766
                        err = validate.ErrNoPermission
1✔
2767
                } else {
100✔
2768
                        persistenceResp, err = wh.GetVisibilityManager().ListOpenWorkflowExecutionsByWorkflowID(
99✔
2769
                                ctx,
99✔
2770
                                &persistence.ListWorkflowExecutionsByWorkflowIDRequest{
99✔
2771
                                        ListWorkflowExecutionsRequest: baseReq,
99✔
2772
                                        WorkflowID:                    listRequest.ExecutionFilter.GetWorkflowID(),
99✔
2773
                                })
99✔
2774
                }
99✔
2775
                wh.GetLogger().Debug("List open workflow with filter",
100✔
2776
                        tag.WorkflowDomainName(listRequest.GetDomain()), tag.WorkflowListWorkflowFilterByID)
100✔
2777
        } else if listRequest.TypeFilter != nil {
7✔
2778
                if wh.config.DisableListVisibilityByFilter(domain) {
2✔
2779
                        err = validate.ErrNoPermission
1✔
2780
                } else {
1✔
2781
                        persistenceResp, err = wh.GetVisibilityManager().ListOpenWorkflowExecutionsByType(
×
2782
                                ctx,
×
2783
                                &persistence.ListWorkflowExecutionsByTypeRequest{
×
2784
                                        ListWorkflowExecutionsRequest: baseReq,
×
2785
                                        WorkflowTypeName:              listRequest.TypeFilter.GetName(),
×
2786
                                },
×
2787
                        )
×
2788
                }
×
2789
                wh.GetLogger().Debug("List open workflow with filter",
1✔
2790
                        tag.WorkflowDomainName(listRequest.GetDomain()), tag.WorkflowListWorkflowFilterByType)
1✔
2791
        } else {
5✔
2792
                persistenceResp, err = wh.GetVisibilityManager().ListOpenWorkflowExecutions(ctx, &baseReq)
5✔
2793
        }
5✔
2794

2795
        if err != nil {
108✔
2796
                return nil, err
2✔
2797
        }
2✔
2798

2799
        resp = &types.ListOpenWorkflowExecutionsResponse{}
104✔
2800
        resp.Executions = persistenceResp.Executions
104✔
2801
        resp.NextPageToken = persistenceResp.NextPageToken
104✔
2802
        return resp, nil
104✔
2803
}
2804

2805
// ListArchivedWorkflowExecutions - retrieves archived info for closed workflow executions in a domain
2806
func (wh *WorkflowHandler) ListArchivedWorkflowExecutions(
2807
        ctx context.Context,
2808
        listRequest *types.ListArchivedWorkflowExecutionsRequest,
2809
) (resp *types.ListArchivedWorkflowExecutionsResponse, retError error) {
15✔
2810
        if wh.isShuttingDown() {
15✔
2811
                return nil, validate.ErrShuttingDown
×
2812
        }
×
2813

2814
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
15✔
2815
                return nil, err
×
2816
        }
×
2817

2818
        if listRequest == nil {
15✔
2819
                return nil, validate.ErrRequestNotSet
×
2820
        }
×
2821

2822
        if listRequest.GetDomain() == "" {
16✔
2823
                return nil, validate.ErrDomainNotSet
1✔
2824
        }
1✔
2825

2826
        if listRequest.GetPageSize() <= 0 {
14✔
2827
                listRequest.PageSize = int32(wh.config.VisibilityMaxPageSize(listRequest.GetDomain()))
×
2828
        }
×
2829

2830
        maxPageSize := wh.config.VisibilityArchivalQueryMaxPageSize()
14✔
2831
        if int(listRequest.GetPageSize()) > maxPageSize {
14✔
2832
                return nil, &types.BadRequestError{
×
2833
                        Message: fmt.Sprintf("Pagesize is larger than allowed %d", maxPageSize)}
×
2834
        }
×
2835

2836
        if !wh.GetArchivalMetadata().GetVisibilityConfig().ClusterConfiguredForArchival() {
15✔
2837
                return nil, &types.BadRequestError{Message: "Cluster is not configured for visibility archival"}
1✔
2838
        }
1✔
2839

2840
        if !wh.GetArchivalMetadata().GetVisibilityConfig().ReadEnabled() {
13✔
2841
                return nil, &types.BadRequestError{Message: "Cluster is not configured for reading archived visibility records"}
×
2842
        }
×
2843

2844
        entry, err := wh.GetDomainCache().GetDomain(listRequest.GetDomain())
13✔
2845
        if err != nil {
14✔
2846
                return nil, err
1✔
2847
        }
1✔
2848

2849
        if entry.GetConfig().VisibilityArchivalStatus != types.ArchivalStatusEnabled {
14✔
2850
                return nil, &types.BadRequestError{Message: "Domain is not configured for visibility archival"}
2✔
2851
        }
2✔
2852

2853
        URI, err := archiver.NewURI(entry.GetConfig().VisibilityArchivalURI)
10✔
2854
        if err != nil {
10✔
2855
                return nil, err
×
2856
        }
×
2857

2858
        visibilityArchiver, err := wh.GetArchiverProvider().GetVisibilityArchiver(URI.Scheme(), service.Frontend)
10✔
2859
        if err != nil {
10✔
2860
                return nil, err
×
2861
        }
×
2862

2863
        archiverRequest := &archiver.QueryVisibilityRequest{
10✔
2864
                DomainID:      entry.GetInfo().ID,
10✔
2865
                PageSize:      int(listRequest.GetPageSize()),
10✔
2866
                NextPageToken: listRequest.NextPageToken,
10✔
2867
                Query:         listRequest.GetQuery(),
10✔
2868
        }
10✔
2869

10✔
2870
        archiverResponse, err := visibilityArchiver.Query(ctx, URI, archiverRequest)
10✔
2871
        if err != nil {
10✔
2872
                return nil, err
×
2873
        }
×
2874

2875
        // special handling of ExecutionTime for cron or retry
2876
        for _, execution := range archiverResponse.Executions {
25✔
2877
                if execution.GetExecutionTime() == 0 {
30✔
2878
                        execution.ExecutionTime = common.Int64Ptr(execution.GetStartTime())
15✔
2879
                }
15✔
2880
        }
2881

2882
        return &types.ListArchivedWorkflowExecutionsResponse{
10✔
2883
                Executions:    archiverResponse.Executions,
10✔
2884
                NextPageToken: archiverResponse.NextPageToken,
10✔
2885
        }, nil
10✔
2886
}
2887

2888
// ListClosedWorkflowExecutions - retrieves info for closed workflow executions in a domain
2889
func (wh *WorkflowHandler) ListClosedWorkflowExecutions(
2890
        ctx context.Context,
2891
        listRequest *types.ListClosedWorkflowExecutionsRequest,
2892
) (resp *types.ListClosedWorkflowExecutionsResponse, retError error) {
28✔
2893
        if wh.isShuttingDown() {
28✔
2894
                return nil, validate.ErrShuttingDown
×
2895
        }
×
2896

2897
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
28✔
2898
                return nil, err
×
2899
        }
×
2900

2901
        if listRequest == nil {
28✔
2902
                return nil, validate.ErrRequestNotSet
×
2903
        }
×
2904

2905
        if listRequest.GetDomain() == "" {
28✔
2906
                return nil, validate.ErrDomainNotSet
×
2907
        }
×
2908

2909
        if listRequest.StartTimeFilter == nil {
28✔
2910
                return nil, &types.BadRequestError{Message: "StartTimeFilter is required"}
×
2911
        }
×
2912

2913
        if listRequest.StartTimeFilter.EarliestTime == nil {
28✔
2914
                return nil, &types.BadRequestError{Message: "EarliestTime in StartTimeFilter is required"}
×
2915
        }
×
2916

2917
        if listRequest.StartTimeFilter.LatestTime == nil {
28✔
2918
                return nil, &types.BadRequestError{Message: "LatestTime in StartTimeFilter is required"}
×
2919
        }
×
2920

2921
        if listRequest.StartTimeFilter.GetEarliestTime() > listRequest.StartTimeFilter.GetLatestTime() {
28✔
2922
                return nil, &types.BadRequestError{Message: "EarliestTime in StartTimeFilter should not be larger than LatestTime"}
×
2923
        }
×
2924

2925
        filterCount := 0
28✔
2926
        if listRequest.TypeFilter != nil {
29✔
2927
                filterCount++
1✔
2928
        }
1✔
2929
        if listRequest.StatusFilter != nil {
29✔
2930
                filterCount++
1✔
2931
        }
1✔
2932

2933
        if filterCount > 1 {
28✔
2934
                return nil, &types.BadRequestError{
×
2935
                        Message: "Only one of ExecutionFilter, TypeFilter or StatusFilter is allowed"}
×
2936
        } // If ExecutionFilter is provided with one of TypeFilter or StatusFilter, use ExecutionFilter and ignore other filter
×
2937

2938
        if listRequest.GetMaximumPageSize() <= 0 {
29✔
2939
                listRequest.MaximumPageSize = int32(wh.config.VisibilityMaxPageSize(listRequest.GetDomain()))
1✔
2940
        }
1✔
2941

2942
        if wh.isListRequestPageSizeTooLarge(listRequest.GetMaximumPageSize(), listRequest.GetDomain()) {
28✔
2943
                return nil, &types.BadRequestError{
×
2944
                        Message: fmt.Sprintf("Pagesize is larger than allow %d", wh.config.ESIndexMaxResultWindow())}
×
2945
        }
×
2946

2947
        domain := listRequest.GetDomain()
28✔
2948
        domainID, err := wh.GetDomainCache().GetDomainID(domain)
28✔
2949
        if err != nil {
28✔
2950
                return nil, err
×
2951
        }
×
2952

2953
        baseReq := persistence.ListWorkflowExecutionsRequest{
28✔
2954
                DomainUUID:    domainID,
28✔
2955
                Domain:        domain,
28✔
2956
                PageSize:      int(listRequest.GetMaximumPageSize()),
28✔
2957
                NextPageToken: listRequest.NextPageToken,
28✔
2958
                EarliestTime:  listRequest.StartTimeFilter.GetEarliestTime(),
28✔
2959
                LatestTime:    listRequest.StartTimeFilter.GetLatestTime(),
28✔
2960
        }
28✔
2961

28✔
2962
        var persistenceResp *persistence.ListWorkflowExecutionsResponse
28✔
2963
        if listRequest.ExecutionFilter != nil {
44✔
2964
                if wh.config.DisableListVisibilityByFilter(domain) {
17✔
2965
                        err = validate.ErrNoPermission
1✔
2966
                } else {
16✔
2967
                        persistenceResp, err = wh.GetVisibilityManager().ListClosedWorkflowExecutionsByWorkflowID(
15✔
2968
                                ctx,
15✔
2969
                                &persistence.ListWorkflowExecutionsByWorkflowIDRequest{
15✔
2970
                                        ListWorkflowExecutionsRequest: baseReq,
15✔
2971
                                        WorkflowID:                    listRequest.ExecutionFilter.GetWorkflowID(),
15✔
2972
                                },
15✔
2973
                        )
15✔
2974
                }
15✔
2975
                wh.GetLogger().Debug("List closed workflow with filter",
16✔
2976
                        tag.WorkflowDomainName(listRequest.GetDomain()), tag.WorkflowListWorkflowFilterByID)
16✔
2977
        } else if listRequest.TypeFilter != nil {
13✔
2978
                if wh.config.DisableListVisibilityByFilter(domain) {
2✔
2979
                        err = validate.ErrNoPermission
1✔
2980
                } else {
1✔
2981
                        persistenceResp, err = wh.GetVisibilityManager().ListClosedWorkflowExecutionsByType(
×
2982
                                ctx,
×
2983
                                &persistence.ListWorkflowExecutionsByTypeRequest{
×
2984
                                        ListWorkflowExecutionsRequest: baseReq,
×
2985
                                        WorkflowTypeName:              listRequest.TypeFilter.GetName(),
×
2986
                                },
×
2987
                        )
×
2988
                }
×
2989
                wh.GetLogger().Debug("List closed workflow with filter",
1✔
2990
                        tag.WorkflowDomainName(listRequest.GetDomain()), tag.WorkflowListWorkflowFilterByType)
1✔
2991
        } else if listRequest.StatusFilter != nil {
12✔
2992
                if wh.config.DisableListVisibilityByFilter(domain) {
2✔
2993
                        err = validate.ErrNoPermission
1✔
2994
                } else {
1✔
2995
                        persistenceResp, err = wh.GetVisibilityManager().ListClosedWorkflowExecutionsByStatus(
×
2996
                                ctx,
×
2997
                                &persistence.ListClosedWorkflowExecutionsByStatusRequest{
×
2998
                                        ListWorkflowExecutionsRequest: baseReq,
×
2999
                                        Status:                        listRequest.GetStatusFilter(),
×
3000
                                },
×
3001
                        )
×
3002
                }
×
3003
                wh.GetLogger().Debug("List closed workflow with filter",
1✔
3004
                        tag.WorkflowDomainName(listRequest.GetDomain()), tag.WorkflowListWorkflowFilterByStatus)
1✔
3005
        } else {
10✔
3006
                persistenceResp, err = wh.GetVisibilityManager().ListClosedWorkflowExecutions(ctx, &baseReq)
10✔
3007
        }
10✔
3008

3009
        if err != nil {
31✔
3010
                return nil, err
3✔
3011
        }
3✔
3012

3013
        resp = &types.ListClosedWorkflowExecutionsResponse{}
25✔
3014
        resp.Executions = persistenceResp.Executions
25✔
3015
        resp.NextPageToken = persistenceResp.NextPageToken
25✔
3016
        return resp, nil
25✔
3017
}
3018

3019
// ListWorkflowExecutions - retrieves info for workflow executions in a domain
3020
func (wh *WorkflowHandler) ListWorkflowExecutions(
3021
        ctx context.Context,
3022
        listRequest *types.ListWorkflowExecutionsRequest,
3023
) (resp *types.ListWorkflowExecutionsResponse, retError error) {
142✔
3024
        if wh.isShuttingDown() {
142✔
3025
                return nil, validate.ErrShuttingDown
×
3026
        }
×
3027

3028
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
142✔
3029
                return nil, err
×
3030
        }
×
3031

3032
        if listRequest == nil {
142✔
3033
                return nil, validate.ErrRequestNotSet
×
3034
        }
×
3035

3036
        if listRequest.GetDomain() == "" {
142✔
3037
                return nil, validate.ErrDomainNotSet
×
3038
        }
×
3039

3040
        if listRequest.GetPageSize() <= 0 {
142✔
3041
                listRequest.PageSize = int32(wh.config.VisibilityMaxPageSize(listRequest.GetDomain()))
×
3042
        }
×
3043

3044
        if wh.isListRequestPageSizeTooLarge(listRequest.GetPageSize(), listRequest.GetDomain()) {
142✔
3045
                return nil, &types.BadRequestError{
×
3046
                        Message: fmt.Sprintf("Pagesize is larger than allow %d", wh.config.ESIndexMaxResultWindow())}
×
3047
        }
×
3048

3049
        validatedQuery, err := wh.visibilityQueryValidator.ValidateQuery(listRequest.GetQuery())
142✔
3050
        if err != nil {
145✔
3051
                return nil, err
3✔
3052
        }
3✔
3053

3054
        domain := listRequest.GetDomain()
139✔
3055
        domainID, err := wh.GetDomainCache().GetDomainID(domain)
139✔
3056
        if err != nil {
139✔
3057
                return nil, err
×
3058
        }
×
3059

3060
        req := &persistence.ListWorkflowExecutionsByQueryRequest{
139✔
3061
                DomainUUID:    domainID,
139✔
3062
                Domain:        domain,
139✔
3063
                PageSize:      int(listRequest.GetPageSize()),
139✔
3064
                NextPageToken: listRequest.NextPageToken,
139✔
3065
                Query:         validatedQuery,
139✔
3066
        }
139✔
3067
        persistenceResp, err := wh.GetVisibilityManager().ListWorkflowExecutions(ctx, req)
139✔
3068
        if err != nil {
139✔
3069
                return nil, err
×
3070
        }
×
3071

3072
        resp = &types.ListWorkflowExecutionsResponse{}
139✔
3073
        resp.Executions = persistenceResp.Executions
139✔
3074
        resp.NextPageToken = persistenceResp.NextPageToken
139✔
3075
        return resp, nil
139✔
3076
}
3077

3078
// ListAllWorkflowExecutions - retrieves info for all workflow executions in a domain
3079
func (wh *WorkflowHandler) ListAllWorkflowExecutions(
3080
        ctx context.Context,
3081
        listRequest *types.ListAllWorkflowExecutionsRequest,
3082
) (resp *types.ListAllWorkflowExecutionsResponse, retError error) {
4✔
3083
        if wh.isShuttingDown() {
4✔
3084
                return nil, validate.ErrShuttingDown
×
3085
        }
×
3086

3087
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
4✔
3088
                return nil, err
×
3089
        }
×
3090

3091
        if listRequest == nil {
5✔
3092
                return nil, validate.ErrRequestNotSet
1✔
3093
        }
1✔
3094

3095
        if listRequest.GetDomain() == "" {
3✔
3096
                return nil, validate.ErrDomainNotSet
×
3097
        }
×
3098

3099
        if listRequest.StartTimeFilter == nil {
4✔
3100
                return nil, &types.BadRequestError{Message: "StartTimeFilter is required"}
1✔
3101
        }
1✔
3102

3103
        if listRequest.StartTimeFilter.EarliestTime == nil {
2✔
3104
                return nil, &types.BadRequestError{Message: "EarliestTime in StartTimeFilter is required"}
×
3105
        }
×
3106

3107
        if listRequest.StartTimeFilter.LatestTime == nil {
2✔
3108
                return nil, &types.BadRequestError{Message: "LatestTime in StartTimeFilter is required"}
×
3109
        }
×
3110

3111
        if listRequest.StartTimeFilter.GetEarliestTime() > listRequest.StartTimeFilter.GetLatestTime() {
3✔
3112
                return nil, &types.BadRequestError{Message: "EarliestTime in StartTimeFilter should not be larger than LatestTime"}
1✔
3113
        }
1✔
3114

3115
        if listRequest.GetMaximumPageSize() <= 0 {
2✔
3116
                listRequest.MaximumPageSize = int32(wh.config.VisibilityMaxPageSize(listRequest.GetDomain()))
1✔
3117
        }
1✔
3118

3119
        if wh.isListRequestPageSizeTooLarge(listRequest.GetMaximumPageSize(), listRequest.GetDomain()) {
1✔
3120
                return nil, &types.BadRequestError{
×
3121
                        Message: fmt.Sprintf("Pagesize is larger than allow %d", wh.config.ESIndexMaxResultWindow())}
×
3122
        }
×
3123

3124
        domain := listRequest.GetDomain()
1✔
3125
        domainID, err := wh.GetDomainCache().GetDomainID(domain)
1✔
3126
        if err != nil {
1✔
3127
                return nil, err
×
3128
        }
×
3129

3130
        baseReq := persistence.ListWorkflowExecutionsRequest{
1✔
3131
                DomainUUID:    domainID,
1✔
3132
                Domain:        domain,
1✔
3133
                PageSize:      int(listRequest.GetMaximumPageSize()),
1✔
3134
                NextPageToken: listRequest.NextPageToken,
1✔
3135
                EarliestTime:  listRequest.StartTimeFilter.GetEarliestTime(),
1✔
3136
                LatestTime:    listRequest.StartTimeFilter.GetLatestTime(),
1✔
3137
        }
1✔
3138
        listallrequest := &persistence.ListAllWorkflowExecutionsRequest{
1✔
3139
                ListWorkflowExecutionsRequest: baseReq,
1✔
3140
                PartialMatch:                  listRequest.PartialMatch,
1✔
3141
                WorkflowSearchValue:           listRequest.WorkflowSearchValue,
1✔
3142
                SortColumn:                    listRequest.SortColumn,
1✔
3143
                SortOrder:                     listRequest.SortOrder,
1✔
3144
        }
1✔
3145
        copy(listallrequest.StatusFilter, listRequest.StatusFilter)
1✔
3146
        var persistenceResp *persistence.ListWorkflowExecutionsResponse
1✔
3147

1✔
3148
        persistenceResp, err = wh.GetVisibilityManager().ListAllWorkflowExecutions(
1✔
3149
                ctx,
1✔
3150
                listallrequest,
1✔
3151
        )
1✔
3152
        if err != nil {
1✔
3153
                return nil, err
×
3154
        }
×
3155

3156
        wh.GetLogger().Debug("List all workflows",
1✔
3157
                tag.WorkflowDomainName(listRequest.GetDomain()), tag.WorkflowListWorkflowFilterByType)
1✔
3158

1✔
3159
        resp = &types.ListAllWorkflowExecutionsResponse{}
1✔
3160
        resp.Executions = persistenceResp.Executions
1✔
3161
        resp.NextPageToken = persistenceResp.NextPageToken
1✔
3162
        return resp, nil
1✔
3163
}
3164

3165
// RestartWorkflowExecution - retrieves info for an existing workflow then restarts it
3166
func (wh *WorkflowHandler) RestartWorkflowExecution(ctx context.Context, request *types.RestartWorkflowExecutionRequest) (resp *types.RestartWorkflowExecutionResponse, retError error) {
2✔
3167
        if wh.isShuttingDown() {
2✔
3168
                return nil, validate.ErrShuttingDown
×
3169
        }
×
3170

3171
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
2✔
3172
                return nil, err
×
3173
        }
×
3174

3175
        if request == nil {
2✔
3176
                return nil, validate.ErrRequestNotSet
×
3177
        }
×
3178

3179
        domainName := request.GetDomain()
2✔
3180
        wfExecution := request.GetWorkflowExecution()
2✔
3181

2✔
3182
        if request.GetDomain() == "" {
2✔
3183
                return nil, validate.ErrDomainNotSet
×
3184
        }
×
3185

3186
        if err := validate.CheckExecution(wfExecution); err != nil {
2✔
3187
                return nil, err
×
3188
        }
×
3189

3190
        domainID, err := wh.GetDomainCache().GetDomainID(domainName)
2✔
3191
        if err != nil {
2✔
3192
                return nil, err
×
3193
        }
×
3194

3195
        isolationGroup := wh.getIsolationGroup(ctx, domainName)
2✔
3196
        if !wh.isIsolationGroupHealthy(ctx, domainName, isolationGroup) {
3✔
3197
                return nil, &types.BadRequestError{fmt.Sprintf("Domain %s is drained from isolation group %s.", domainName, isolationGroup)}
1✔
3198
        }
1✔
3199

3200
        history, err := wh.GetWorkflowExecutionHistory(ctx, &types.GetWorkflowExecutionHistoryRequest{
1✔
3201
                Domain: domainName,
1✔
3202
                Execution: &types.WorkflowExecution{
1✔
3203
                        WorkflowID: wfExecution.WorkflowID,
1✔
3204
                        RunID:      wfExecution.RunID,
1✔
3205
                },
1✔
3206
                SkipArchival: true,
1✔
3207
        })
1✔
3208
        if err != nil {
1✔
3209
                return nil, validate.ErrHistoryNotFound
×
3210
        }
×
3211
        startRequest := constructRestartWorkflowRequest(history.History.Events[0].WorkflowExecutionStartedEventAttributes,
1✔
3212
                domainName, request.Identity, wfExecution.WorkflowID)
1✔
3213
        req, err := common.CreateHistoryStartWorkflowRequest(domainID, startRequest, time.Now(), wh.getPartitionConfig(ctx, domainName))
1✔
3214
        if err != nil {
1✔
3215
                return nil, err
×
3216
        }
×
3217
        startResp, err := wh.GetHistoryClient().StartWorkflowExecution(ctx, req)
1✔
3218
        if err != nil {
1✔
3219
                return nil, wh.normalizeVersionedErrors(ctx, err)
×
3220
        }
×
3221
        resp = &types.RestartWorkflowExecutionResponse{
1✔
3222
                RunID: startResp.RunID,
1✔
3223
        }
1✔
3224

1✔
3225
        return resp, nil
1✔
3226
}
3227

3228
// ScanWorkflowExecutions - retrieves info for large amount of workflow executions in a domain without order
3229
func (wh *WorkflowHandler) ScanWorkflowExecutions(
3230
        ctx context.Context,
3231
        listRequest *types.ListWorkflowExecutionsRequest,
3232
) (resp *types.ListWorkflowExecutionsResponse, retError error) {
39✔
3233
        if wh.isShuttingDown() {
39✔
3234
                return nil, validate.ErrShuttingDown
×
3235
        }
×
3236

3237
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
39✔
3238
                return nil, err
×
3239
        }
×
3240

3241
        if listRequest == nil {
39✔
3242
                return nil, validate.ErrRequestNotSet
×
3243
        }
×
3244

3245
        if listRequest.GetDomain() == "" {
39✔
3246
                return nil, validate.ErrDomainNotSet
×
3247
        }
×
3248

3249
        if listRequest.GetPageSize() <= 0 {
39✔
3250
                listRequest.PageSize = int32(wh.config.VisibilityMaxPageSize(listRequest.GetDomain()))
×
3251
        }
×
3252

3253
        if wh.isListRequestPageSizeTooLarge(listRequest.GetPageSize(), listRequest.GetDomain()) {
39✔
3254
                return nil, &types.BadRequestError{
×
3255
                        Message: fmt.Sprintf("Pagesize is larger than allow %d", wh.config.ESIndexMaxResultWindow())}
×
3256
        }
×
3257

3258
        validatedQuery, err := wh.visibilityQueryValidator.ValidateQuery(listRequest.GetQuery())
39✔
3259
        if err != nil {
40✔
3260
                return nil, err
1✔
3261
        }
1✔
3262

3263
        domain := listRequest.GetDomain()
38✔
3264
        domainID, err := wh.GetDomainCache().GetDomainID(domain)
38✔
3265
        if err != nil {
38✔
3266
                return nil, err
×
3267
        }
×
3268

3269
        req := &persistence.ListWorkflowExecutionsByQueryRequest{
38✔
3270
                DomainUUID:    domainID,
38✔
3271
                Domain:        domain,
38✔
3272
                PageSize:      int(listRequest.GetPageSize()),
38✔
3273
                NextPageToken: listRequest.NextPageToken,
38✔
3274
                Query:         validatedQuery,
38✔
3275
        }
38✔
3276
        persistenceResp, err := wh.GetVisibilityManager().ScanWorkflowExecutions(ctx, req)
38✔
3277
        if err != nil {
38✔
3278
                return nil, err
×
3279
        }
×
3280

3281
        resp = &types.ListWorkflowExecutionsResponse{}
38✔
3282
        resp.Executions = persistenceResp.Executions
38✔
3283
        resp.NextPageToken = persistenceResp.NextPageToken
38✔
3284
        return resp, nil
38✔
3285
}
3286

3287
// CountWorkflowExecutions - count number of workflow executions in a domain
3288
func (wh *WorkflowHandler) CountWorkflowExecutions(
3289
        ctx context.Context,
3290
        countRequest *types.CountWorkflowExecutionsRequest,
3291
) (resp *types.CountWorkflowExecutionsResponse, retError error) {
14✔
3292
        if wh.isShuttingDown() {
14✔
3293
                return nil, validate.ErrShuttingDown
×
3294
        }
×
3295

3296
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
14✔
3297
                return nil, err
×
3298
        }
×
3299

3300
        if countRequest == nil {
14✔
3301
                return nil, validate.ErrRequestNotSet
×
3302
        }
×
3303

3304
        if countRequest.GetDomain() == "" {
14✔
3305
                return nil, validate.ErrDomainNotSet
×
3306
        }
×
3307

3308
        validatedQuery, err := wh.visibilityQueryValidator.ValidateQuery(countRequest.GetQuery())
14✔
3309
        if err != nil {
15✔
3310
                return nil, err
1✔
3311
        }
1✔
3312

3313
        domain := countRequest.GetDomain()
13✔
3314
        domainID, err := wh.GetDomainCache().GetDomainID(domain)
13✔
3315
        if err != nil {
13✔
3316
                return nil, err
×
3317
        }
×
3318

3319
        req := &persistence.CountWorkflowExecutionsRequest{
13✔
3320
                DomainUUID: domainID,
13✔
3321
                Domain:     domain,
13✔
3322
                Query:      validatedQuery,
13✔
3323
        }
13✔
3324
        persistenceResp, err := wh.GetVisibilityManager().CountWorkflowExecutions(ctx, req)
13✔
3325
        if err != nil {
13✔
3326
                return nil, err
×
3327
        }
×
3328

3329
        resp = &types.CountWorkflowExecutionsResponse{
13✔
3330
                Count: persistenceResp.Count,
13✔
3331
        }
13✔
3332
        return resp, nil
13✔
3333
}
3334

3335
// GetSearchAttributes return valid indexed keys
3336
func (wh *WorkflowHandler) GetSearchAttributes(ctx context.Context) (resp *types.GetSearchAttributesResponse, retError error) {
1✔
3337
        if wh.isShuttingDown() {
1✔
3338
                return nil, validate.ErrShuttingDown
×
3339
        }
×
3340

3341
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
1✔
3342
                return nil, err
×
3343
        }
×
3344

3345
        keys := wh.config.ValidSearchAttributes()
1✔
3346
        resp = &types.GetSearchAttributesResponse{
1✔
3347
                Keys: wh.convertIndexedKeyToThrift(keys),
1✔
3348
        }
1✔
3349
        return resp, nil
1✔
3350
}
3351

3352
// ResetStickyTaskList reset the volatile information in mutable state of a given workflow.
3353
func (wh *WorkflowHandler) ResetStickyTaskList(
3354
        ctx context.Context,
3355
        resetRequest *types.ResetStickyTaskListRequest,
3356
) (resp *types.ResetStickyTaskListResponse, retError error) {
3✔
3357
        if wh.isShuttingDown() {
3✔
3358
                return nil, validate.ErrShuttingDown
×
3359
        }
×
3360

3361
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
3✔
3362
                return nil, err
×
3363
        }
×
3364

3365
        if resetRequest == nil {
3✔
3366
                return nil, validate.ErrRequestNotSet
×
3367
        }
×
3368

3369
        domainName := resetRequest.GetDomain()
3✔
3370
        wfExecution := resetRequest.GetExecution()
3✔
3371

3✔
3372
        if domainName == "" {
3✔
3373
                return nil, validate.ErrDomainNotSet
×
3374
        }
×
3375

3376
        if err := validate.CheckExecution(wfExecution); err != nil {
3✔
3377
                return nil, err
×
3378
        }
×
3379

3380
        domainID, err := wh.GetDomainCache().GetDomainID(resetRequest.GetDomain())
3✔
3381
        if err != nil {
3✔
3382
                return nil, err
×
3383
        }
×
3384

3385
        _, err = wh.GetHistoryClient().ResetStickyTaskList(ctx, &types.HistoryResetStickyTaskListRequest{
3✔
3386
                DomainUUID: domainID,
3✔
3387
                Execution:  resetRequest.Execution,
3✔
3388
        })
3✔
3389
        if err != nil {
3✔
3390
                return nil, wh.normalizeVersionedErrors(ctx, err)
×
3391
        }
×
3392
        return &types.ResetStickyTaskListResponse{}, nil
3✔
3393
}
3394

3395
// QueryWorkflow returns query result for a specified workflow execution
3396
func (wh *WorkflowHandler) QueryWorkflow(
3397
        ctx context.Context,
3398
        queryRequest *types.QueryWorkflowRequest,
3399
) (resp *types.QueryWorkflowResponse, retError error) {
57✔
3400
        if wh.isShuttingDown() {
58✔
3401
                return nil, validate.ErrShuttingDown
1✔
3402
        }
1✔
3403

3404
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
57✔
3405
                return nil, err
1✔
3406
        }
1✔
3407

3408
        if queryRequest == nil {
56✔
3409
                return nil, validate.ErrRequestNotSet
1✔
3410
        }
1✔
3411

3412
        domainName := queryRequest.GetDomain()
54✔
3413
        wfExecution := queryRequest.GetExecution()
54✔
3414

54✔
3415
        if domainName == "" {
55✔
3416
                return nil, validate.ErrDomainNotSet
1✔
3417
        }
1✔
3418

3419
        if err := validate.CheckExecution(wfExecution); err != nil {
54✔
3420
                return nil, err
1✔
3421
        }
1✔
3422

3423
        if wh.config.DisallowQuery(domainName) {
53✔
3424
                return nil, validate.ErrQueryDisallowedForDomain
1✔
3425
        }
1✔
3426

3427
        if queryRequest.Query == nil {
52✔
3428
                return nil, validate.ErrQueryNotSet
1✔
3429
        }
1✔
3430

3431
        if queryRequest.Query.GetQueryType() == "" {
51✔
3432
                return nil, validate.ErrQueryTypeNotSet
1✔
3433
        }
1✔
3434

3435
        domainID, err := wh.GetDomainCache().GetDomainID(domainName)
49✔
3436
        if err != nil {
50✔
3437
                return nil, err
1✔
3438
        }
1✔
3439

3440
        sizeLimitError := wh.config.BlobSizeLimitError(domainName)
48✔
3441
        sizeLimitWarn := wh.config.BlobSizeLimitWarn(domainName)
48✔
3442

48✔
3443
        scope := getMetricsScopeWithDomain(metrics.FrontendQueryWorkflowScope, queryRequest, wh.GetMetricsClient()).Tagged(metrics.GetContextTags(ctx)...)
48✔
3444
        if err := common.CheckEventBlobSizeLimit(
48✔
3445
                len(queryRequest.GetQuery().GetQueryArgs()),
48✔
3446
                sizeLimitWarn,
48✔
3447
                sizeLimitError,
48✔
3448
                domainID,
48✔
3449
                queryRequest.GetExecution().GetWorkflowID(),
48✔
3450
                queryRequest.GetExecution().GetRunID(),
48✔
3451
                scope,
48✔
3452
                wh.GetThrottledLogger(),
48✔
3453
                tag.BlobSizeViolationOperation("QueryWorkflow")); err != nil {
49✔
3454
                return nil, err
1✔
3455
        }
1✔
3456

3457
        req := &types.HistoryQueryWorkflowRequest{
47✔
3458
                DomainUUID: domainID,
47✔
3459
                Request:    queryRequest,
47✔
3460
        }
47✔
3461
        hResponse, err := wh.GetHistoryClient().QueryWorkflow(ctx, req)
47✔
3462
        if err != nil {
60✔
3463
                return nil, err
13✔
3464
        }
13✔
3465
        return hResponse.GetResponse(), nil
34✔
3466
}
3467

3468
// DescribeWorkflowExecution returns information about the specified workflow execution.
3469
func (wh *WorkflowHandler) DescribeWorkflowExecution(
3470
        ctx context.Context,
3471
        request *types.DescribeWorkflowExecutionRequest,
3472
) (resp *types.DescribeWorkflowExecutionResponse, retError error) {
101✔
3473
        if wh.isShuttingDown() {
102✔
3474
                return nil, validate.ErrShuttingDown
1✔
3475
        }
1✔
3476

3477
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
101✔
3478
                return nil, err
1✔
3479
        }
1✔
3480

3481
        if request == nil {
100✔
3482
                return nil, validate.ErrRequestNotSet
1✔
3483
        }
1✔
3484

3485
        domainName := request.GetDomain()
98✔
3486
        wfExecution := request.GetExecution()
98✔
3487
        if domainName == "" {
99✔
3488
                return nil, validate.ErrDomainNotSet
1✔
3489
        }
1✔
3490

3491
        if err := validate.CheckExecution(wfExecution); err != nil {
98✔
3492
                return nil, err
1✔
3493
        }
1✔
3494

3495
        domainID, err := wh.GetDomainCache().GetDomainID(request.GetDomain())
96✔
3496
        if err != nil {
97✔
3497
                return nil, err
1✔
3498
        }
1✔
3499

3500
        response, err := wh.GetHistoryClient().DescribeWorkflowExecution(ctx, &types.HistoryDescribeWorkflowExecutionRequest{
95✔
3501
                DomainUUID: domainID,
95✔
3502
                Request:    request,
95✔
3503
        })
95✔
3504

95✔
3505
        if err != nil {
96✔
3506
                return nil, err
1✔
3507
        }
1✔
3508

3509
        return response, nil
94✔
3510
}
3511

3512
// DescribeTaskList returns information about the target tasklist, right now this API returns the
3513
// pollers which polled this tasklist in last few minutes. If includeTaskListStatus field is true,
3514
// it will also return status of tasklist's ackManager (readLevel, ackLevel, backlogCountHint and taskIDBlock).
3515
func (wh *WorkflowHandler) DescribeTaskList(
3516
        ctx context.Context,
3517
        request *types.DescribeTaskListRequest,
3518
) (resp *types.DescribeTaskListResponse, retError error) {
18✔
3519
        if wh.isShuttingDown() {
18✔
3520
                return nil, validate.ErrShuttingDown
×
3521
        }
×
3522

3523
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
18✔
3524
                return nil, err
×
3525
        }
×
3526

3527
        if request == nil {
18✔
3528
                return nil, validate.ErrRequestNotSet
×
3529
        }
×
3530

3531
        if request.GetDomain() == "" {
18✔
3532
                return nil, validate.ErrDomainNotSet
×
3533
        }
×
3534

3535
        domainID, err := wh.GetDomainCache().GetDomainID(request.GetDomain())
18✔
3536
        if err != nil {
18✔
3537
                return nil, err
×
3538
        }
×
3539

3540
        scope := getMetricsScopeWithDomain(metrics.FrontendDescribeTaskListScope, request, wh.GetMetricsClient()).Tagged(metrics.GetContextTags(ctx)...)
18✔
3541
        if err := wh.validateTaskList(request.TaskList, scope, request.GetDomain()); err != nil {
18✔
3542
                return nil, err
×
3543
        }
×
3544

3545
        if request.TaskListType == nil {
18✔
3546
                return nil, validate.ErrTaskListTypeNotSet
×
3547
        }
×
3548

3549
        response, err := wh.GetMatchingClient().DescribeTaskList(ctx, &types.MatchingDescribeTaskListRequest{
18✔
3550
                DomainUUID:  domainID,
18✔
3551
                DescRequest: request,
18✔
3552
        })
18✔
3553
        if err != nil {
18✔
3554
                return nil, err
×
3555
        }
×
3556

3557
        return response, nil
18✔
3558
}
3559

3560
// ListTaskListPartitions returns all the partition and host for a taskList
3561
func (wh *WorkflowHandler) ListTaskListPartitions(
3562
        ctx context.Context,
3563
        request *types.ListTaskListPartitionsRequest,
3564
) (resp *types.ListTaskListPartitionsResponse, retError error) {
×
3565
        if wh.isShuttingDown() {
×
3566
                return nil, validate.ErrShuttingDown
×
3567
        }
×
3568

3569
        if request == nil {
×
3570
                return nil, validate.ErrRequestNotSet
×
3571
        }
×
3572

3573
        if request.GetDomain() == "" {
×
3574
                return nil, validate.ErrDomainNotSet
×
3575
        }
×
3576

3577
        scope := getMetricsScopeWithDomain(metrics.FrontendListTaskListPartitionsScope, request, wh.GetMetricsClient()).Tagged(metrics.GetContextTags(ctx)...)
×
3578
        if err := wh.validateTaskList(request.TaskList, scope, request.GetDomain()); err != nil {
×
3579
                return nil, err
×
3580
        }
×
3581

3582
        resp, err := wh.GetMatchingClient().ListTaskListPartitions(ctx, &types.MatchingListTaskListPartitionsRequest{
×
3583
                Domain:   request.Domain,
×
3584
                TaskList: request.TaskList,
×
3585
        })
×
3586
        return resp, err
×
3587
}
3588

3589
// GetTaskListsByDomain returns all the partition and host for a taskList
3590
func (wh *WorkflowHandler) GetTaskListsByDomain(
3591
        ctx context.Context,
3592
        request *types.GetTaskListsByDomainRequest,
3593
) (resp *types.GetTaskListsByDomainResponse, retError error) {
×
3594
        if wh.isShuttingDown() {
×
3595
                return nil, validate.ErrShuttingDown
×
3596
        }
×
3597

3598
        if request == nil {
×
3599
                return nil, validate.ErrRequestNotSet
×
3600
        }
×
3601

3602
        if request.GetDomain() == "" {
×
3603
                return nil, validate.ErrDomainNotSet
×
3604
        }
×
3605

3606
        resp, err := wh.GetMatchingClient().GetTaskListsByDomain(ctx, &types.GetTaskListsByDomainRequest{
×
3607
                Domain: request.Domain,
×
3608
        })
×
3609
        return resp, err
×
3610
}
3611

3612
// RefreshWorkflowTasks re-generates the workflow tasks
3613
func (wh *WorkflowHandler) RefreshWorkflowTasks(
3614
        ctx context.Context,
3615
        request *types.RefreshWorkflowTasksRequest,
3616
) (err error) {
×
3617
        if request == nil {
×
3618
                return validate.ErrRequestNotSet
×
3619
        }
×
3620
        if err := validate.CheckExecution(request.Execution); err != nil {
×
3621
                return err
×
3622
        }
×
3623
        domainEntry, err := wh.GetDomainCache().GetDomain(request.GetDomain())
×
3624
        if err != nil {
×
3625
                return err
×
3626
        }
×
3627

3628
        err = wh.GetHistoryClient().RefreshWorkflowTasks(ctx, &types.HistoryRefreshWorkflowTasksRequest{
×
3629
                DomainUIID: domainEntry.GetInfo().ID,
×
3630
                Request:    request,
×
3631
        })
×
3632
        if err != nil {
×
3633
                return err
×
3634
        }
×
3635
        return nil
×
3636
}
3637

3638
func (wh *WorkflowHandler) getRawHistory(
3639
        ctx context.Context,
3640
        scope metrics.Scope,
3641
        domainID string,
3642
        domainName string,
3643
        execution types.WorkflowExecution,
3644
        firstEventID int64,
3645
        nextEventID int64,
3646
        pageSize int32,
3647
        nextPageToken []byte,
3648
        transientDecision *types.TransientDecisionInfo,
3649
        branchToken []byte,
3650
) ([]*types.DataBlob, []byte, error) {
2✔
3651
        rawHistory := []*types.DataBlob{}
2✔
3652
        shardID := common.WorkflowIDToHistoryShard(execution.WorkflowID, wh.config.NumHistoryShards)
2✔
3653

2✔
3654
        resp, err := wh.GetHistoryManager().ReadRawHistoryBranch(ctx, &persistence.ReadHistoryBranchRequest{
2✔
3655
                BranchToken:   branchToken,
2✔
3656
                MinEventID:    firstEventID,
2✔
3657
                MaxEventID:    nextEventID,
2✔
3658
                PageSize:      int(pageSize),
2✔
3659
                NextPageToken: nextPageToken,
2✔
3660
                ShardID:       common.IntPtr(shardID),
2✔
3661
                DomainName:    domainName,
2✔
3662
        })
2✔
3663
        if err != nil {
2✔
3664
                return nil, nil, err
×
3665
        }
×
3666

3667
        var encoding *types.EncodingType
2✔
3668
        for _, data := range resp.HistoryEventBlobs {
4✔
3669
                switch data.Encoding {
2✔
3670
                case common.EncodingTypeJSON:
×
3671
                        encoding = types.EncodingTypeJSON.Ptr()
×
3672
                case common.EncodingTypeThriftRW:
2✔
3673
                        encoding = types.EncodingTypeThriftRW.Ptr()
2✔
3674
                default:
×
3675
                        panic(fmt.Sprintf("Invalid encoding type for raw history, encoding type: %s", data.Encoding))
×
3676
                }
3677
                rawHistory = append(rawHistory, &types.DataBlob{
2✔
3678
                        EncodingType: encoding,
2✔
3679
                        Data:         data.Data,
2✔
3680
                })
2✔
3681
        }
3682

3683
        if len(resp.NextPageToken) == 0 && transientDecision != nil {
4✔
3684
                if err := wh.validateTransientDecisionEvents(nextEventID, transientDecision); err != nil {
2✔
3685
                        scope.IncCounter(metrics.CadenceErrIncompleteHistoryCounter)
×
3686
                        wh.GetLogger().Error("getHistory error",
×
3687
                                tag.WorkflowDomainID(domainID),
×
3688
                                tag.WorkflowID(execution.GetWorkflowID()),
×
3689
                                tag.WorkflowRunID(execution.GetRunID()),
×
3690
                                tag.Error(err))
×
3691
                }
×
3692
                blob, err := wh.GetPayloadSerializer().SerializeBatchEvents(
2✔
3693
                        []*types.HistoryEvent{transientDecision.ScheduledEvent, transientDecision.StartedEvent}, common.EncodingTypeThriftRW)
2✔
3694
                if err != nil {
2✔
3695
                        return nil, nil, err
×
3696
                }
×
3697
                rawHistory = append(rawHistory, &types.DataBlob{
2✔
3698
                        EncodingType: types.EncodingTypeThriftRW.Ptr(),
2✔
3699
                        Data:         blob.Data,
2✔
3700
                })
2✔
3701
        }
3702

3703
        return rawHistory, resp.NextPageToken, nil
2✔
3704
}
3705

3706
func (wh *WorkflowHandler) getHistory(
3707
        ctx context.Context,
3708
        scope metrics.Scope,
3709
        domainID string,
3710
        domainName string,
3711
        execution types.WorkflowExecution,
3712
        firstEventID, nextEventID int64,
3713
        pageSize int32,
3714
        nextPageToken []byte,
3715
        transientDecision *types.TransientDecisionInfo,
3716
        branchToken []byte,
3717
) (*types.History, []byte, error) {
1,622✔
3718

1,622✔
3719
        var size int
1,622✔
3720

1,622✔
3721
        isFirstPage := len(nextPageToken) == 0
1,622✔
3722
        shardID := common.WorkflowIDToHistoryShard(execution.WorkflowID, wh.config.NumHistoryShards)
1,622✔
3723
        var err error
1,622✔
3724
        historyEvents, size, nextPageToken, err := persistenceutils.ReadFullPageV2Events(ctx, wh.GetHistoryManager(), &persistence.ReadHistoryBranchRequest{
1,622✔
3725
                BranchToken:   branchToken,
1,622✔
3726
                MinEventID:    firstEventID,
1,622✔
3727
                MaxEventID:    nextEventID,
1,622✔
3728
                PageSize:      int(pageSize),
1,622✔
3729
                NextPageToken: nextPageToken,
1,622✔
3730
                ShardID:       common.IntPtr(shardID),
1,622✔
3731
                DomainName:    domainName,
1,622✔
3732
        })
1,622✔
3733

1,622✔
3734
        if err != nil {
1,622✔
3735
                return nil, nil, err
×
3736
        }
×
3737

3738
        scope.RecordTimer(metrics.HistorySize, time.Duration(size))
1,622✔
3739

1,622✔
3740
        isLastPage := len(nextPageToken) == 0
1,622✔
3741
        if err := verifyHistoryIsComplete(
1,622✔
3742
                historyEvents,
1,622✔
3743
                firstEventID,
1,622✔
3744
                nextEventID-1,
1,622✔
3745
                isFirstPage,
1,622✔
3746
                isLastPage,
1,622✔
3747
                int(pageSize)); err != nil {
1,622✔
3748
                scope.IncCounter(metrics.CadenceErrIncompleteHistoryCounter)
×
3749
                wh.GetLogger().Error("getHistory: incomplete history",
×
3750
                        tag.WorkflowDomainID(domainID),
×
3751
                        tag.WorkflowID(execution.GetWorkflowID()),
×
3752
                        tag.WorkflowRunID(execution.GetRunID()),
×
3753
                        tag.Error(err))
×
3754
                return nil, nil, err
×
3755
        }
×
3756

3757
        if len(nextPageToken) == 0 && transientDecision != nil {
1,793✔
3758
                if err := wh.validateTransientDecisionEvents(nextEventID, transientDecision); err != nil {
171✔
3759
                        scope.IncCounter(metrics.CadenceErrIncompleteHistoryCounter)
×
3760
                        wh.GetLogger().Error("getHistory error",
×
3761
                                tag.WorkflowDomainID(domainID),
×
3762
                                tag.WorkflowID(execution.GetWorkflowID()),
×
3763
                                tag.WorkflowRunID(execution.GetRunID()),
×
3764
                                tag.Error(err))
×
3765
                }
×
3766
                // Append the transient decision events once we are done enumerating everything from the events table
3767
                historyEvents = append(historyEvents, transientDecision.ScheduledEvent, transientDecision.StartedEvent)
171✔
3768
        }
3769

3770
        executionHistory := &types.History{}
1,622✔
3771
        executionHistory.Events = historyEvents
1,622✔
3772
        return executionHistory, nextPageToken, nil
1,622✔
3773
}
3774

3775
func (wh *WorkflowHandler) validateTransientDecisionEvents(
3776
        expectedNextEventID int64,
3777
        decision *types.TransientDecisionInfo,
3778
) error {
173✔
3779

173✔
3780
        if decision.ScheduledEvent.ID == expectedNextEventID &&
173✔
3781
                decision.StartedEvent.ID == expectedNextEventID+1 {
346✔
3782
                return nil
173✔
3783
        }
173✔
3784

3785
        return fmt.Errorf(
×
3786
                "invalid transient decision: "+
×
3787
                        "expectedScheduledEventID=%v expectedStartedEventID=%v but have scheduledEventID=%v startedEventID=%v",
×
3788
                expectedNextEventID,
×
3789
                expectedNextEventID+1,
×
3790
                decision.ScheduledEvent.ID,
×
3791
                decision.StartedEvent.ID)
×
3792
}
3793

3794
func (wh *WorkflowHandler) validateTaskList(t *types.TaskList, scope metrics.Scope, domain string) error {
2,783✔
3795
        if t == nil || t.GetName() == "" {
2,784✔
3796
                return validate.ErrTaskListNotSet
1✔
3797
        }
1✔
3798

3799
        if !common.IsValidIDLength(
2,782✔
3800
                t.GetName(),
2,782✔
3801
                scope,
2,782✔
3802
                wh.config.MaxIDLengthWarnLimit(),
2,782✔
3803
                wh.config.TaskListNameMaxLength(domain),
2,782✔
3804
                metrics.CadenceErrTaskListNameExceededWarnLimit,
2,782✔
3805
                domain,
2,782✔
3806
                wh.GetLogger(),
2,782✔
3807
                tag.IDTypeTaskListName) {
2,782✔
3808
                return validate.ErrTaskListTooLong
×
3809
        }
×
3810
        return nil
2,782✔
3811
}
3812

3813
func (wh *WorkflowHandler) createPollForDecisionTaskResponse(
3814
        ctx context.Context,
3815
        scope metrics.Scope,
3816
        domainID string,
3817
        matchingResp *types.MatchingPollForDecisionTaskResponse,
3818
        branchToken []byte,
3819
) (*types.PollForDecisionTaskResponse, error) {
1,240✔
3820

1,240✔
3821
        if matchingResp.WorkflowExecution == nil {
1,293✔
3822
                // this will happen if there is no decision task to be send to worker / caller
53✔
3823
                return &types.PollForDecisionTaskResponse{}, nil
53✔
3824
        }
53✔
3825

3826
        var history *types.History
1,187✔
3827
        var continuation []byte
1,187✔
3828
        var err error
1,187✔
3829

1,187✔
3830
        if matchingResp.GetStickyExecutionEnabled() && matchingResp.Query != nil {
1,193✔
3831
                // meaning sticky query, we should not return any events to worker
6✔
3832
                // since query task only check the current status
6✔
3833
                history = &types.History{
6✔
3834
                        Events: []*types.HistoryEvent{},
6✔
3835
                }
6✔
3836
        } else {
1,187✔
3837
                // here we have 3 cases:
1,181✔
3838
                // 1. sticky && non query task
1,181✔
3839
                // 2. non sticky &&  non query task
1,181✔
3840
                // 3. non sticky && query task
1,181✔
3841
                // for 1, partial history have to be send back
1,181✔
3842
                // for 2 and 3, full history have to be send back
1,181✔
3843

1,181✔
3844
                var persistenceToken []byte
1,181✔
3845

1,181✔
3846
                firstEventID := common.FirstEventID
1,181✔
3847
                nextEventID := matchingResp.GetNextEventID()
1,181✔
3848
                if matchingResp.GetStickyExecutionEnabled() {
1,277✔
3849
                        firstEventID = matchingResp.GetPreviousStartedEventID() + 1
96✔
3850
                }
96✔
3851
                domainName, dErr := wh.GetDomainCache().GetDomainName(domainID)
1,181✔
3852
                if dErr != nil {
1,181✔
3853
                        return nil, dErr
×
3854
                }
×
3855
                scope = scope.Tagged(metrics.DomainTag(domainName))
1,181✔
3856
                history, persistenceToken, err = wh.getHistory(
1,181✔
3857
                        ctx,
1,181✔
3858
                        scope,
1,181✔
3859
                        domainID,
1,181✔
3860
                        domainName,
1,181✔
3861
                        *matchingResp.WorkflowExecution,
1,181✔
3862
                        firstEventID,
1,181✔
3863
                        nextEventID,
1,181✔
3864
                        int32(wh.config.HistoryMaxPageSize(domainName)),
1,181✔
3865
                        nil,
1,181✔
3866
                        matchingResp.DecisionInfo,
1,181✔
3867
                        branchToken,
1,181✔
3868
                )
1,181✔
3869
                if err != nil {
1,181✔
3870
                        return nil, err
×
3871
                }
×
3872

3873
                if len(persistenceToken) != 0 {
1,181✔
3874
                        continuation, err = serializeHistoryToken(&getHistoryContinuationToken{
×
3875
                                RunID:             matchingResp.WorkflowExecution.GetRunID(),
×
3876
                                FirstEventID:      firstEventID,
×
3877
                                NextEventID:       nextEventID,
×
3878
                                PersistenceToken:  persistenceToken,
×
3879
                                TransientDecision: matchingResp.DecisionInfo,
×
3880
                                BranchToken:       branchToken,
×
3881
                        })
×
3882
                        if err != nil {
×
3883
                                return nil, err
×
3884
                        }
×
3885
                }
3886
        }
3887

3888
        resp := &types.PollForDecisionTaskResponse{
1,187✔
3889
                TaskToken:                 matchingResp.TaskToken,
1,187✔
3890
                WorkflowExecution:         matchingResp.WorkflowExecution,
1,187✔
3891
                WorkflowType:              matchingResp.WorkflowType,
1,187✔
3892
                PreviousStartedEventID:    matchingResp.PreviousStartedEventID,
1,187✔
3893
                StartedEventID:            matchingResp.StartedEventID, // this field is not set for query tasks as there's no decision task started event
1,187✔
3894
                Query:                     matchingResp.Query,
1,187✔
3895
                BacklogCountHint:          matchingResp.BacklogCountHint,
1,187✔
3896
                Attempt:                   matchingResp.Attempt,
1,187✔
3897
                History:                   history,
1,187✔
3898
                NextPageToken:             continuation,
1,187✔
3899
                WorkflowExecutionTaskList: matchingResp.WorkflowExecutionTaskList,
1,187✔
3900
                ScheduledTimestamp:        matchingResp.ScheduledTimestamp,
1,187✔
3901
                StartedTimestamp:          matchingResp.StartedTimestamp,
1,187✔
3902
                Queries:                   matchingResp.Queries,
1,187✔
3903
                NextEventID:               matchingResp.NextEventID,
1,187✔
3904
                TotalHistoryBytes:         matchingResp.TotalHistoryBytes,
1,187✔
3905
        }
1,187✔
3906

1,187✔
3907
        return resp, nil
1,187✔
3908
}
3909

3910
func verifyHistoryIsComplete(
3911
        events []*types.HistoryEvent,
3912
        expectedFirstEventID int64,
3913
        expectedLastEventID int64,
3914
        isFirstPage bool,
3915
        isLastPage bool,
3916
        pageSize int,
3917
) error {
1,641✔
3918

1,641✔
3919
        nEvents := len(events)
1,641✔
3920
        if nEvents == 0 {
1,654✔
3921
                if isLastPage {
26✔
3922
                        // we seem to be returning a non-nil pageToken on the lastPage which
13✔
3923
                        // in turn cases the client to call getHistory again - only to find
13✔
3924
                        // there are no more events to consume - bail out if this is the case here
13✔
3925
                        return nil
13✔
3926
                }
13✔
3927
                return fmt.Errorf("invalid history: contains zero events")
×
3928
        }
3929

3930
        firstEventID := events[0].ID
1,628✔
3931
        lastEventID := events[nEvents-1].ID
1,628✔
3932

1,628✔
3933
        if !isFirstPage { // atleast one page of history has been read previously
1,664✔
3934
                if firstEventID <= expectedFirstEventID {
36✔
3935
                        // not first page and no events have been read in the previous pages - not possible
×
3936
                        return &types.InternalServiceError{
×
3937
                                Message: fmt.Sprintf(
×
3938
                                        "invalid history: expected first eventID to be > %v but got %v", expectedFirstEventID, firstEventID),
×
3939
                        }
×
3940
                }
×
3941
                expectedFirstEventID = firstEventID
36✔
3942
        }
3943

3944
        if !isLastPage {
1,677✔
3945
                // estimate lastEventID based on pageSize. This is a lower bound
49✔
3946
                // since the persistence layer counts "batch of events" as a single page
49✔
3947
                expectedLastEventID = expectedFirstEventID + int64(pageSize) - 1
49✔
3948
        }
49✔
3949

3950
        nExpectedEvents := expectedLastEventID - expectedFirstEventID + 1
1,628✔
3951

1,628✔
3952
        if firstEventID == expectedFirstEventID &&
1,628✔
3953
                ((isLastPage && lastEventID == expectedLastEventID && int64(nEvents) == nExpectedEvents) ||
1,628✔
3954
                        (!isLastPage && lastEventID >= expectedLastEventID && int64(nEvents) >= nExpectedEvents)) {
3,244✔
3955
                return nil
1,616✔
3956
        }
1,616✔
3957

3958
        return &types.InternalServiceError{
12✔
3959
                Message: fmt.Sprintf(
12✔
3960
                        "incomplete history: "+
12✔
3961
                                "expected events [%v-%v] but got events [%v-%v] of length %v:"+
12✔
3962
                                "isFirstPage=%v,isLastPage=%v,pageSize=%v",
12✔
3963
                        expectedFirstEventID,
12✔
3964
                        expectedLastEventID,
12✔
3965
                        firstEventID,
12✔
3966
                        lastEventID,
12✔
3967
                        nEvents,
12✔
3968
                        isFirstPage,
12✔
3969
                        isLastPage,
12✔
3970
                        pageSize),
12✔
3971
        }
12✔
3972
}
3973

3974
func deserializeHistoryToken(bytes []byte) (*getHistoryContinuationToken, error) {
44✔
3975
        token := &getHistoryContinuationToken{}
44✔
3976
        err := json.Unmarshal(bytes, token)
44✔
3977
        return token, err
44✔
3978
}
44✔
3979

3980
func serializeHistoryToken(token *getHistoryContinuationToken) ([]byte, error) {
442✔
3981
        if token == nil {
842✔
3982
                return nil, nil
400✔
3983
        }
400✔
3984

3985
        bytes, err := json.Marshal(token)
42✔
3986
        return bytes, err
42✔
3987
}
3988

3989
func isFailoverRequest(updateRequest *types.UpdateDomainRequest) bool {
9✔
3990
        return updateRequest.ActiveClusterName != nil
9✔
3991
}
9✔
3992

3993
func isGraceFailoverRequest(updateRequest *types.UpdateDomainRequest) bool {
9✔
3994
        return updateRequest.FailoverTimeoutInSeconds != nil
9✔
3995
}
9✔
3996

3997
func (wh *WorkflowHandler) checkOngoingFailover(
3998
        ctx context.Context,
3999
        domainName *string,
4000
) error {
1✔
4001

1✔
4002
        enabledClusters := wh.GetClusterMetadata().GetEnabledClusterInfo()
1✔
4003
        respChan := make(chan *types.DescribeDomainResponse, len(enabledClusters))
1✔
4004

1✔
4005
        g := &errgroup.Group{}
1✔
4006
        for clusterName := range enabledClusters {
3✔
4007
                frontendClient := wh.GetRemoteFrontendClient(clusterName)
2✔
4008
                g.Go(func() (e error) {
4✔
4009
                        defer func() { log.CapturePanic(recover(), wh.GetLogger(), &e) }()
4✔
4010

4011
                        resp, _ := frontendClient.DescribeDomain(ctx, &types.DescribeDomainRequest{Name: domainName})
2✔
4012
                        respChan <- resp
2✔
4013
                        return nil
2✔
4014
                })
4015
        }
4016
        g.Wait()
1✔
4017
        close(respChan)
1✔
4018

1✔
4019
        var failoverVersion *int64
1✔
4020
        for resp := range respChan {
3✔
4021
                if resp == nil {
2✔
4022
                        return &types.InternalServiceError{
×
4023
                                Message: "Failed to verify failover version from all clusters",
×
4024
                        }
×
4025
                }
×
4026
                if failoverVersion == nil {
3✔
4027
                        failoverVersion = &resp.FailoverVersion
1✔
4028
                }
1✔
4029
                if *failoverVersion != resp.GetFailoverVersion() {
2✔
4030
                        return &types.BadRequestError{
×
4031
                                Message: "Concurrent failover is not allow.",
×
4032
                        }
×
4033
                }
×
4034
        }
4035
        return nil
1✔
4036
}
4037

4038
func (wh *WorkflowHandler) historyArchived(ctx context.Context, request *types.GetWorkflowExecutionHistoryRequest, domainID string) bool {
451✔
4039
        if request.GetExecution() == nil || request.GetExecution().GetRunID() == "" {
489✔
4040
                return false
38✔
4041
        }
38✔
4042
        getMutableStateRequest := &types.GetMutableStateRequest{
413✔
4043
                DomainUUID: domainID,
413✔
4044
                Execution:  request.Execution,
413✔
4045
        }
413✔
4046
        _, err := wh.GetHistoryClient().GetMutableState(ctx, getMutableStateRequest)
413✔
4047
        if err == nil {
803✔
4048
                return false
390✔
4049
        }
390✔
4050
        switch err.(type) {
23✔
4051
        case *types.EntityNotExistsError:
22✔
4052
                // the only case in which history is assumed to be archived is if getting mutable state returns entity not found error
22✔
4053
                return true
22✔
4054
        }
4055
        return false
1✔
4056
}
4057

4058
func (wh *WorkflowHandler) getArchivedHistory(
4059
        ctx context.Context,
4060
        request *types.GetWorkflowExecutionHistoryRequest,
4061
        domainID string,
4062
) (*types.GetWorkflowExecutionHistoryResponse, error) {
25✔
4063
        entry, err := wh.GetDomainCache().GetDomainByID(domainID)
25✔
4064
        if err != nil {
26✔
4065
                return nil, err
1✔
4066
        }
1✔
4067

4068
        URIString := entry.GetConfig().HistoryArchivalURI
24✔
4069
        if URIString == "" {
25✔
4070
                // if URI is empty, it means the domain has never enabled for archival.
1✔
4071
                // the error is not "workflow has passed retention period", because
1✔
4072
                // we have no way to tell if the requested workflow exists or not.
1✔
4073
                return nil, validate.ErrHistoryNotFound
1✔
4074
        }
1✔
4075

4076
        URI, err := archiver.NewURI(URIString)
23✔
4077
        if err != nil {
24✔
4078
                return nil, err
1✔
4079
        }
1✔
4080

4081
        historyArchiver, err := wh.GetArchiverProvider().GetHistoryArchiver(URI.Scheme(), service.Frontend)
22✔
4082
        if err != nil {
22✔
4083
                return nil, err
×
4084
        }
×
4085

4086
        resp, err := historyArchiver.Get(ctx, URI, &archiver.GetHistoryRequest{
22✔
4087
                DomainID:      domainID,
22✔
4088
                WorkflowID:    request.GetExecution().GetWorkflowID(),
22✔
4089
                RunID:         request.GetExecution().GetRunID(),
22✔
4090
                NextPageToken: request.GetNextPageToken(),
22✔
4091
                PageSize:      int(request.GetMaximumPageSize()),
22✔
4092
        })
22✔
4093
        if err != nil {
22✔
UNCOV
4094
                return nil, err
×
UNCOV
4095
        }
×
4096

4097
        history := &types.History{}
22✔
4098
        for _, batch := range resp.HistoryBatches {
279✔
4099
                history.Events = append(history.Events, batch.Events...)
257✔
4100
        }
257✔
4101
        return &types.GetWorkflowExecutionHistoryResponse{
22✔
4102
                History:       history,
22✔
4103
                NextPageToken: resp.NextPageToken,
22✔
4104
                Archived:      true,
22✔
4105
        }, nil
22✔
4106
}
4107

4108
func (wh *WorkflowHandler) convertIndexedKeyToThrift(keys map[string]interface{}) map[string]types.IndexedValueType {
3✔
4109
        converted := make(map[string]types.IndexedValueType)
3✔
4110
        for k, v := range keys {
51✔
4111
                converted[k] = common.ConvertIndexedValueTypeToInternalType(v, wh.GetLogger())
48✔
4112
        }
48✔
4113
        return converted
2✔
4114
}
4115

4116
func (wh *WorkflowHandler) isListRequestPageSizeTooLarge(pageSize int32, domain string) bool {
316✔
4117
        return common.IsAdvancedVisibilityReadingEnabled(wh.config.EnableReadVisibilityFromES(domain), wh.config.IsAdvancedVisConfigExist) &&
316✔
4118
                pageSize > int32(wh.config.ESIndexMaxResultWindow())
316✔
4119
}
316✔
4120

4121
// GetClusterInfo return information about cadence deployment
4122
func (wh *WorkflowHandler) GetClusterInfo(
4123
        ctx context.Context,
4124
) (resp *types.ClusterInfo, err error) {
×
4125
        return &types.ClusterInfo{
×
4126
                SupportedClientVersions: &types.SupportedClientVersions{
×
4127
                        GoSdk:   client.SupportedGoSDKVersion,
×
4128
                        JavaSdk: client.SupportedJavaSDKVersion,
×
4129
                },
×
4130
        }, nil
×
4131
}
×
4132

4133
func checkFailOverPermission(config *config.Config, domainName string) error {
2✔
4134
        if config.Lockdown(domainName) {
3✔
4135
                return validate.ErrDomainInLockdown
1✔
4136
        }
1✔
4137
        return nil
1✔
4138
}
4139

4140
// domainWrapper wraps a bare domain name so it can be passed where a value
// with a GetDomain() string method is expected.
type domainWrapper struct {
	domain string
}

// GetDomain returns the wrapped domain name.
func (d domainWrapper) GetDomain() string {
	name := d.domain
	return name
}
1,812✔
4147

4148
func (hs HealthStatus) String() string {
3✔
4149
        switch hs {
3✔
4150
        case HealthStatusOK:
1✔
4151
                return "OK"
1✔
4152
        case HealthStatusWarmingUp:
1✔
4153
                return "WarmingUp"
1✔
4154
        case HealthStatusShuttingDown:
1✔
4155
                return "ShuttingDown"
1✔
4156
        default:
×
4157
                return "unknown"
×
4158
        }
4159
}
4160

4161
func getDomainWfIDRunIDTags(
4162
        domainName string,
4163
        wf *types.WorkflowExecution,
4164
) []tag.Tag {
1,512✔
4165
        tags := []tag.Tag{tag.WorkflowDomainName(domainName)}
1,512✔
4166
        if wf == nil {
3,024✔
4167
                return tags
1,512✔
4168
        }
1,512✔
4169
        return append(
×
4170
                tags,
×
4171
                tag.WorkflowID(wf.GetWorkflowID()),
×
4172
                tag.WorkflowRunID(wf.GetRunID()),
×
4173
        )
×
4174
}
4175

4176
// checkRequiredDomainDataKVs verifies that every key of requiredDomainDataKeys
// is present in domainData. Only presence is checked; values are ignored.
// When several keys are missing, which one is reported depends on map
// iteration order.
func checkRequiredDomainDataKVs(requiredDomainDataKeys map[string]interface{}, domainData map[string]string) error {
	for requiredKey := range requiredDomainDataKeys {
		if _, present := domainData[requiredKey]; !present {
			return fmt.Errorf("domain data error, missing required key %v . All required keys: %v", requiredKey, requiredDomainDataKeys)
		}
	}
	return nil
}
4186

4187
// Some error types are introduced later that some clients might not support
4188
// To make them backward compatible, we continue returning the legacy error types
4189
// for older clients
4190
func (wh *WorkflowHandler) normalizeVersionedErrors(ctx context.Context, err error) error {
76✔
4191
        switch err.(type) {
76✔
4192
        case *types.WorkflowExecutionAlreadyCompletedError:
21✔
4193
                call := yarpc.CallFromContext(ctx)
21✔
4194
                clientFeatureVersion := call.Header(common.FeatureVersionHeaderName)
21✔
4195
                clientImpl := call.Header(common.ClientImplHeaderName)
21✔
4196
                featureFlags := client.GetFeatureFlagsFromHeader(call)
21✔
4197

21✔
4198
                vErr := wh.versionChecker.SupportsWorkflowAlreadyCompletedError(clientImpl, clientFeatureVersion, featureFlags)
21✔
4199
                if vErr == nil {
24✔
4200
                        return err
3✔
4201
                }
3✔
4202
                return &types.EntityNotExistsError{Message: "Workflow execution already completed."}
18✔
4203
        default:
55✔
4204
                return err
55✔
4205
        }
4206
}
4207
func constructRestartWorkflowRequest(w *types.WorkflowExecutionStartedEventAttributes, domain string, identity string, workflowID string) *types.StartWorkflowExecutionRequest {
1✔
4208

1✔
4209
        startRequest := &types.StartWorkflowExecutionRequest{
1✔
4210
                RequestID:  uuid.New().String(),
1✔
4211
                Domain:     domain,
1✔
4212
                WorkflowID: workflowID,
1✔
4213
                WorkflowType: &types.WorkflowType{
1✔
4214
                        Name: w.WorkflowType.Name,
1✔
4215
                },
1✔
4216
                TaskList: &types.TaskList{
1✔
4217
                        Name: w.TaskList.Name,
1✔
4218
                },
1✔
4219
                Input:                               w.Input,
1✔
4220
                ExecutionStartToCloseTimeoutSeconds: w.ExecutionStartToCloseTimeoutSeconds,
1✔
4221
                TaskStartToCloseTimeoutSeconds:      w.TaskStartToCloseTimeoutSeconds,
1✔
4222
                Identity:                            identity,
1✔
4223
                WorkflowIDReusePolicy:               types.WorkflowIDReusePolicyTerminateIfRunning.Ptr(),
1✔
4224
        }
1✔
4225
        startRequest.CronSchedule = w.CronSchedule
1✔
4226
        startRequest.RetryPolicy = w.RetryPolicy
1✔
4227
        startRequest.DelayStartSeconds = w.FirstDecisionTaskBackoffSeconds
1✔
4228
        startRequest.Header = w.Header
1✔
4229
        startRequest.Memo = w.Memo
1✔
4230
        startRequest.SearchAttributes = w.SearchAttributes
1✔
4231

1✔
4232
        return startRequest
1✔
4233
}
1✔
4234

4235
func getMetricsScopeWithDomain(
4236
        scope int,
4237
        d domainGetter,
4238
        metricsClient metrics.Client,
4239
) metrics.Scope {
5,938✔
4240
        var metricsScope metrics.Scope
5,938✔
4241
        if d != nil {
11,876✔
4242
                metricsScope = metricsClient.Scope(scope).Tagged(metrics.DomainTag(d.GetDomain()))
5,938✔
4243
        } else {
5,938✔
4244
                metricsScope = metricsClient.Scope(scope).Tagged(metrics.DomainUnknownTag())
×
4245
        }
×
4246
        return metricsScope
5,938✔
4247
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc