• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

uber / cadence / 018eedc4-642b-49ce-be74-b8649f645074

17 Apr 2024 08:34PM UTC coverage: 67.406% (-0.004%) from 67.41%
018eedc4-642b-49ce-be74-b8649f645074

push

buildkite

web-flow
Bugfix for enumer in go 1.22 (#5915)

Enumer is currently crashing if `make go-generate` is run with Go 1.22: https://github.com/alvaroloes/enumer/issues/71
Temporal helpfully found an easy fix: https://github.com/temporalio/sdk-go/issues/1382
For the underlying cause: https://github.com/golang/go/issues/62167
So I've mimicked that by just doing a `go get golang.org/x/tools@latest` in the tools-module.

And now `make clean` -> `GOTOOLCHAIN=[go1.20.1 or go1.22.1] make go-generate` both work correctly.
(you need Go 1.21 or newer to use GOTOOLCHAIN.  highly recommended!)

Since this only affects build-time tools and doesn't change any generated code, it seems trivially safe, but I have not checked what all has changed in golang.org/x/tools across these versions.

98661 of 146368 relevant lines covered (67.41%)

2380.54 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

61.98
/service/frontend/api/handler.go
1
// Copyright (c) 2017-2020 Uber Technologies, Inc.
2
//
3
// Permission is hereby granted, free of charge, to any person obtaining a copy
4
// of this software and associated documentation files (the "Software"), to deal
5
// in the Software without restriction, including without limitation the rights
6
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
7
// copies of the Software, and to permit persons to whom the Software is
8
// furnished to do so, subject to the following conditions:
9
//
10
// The above copyright notice and this permission notice shall be included in
11
// all copies or substantial portions of the Software.
12
//
13
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
14
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
18
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
19
// THE SOFTWARE.
20

21
package api
22

23
import (
24
        "context"
25
        "encoding/json"
26
        "fmt"
27
        "sync/atomic"
28
        "time"
29

30
        "github.com/google/uuid"
31
        "go.uber.org/yarpc"
32
        "golang.org/x/sync/errgroup"
33

34
        "github.com/uber/cadence/.gen/go/shared"
35
        "github.com/uber/cadence/.gen/go/sqlblobs"
36
        "github.com/uber/cadence/common"
37
        "github.com/uber/cadence/common/archiver"
38
        "github.com/uber/cadence/common/backoff"
39
        "github.com/uber/cadence/common/cache"
40
        "github.com/uber/cadence/common/client"
41
        "github.com/uber/cadence/common/codec"
42
        "github.com/uber/cadence/common/domain"
43
        "github.com/uber/cadence/common/elasticsearch/validator"
44
        "github.com/uber/cadence/common/log"
45
        "github.com/uber/cadence/common/log/tag"
46
        "github.com/uber/cadence/common/metrics"
47
        "github.com/uber/cadence/common/partition"
48
        "github.com/uber/cadence/common/persistence"
49
        persistenceutils "github.com/uber/cadence/common/persistence/persistence-utils"
50
        "github.com/uber/cadence/common/resource"
51
        "github.com/uber/cadence/common/service"
52
        "github.com/uber/cadence/common/types"
53
        "github.com/uber/cadence/common/types/mapper/thrift"
54
        "github.com/uber/cadence/service/frontend/config"
55
        "github.com/uber/cadence/service/frontend/validate"
56
)
57

58
const (
59
        // HealthStatusOK is used when this node is healthy and rpc requests are allowed
60
        HealthStatusOK HealthStatus = iota + 1
61
        // HealthStatusWarmingUp is used when the rpc handler is warming up
62
        HealthStatusWarmingUp
63
        // HealthStatusShuttingDown is used when the rpc handler is shutting down
64
        HealthStatusShuttingDown
65
)
66

67
// Compile-time assertion that *WorkflowHandler satisfies the Handler interface.
var _ Handler = (*WorkflowHandler)(nil)
68

69
type (
70
        // WorkflowHandler - Thrift handler interface for workflow service
71
        WorkflowHandler struct {
72
                resource.Resource
73

74
                shuttingDown              int32
75
                healthStatus              int32
76
                tokenSerializer           common.TaskTokenSerializer
77
                config                    *config.Config
78
                versionChecker            client.VersionChecker
79
                domainHandler             domain.Handler
80
                visibilityQueryValidator  *validator.VisibilityQueryValidator
81
                searchAttributesValidator *validator.SearchAttributesValidator
82
                throttleRetry             *backoff.ThrottleRetry
83
                producerManager           ProducerManager
84
                thriftrwEncoder           codec.BinaryEncoder
85
        }
86

87
        getHistoryContinuationToken struct {
88
                RunID             string
89
                FirstEventID      int64
90
                NextEventID       int64
91
                IsWorkflowRunning bool
92
                PersistenceToken  []byte
93
                TransientDecision *types.TransientDecisionInfo
94
                BranchToken       []byte
95
        }
96

97
        domainGetter interface {
98
                GetDomain() string
99
        }
100

101
        // HealthStatus is an enum that refers to the rpc handler health status
102
        HealthStatus int32
103
)
104

105
var (
106
        frontendServiceRetryPolicy = common.CreateFrontendServiceRetryPolicy()
107
)
108

109
// NewWorkflowHandler creates a thrift handler for the cadence service
110
func NewWorkflowHandler(
111
        resource resource.Resource,
112
        config *config.Config,
113
        versionChecker client.VersionChecker,
114
        domainHandler domain.Handler,
115
) *WorkflowHandler {
87✔
116
        return &WorkflowHandler{
87✔
117
                Resource:        resource,
87✔
118
                config:          config,
87✔
119
                healthStatus:    int32(HealthStatusWarmingUp),
87✔
120
                tokenSerializer: common.NewJSONTaskTokenSerializer(),
87✔
121
                versionChecker:  versionChecker,
87✔
122
                domainHandler:   domainHandler,
87✔
123
                visibilityQueryValidator: validator.NewQueryValidator(
87✔
124
                        config.ValidSearchAttributes,
87✔
125
                        config.EnableQueryAttributeValidation,
87✔
126
                ),
87✔
127
                searchAttributesValidator: validator.NewSearchAttributesValidator(
87✔
128
                        resource.GetLogger(),
87✔
129
                        config.EnableQueryAttributeValidation,
87✔
130
                        config.ValidSearchAttributes,
87✔
131
                        config.SearchAttributesNumberOfKeysLimit,
87✔
132
                        config.SearchAttributesSizeOfValueLimit,
87✔
133
                        config.SearchAttributesTotalSizeLimit,
87✔
134
                ),
87✔
135
                throttleRetry: backoff.NewThrottleRetry(
87✔
136
                        backoff.WithRetryPolicy(frontendServiceRetryPolicy),
87✔
137
                        backoff.WithRetryableError(common.IsServiceTransientError),
87✔
138
                ),
87✔
139
                producerManager: NewProducerManager(
87✔
140
                        resource.GetDomainCache(),
87✔
141
                        resource.GetAsyncWorkflowQueueProvider(),
87✔
142
                        resource.GetLogger(),
87✔
143
                        resource.GetMetricsClient(),
87✔
144
                ),
87✔
145
                thriftrwEncoder: codec.NewThriftRWEncoder(),
87✔
146
        }
87✔
147
}
87✔
148

149
// Start starts the handler
150
func (wh *WorkflowHandler) Start() {
18✔
151
        // TODO: Get warmup duration from config. Even better, run proactive checks such as probing downstream connections.
18✔
152
        const warmUpDuration = 30 * time.Second
18✔
153

18✔
154
        warmupTimer := time.NewTimer(warmUpDuration)
18✔
155
        go func() {
36✔
156
                <-warmupTimer.C
18✔
157
                wh.GetLogger().Warn("Service warmup duration has elapsed.")
18✔
158
                if atomic.CompareAndSwapInt32(&wh.healthStatus, int32(HealthStatusWarmingUp), int32(HealthStatusOK)) {
30✔
159
                        wh.GetLogger().Warn("Warmup time has elapsed. Service is healthy.")
12✔
160
                } else {
12✔
161
                        status := HealthStatus(atomic.LoadInt32(&wh.healthStatus))
×
162
                        wh.GetLogger().Warn(fmt.Sprintf("Warmup time has elapsed. Service status is: %v", status.String()))
×
163
                }
×
164
        }()
165
}
166

167
// Stop stops the handler
168
func (wh *WorkflowHandler) Stop() {
18✔
169
        atomic.StoreInt32(&wh.shuttingDown, 1)
18✔
170
}
18✔
171

172
// UpdateHealthStatus sets the health status for this rpc handler.
173
// This health status will be used within the rpc health check handler
174
func (wh *WorkflowHandler) UpdateHealthStatus(status HealthStatus) {
19✔
175
        atomic.StoreInt32(&wh.healthStatus, int32(status))
19✔
176
}
19✔
177

178
func (wh *WorkflowHandler) isShuttingDown() bool {
6,477✔
179
        return atomic.LoadInt32(&wh.shuttingDown) != 0
6,477✔
180
}
6,477✔
181

182
// Health is for health check
183
func (wh *WorkflowHandler) Health(ctx context.Context) (*types.HealthStatus, error) {
2✔
184
        status := HealthStatus(atomic.LoadInt32(&wh.healthStatus))
2✔
185
        msg := status.String()
2✔
186

2✔
187
        if status != HealthStatusOK {
3✔
188
                wh.GetLogger().Warn(fmt.Sprintf("Service status is: %v", msg))
1✔
189
        }
1✔
190

191
        return &types.HealthStatus{
2✔
192
                Ok:  status == HealthStatusOK,
2✔
193
                Msg: msg,
2✔
194
        }, nil
2✔
195
}
196

197
// RegisterDomain creates a new domain which can be used as a container for all resources.  Domain is a top level
198
// entity within Cadence, used as a container for all resources like workflow executions, tasklists, etc.  Domain
199
// acts as a sandbox and provides isolation for all resources within the domain.  All resources belongs to exactly one
200
// domain.
201
func (wh *WorkflowHandler) RegisterDomain(ctx context.Context, registerRequest *types.RegisterDomainRequest) (retError error) {
48✔
202
        if wh.isShuttingDown() {
48✔
203
                return validate.ErrShuttingDown
×
204
        }
×
205

206
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
48✔
207
                return err
×
208
        }
×
209

210
        if registerRequest == nil {
48✔
211
                return validate.ErrRequestNotSet
×
212
        }
×
213

214
        if registerRequest.GetWorkflowExecutionRetentionPeriodInDays() > int32(wh.config.DomainConfig.MaxRetentionDays()) {
48✔
215
                return validate.ErrInvalidRetention
×
216
        }
×
217

218
        if err := validate.CheckPermission(wh.config, registerRequest.SecurityToken); err != nil {
48✔
219
                return err
×
220
        }
×
221

222
        if err := checkRequiredDomainDataKVs(wh.config.DomainConfig.RequiredDomainDataKeys(), registerRequest.GetData()); err != nil {
49✔
223
                return err
1✔
224
        }
1✔
225

226
        if registerRequest.GetName() == "" {
47✔
227
                return validate.ErrDomainNotSet
×
228
        }
×
229

230
        return wh.domainHandler.RegisterDomain(ctx, registerRequest)
47✔
231
}
232

233
// ListDomains returns the information and configuration for a registered domain.
234
func (wh *WorkflowHandler) ListDomains(
235
        ctx context.Context,
236
        listRequest *types.ListDomainsRequest,
237
) (response *types.ListDomainsResponse, retError error) {
2✔
238
        if wh.isShuttingDown() {
2✔
239
                return nil, validate.ErrShuttingDown
×
240
        }
×
241

242
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
2✔
243
                return nil, err
×
244
        }
×
245

246
        if listRequest == nil {
3✔
247
                return nil, validate.ErrRequestNotSet
1✔
248
        }
1✔
249

250
        return wh.domainHandler.ListDomains(ctx, listRequest)
1✔
251
}
252

253
// DescribeDomain returns the information and configuration for a registered domain.
254
func (wh *WorkflowHandler) DescribeDomain(
255
        ctx context.Context,
256
        describeRequest *types.DescribeDomainRequest,
257
) (response *types.DescribeDomainResponse, retError error) {
134✔
258
        if wh.isShuttingDown() {
134✔
259
                return nil, validate.ErrShuttingDown
×
260
        }
×
261

262
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
134✔
263
                return nil, err
×
264
        }
×
265

266
        if describeRequest == nil {
134✔
267
                return nil, validate.ErrRequestNotSet
×
268
        }
×
269

270
        if describeRequest.GetName() == "" && describeRequest.GetUUID() == "" {
134✔
271
                return nil, validate.ErrDomainNotSet
×
272
        }
×
273

274
        resp, err := wh.domainHandler.DescribeDomain(ctx, describeRequest)
134✔
275
        if err != nil {
134✔
276
                return nil, err
×
277
        }
×
278

279
        if resp.GetFailoverInfo() != nil && resp.GetFailoverInfo().GetFailoverExpireTimestamp() > 0 {
134✔
280
                // fetch ongoing failover info from history service
×
281
                failoverResp, err := wh.GetHistoryClient().GetFailoverInfo(ctx, &types.GetFailoverInfoRequest{
×
282
                        DomainID: resp.GetDomainInfo().UUID,
×
283
                })
×
284
                if err != nil {
×
285
                        // despite the error from history, return describe domain response
×
286
                        wh.GetLogger().Error(
×
287
                                fmt.Sprintf("Failed to get failover info for domain %s", resp.DomainInfo.GetName()),
×
288
                                tag.Error(err),
×
289
                        )
×
290
                        return resp, nil
×
291
                }
×
292
                resp.FailoverInfo.CompletedShardCount = failoverResp.GetCompletedShardCount()
×
293
                resp.FailoverInfo.PendingShards = failoverResp.GetPendingShards()
×
294
        }
295
        return resp, nil
134✔
296
}
297

298
// UpdateDomain is used to update the information and configuration for a registered domain.
299
func (wh *WorkflowHandler) UpdateDomain(
300
        ctx context.Context,
301
        updateRequest *types.UpdateDomainRequest,
302
) (resp *types.UpdateDomainResponse, retError error) {
9✔
303
        domainName := ""
9✔
304
        if updateRequest != nil {
18✔
305
                domainName = updateRequest.GetName()
9✔
306
        }
9✔
307

308
        logger := wh.GetLogger().WithTags(
9✔
309
                tag.WorkflowDomainName(domainName),
9✔
310
                tag.OperationName("DomainUpdate"))
9✔
311

9✔
312
        if updateRequest == nil {
9✔
313
                logger.Error("Nil domain update request.",
×
314
                        tag.Error(validate.ErrRequestNotSet))
×
315
                return nil, validate.ErrRequestNotSet
×
316
        }
×
317

318
        isFailover := isFailoverRequest(updateRequest)
9✔
319
        isGraceFailover := isGraceFailoverRequest(updateRequest)
9✔
320
        logger.Info(fmt.Sprintf(
9✔
321
                "Domain Update requested. isFailover: %v, isGraceFailover: %v, Request: %#v.",
9✔
322
                isFailover,
9✔
323
                isGraceFailover,
9✔
324
                updateRequest))
9✔
325

9✔
326
        if wh.isShuttingDown() {
9✔
327
                logger.Error("Won't apply the domain update since workflowHandler is shutting down.",
×
328
                        tag.Error(validate.ErrShuttingDown))
×
329
                return nil, validate.ErrShuttingDown
×
330
        }
×
331

332
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
9✔
333
                logger.Error("Won't apply the domain update since client version is not supported.",
×
334
                        tag.Error(err))
×
335
                return nil, err
×
336
        }
×
337

338
        // don't require permission for failover request
339
        if isFailover {
11✔
340
                // reject the failover if the cluster is in lockdown
2✔
341
                if err := checkFailOverPermission(wh.config, updateRequest.GetName()); err != nil {
3✔
342
                        logger.Error("Domain failover request rejected since domain is in lockdown.",
1✔
343
                                tag.Error(err))
1✔
344
                        return nil, err
1✔
345
                }
1✔
346
        } else {
7✔
347
                if err := validate.CheckPermission(wh.config, updateRequest.SecurityToken); err != nil {
7✔
348
                        logger.Error("Domain update request rejected due to failing permissions.",
×
349
                                tag.Error(err))
×
350
                        return nil, err
×
351
                }
×
352
        }
353

354
        if isGraceFailover {
9✔
355
                if err := wh.checkOngoingFailover(
1✔
356
                        ctx,
1✔
357
                        &updateRequest.Name,
1✔
358
                ); err != nil {
1✔
359
                        logger.Error("Graceful domain failover request failed. Not able to check ongoing failovers.",
×
360
                                tag.Error(err))
×
361
                        return nil, err
×
362
                }
×
363
        }
364

365
        if updateRequest.GetName() == "" {
8✔
366
                logger.Error("Domain not set on request.",
×
367
                        tag.Error(validate.ErrDomainNotSet))
×
368
                return nil, validate.ErrDomainNotSet
×
369
        }
×
370
        // TODO: call remote clusters to verify domain data
371
        resp, err := wh.domainHandler.UpdateDomain(ctx, updateRequest)
8✔
372
        if err != nil {
10✔
373
                logger.Error("Domain update operation failed.",
2✔
374
                        tag.Error(err))
2✔
375
                return nil, err
2✔
376
        }
2✔
377
        logger.Info("Domain update operation succeeded.")
6✔
378
        return resp, nil
6✔
379
}
380

381
// DeprecateDomain us used to update status of a registered domain to DEPRECATED. Once the domain is deprecated
382
// it cannot be used to start new workflow executions.  Existing workflow executions will continue to run on
383
// deprecated domains.
384
func (wh *WorkflowHandler) DeprecateDomain(ctx context.Context, deprecateRequest *types.DeprecateDomainRequest) (retError error) {
×
385
        if wh.isShuttingDown() {
×
386
                return validate.ErrShuttingDown
×
387
        }
×
388

389
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
×
390
                return err
×
391
        }
×
392

393
        if deprecateRequest == nil {
×
394
                return validate.ErrRequestNotSet
×
395
        }
×
396

397
        if err := validate.CheckPermission(wh.config, deprecateRequest.SecurityToken); err != nil {
×
398
                return err
×
399
        }
×
400

401
        if deprecateRequest.GetName() == "" {
×
402
                return validate.ErrDomainNotSet
×
403
        }
×
404

405
        return wh.domainHandler.DeprecateDomain(ctx, deprecateRequest)
×
406
}
407

408
// PollForActivityTask - Poll for an activity task.
409
func (wh *WorkflowHandler) PollForActivityTask(
410
        ctx context.Context,
411
        pollRequest *types.PollForActivityTaskRequest,
412
) (resp *types.PollForActivityTaskResponse, retError error) {
703✔
413
        callTime := time.Now()
703✔
414

703✔
415
        if wh.isShuttingDown() {
703✔
416
                return nil, validate.ErrShuttingDown
×
417
        }
×
418

419
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
703✔
420
                return nil, err
×
421
        }
×
422

423
        if pollRequest == nil {
703✔
424
                return nil, validate.ErrRequestNotSet
×
425
        }
×
426

427
        domainName := pollRequest.GetDomain()
703✔
428
        if domainName == "" {
703✔
429
                return nil, validate.ErrDomainNotSet
×
430
        }
×
431

432
        scope := getMetricsScopeWithDomain(metrics.FrontendPollForActivityTaskScope, pollRequest, wh.GetMetricsClient()).Tagged(metrics.GetContextTags(ctx)...)
703✔
433
        wh.GetLogger().Debug("Received PollForActivityTask")
703✔
434
        if err := common.ValidateLongPollContextTimeout(
703✔
435
                ctx,
703✔
436
                "PollForActivityTask",
703✔
437
                wh.GetThrottledLogger().WithTags(tag.WorkflowDomainName(domainName), tag.WorkflowTaskListName(pollRequest.GetTaskList().GetName())),
703✔
438
        ); err != nil {
705✔
439
                return nil, err
2✔
440
        }
2✔
441

442
        idLengthWarnLimit := wh.config.MaxIDLengthWarnLimit()
701✔
443
        if !common.IsValidIDLength(
701✔
444
                domainName,
701✔
445
                scope,
701✔
446
                idLengthWarnLimit,
701✔
447
                wh.config.DomainNameMaxLength(domainName),
701✔
448
                metrics.CadenceErrDomainNameExceededWarnLimit,
701✔
449
                domainName,
701✔
450
                wh.GetLogger(),
701✔
451
                tag.IDTypeDomainName) {
701✔
452
                return nil, validate.ErrDomainTooLong
×
453
        }
×
454

455
        if err := wh.validateTaskList(pollRequest.TaskList, scope, domainName); err != nil {
701✔
456
                return nil, err
×
457
        }
×
458

459
        if !common.IsValidIDLength(
701✔
460
                pollRequest.GetIdentity(),
701✔
461
                scope,
701✔
462
                idLengthWarnLimit,
701✔
463
                wh.config.IdentityMaxLength(domainName),
701✔
464
                metrics.CadenceErrIdentityExceededWarnLimit,
701✔
465
                domainName,
701✔
466
                wh.GetLogger(),
701✔
467
                tag.IDTypeIdentity) {
701✔
468
                return nil, validate.ErrIdentityTooLong
×
469
        }
×
470

471
        domainID, err := wh.GetDomainCache().GetDomainID(domainName)
701✔
472
        if err != nil {
963✔
473
                return nil, err
262✔
474
        }
262✔
475

476
        isolationGroup := wh.getIsolationGroup(ctx, domainName)
439✔
477
        if !wh.waitUntilIsolationGroupHealthy(ctx, domainName, isolationGroup) {
440✔
478
                return &types.PollForActivityTaskResponse{}, nil
1✔
479
        }
1✔
480
        // it is possible that we wait for a very long time and the remaining time is not long enough for a long poll
481
        // in this case, return an empty response
482
        if err := common.ValidateLongPollContextTimeout(
438✔
483
                ctx,
438✔
484
                "PollForActivityTask",
438✔
485
                wh.GetThrottledLogger().WithTags(tag.WorkflowDomainName(domainName), tag.WorkflowTaskListName(pollRequest.GetTaskList().GetName())),
438✔
486
        ); err != nil {
438✔
487
                return &types.PollForActivityTaskResponse{}, nil
×
488
        }
×
489
        pollerID := uuid.New().String()
438✔
490
        op := func() error {
876✔
491
                resp, err = wh.GetMatchingClient().PollForActivityTask(ctx, &types.MatchingPollForActivityTaskRequest{
438✔
492
                        DomainUUID:     domainID,
438✔
493
                        PollerID:       pollerID,
438✔
494
                        PollRequest:    pollRequest,
438✔
495
                        IsolationGroup: isolationGroup,
438✔
496
                })
438✔
497
                return err
438✔
498
        }
438✔
499

500
        err = wh.throttleRetry.Do(ctx, op)
438✔
501
        if err != nil {
504✔
502
                err = wh.cancelOutstandingPoll(ctx, err, domainID, persistence.TaskListTypeActivity, pollRequest.TaskList, pollerID)
66✔
503
                if err != nil {
66✔
504
                        // For all other errors log an error and return it back to client.
×
505
                        ctxTimeout := "not-set"
×
506
                        ctxDeadline, ok := ctx.Deadline()
×
507
                        if ok {
×
508
                                ctxTimeout = ctxDeadline.Sub(callTime).String()
×
509
                        }
×
510
                        wh.GetLogger().Error("PollForActivityTask failed.",
×
511
                                tag.WorkflowTaskListName(pollRequest.GetTaskList().GetName()),
×
512
                                tag.Value(ctxTimeout),
×
513
                                tag.Error(err))
×
514
                        return nil, err
×
515
                }
516
        }
517
        return resp, nil
438✔
518
}
519

520
// PollForDecisionTask - Poll for a decision task.
521
func (wh *WorkflowHandler) PollForDecisionTask(
522
        ctx context.Context,
523
        pollRequest *types.PollForDecisionTaskRequest,
524
) (resp *types.PollForDecisionTaskResponse, retError error) {
1,478✔
525
        callTime := time.Now()
1,478✔
526

1,478✔
527
        if wh.isShuttingDown() {
1,478✔
528
                return nil, validate.ErrShuttingDown
×
529
        }
×
530

531
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
1,478✔
532
                return nil, err
×
533
        }
×
534

535
        if pollRequest == nil {
1,478✔
536
                return nil, validate.ErrRequestNotSet
×
537
        }
×
538

539
        domainName := pollRequest.GetDomain()
1,478✔
540
        tags := getDomainWfIDRunIDTags(domainName, nil)
1,478✔
541

1,478✔
542
        if domainName == "" {
1,478✔
543
                return nil, validate.ErrDomainNotSet
×
544
        }
×
545

546
        scope := getMetricsScopeWithDomain(metrics.FrontendPollForDecisionTaskScope, pollRequest, wh.GetMetricsClient()).Tagged(metrics.GetContextTags(ctx)...)
1,478✔
547
        wh.GetLogger().Debug("Received PollForDecisionTask")
1,478✔
548
        if err := common.ValidateLongPollContextTimeout(
1,478✔
549
                ctx,
1,478✔
550
                "PollForDecisionTask",
1,478✔
551
                wh.GetThrottledLogger().WithTags(tag.WorkflowDomainName(domainName), tag.WorkflowTaskListName(pollRequest.GetTaskList().GetName())),
1,478✔
552
        ); err != nil {
1,480✔
553
                return nil, err
2✔
554
        }
2✔
555

556
        idLengthWarnLimit := wh.config.MaxIDLengthWarnLimit()
1,476✔
557
        if !common.IsValidIDLength(
1,476✔
558
                domainName,
1,476✔
559
                scope,
1,476✔
560
                idLengthWarnLimit,
1,476✔
561
                wh.config.DomainNameMaxLength(domainName),
1,476✔
562
                metrics.CadenceErrDomainNameExceededWarnLimit,
1,476✔
563
                domainName,
1,476✔
564
                wh.GetLogger(),
1,476✔
565
                tag.IDTypeDomainName) {
1,476✔
566
                return nil, validate.ErrDomainTooLong
×
567
        }
×
568

569
        if !common.IsValidIDLength(
1,476✔
570
                pollRequest.GetIdentity(),
1,476✔
571
                scope,
1,476✔
572
                idLengthWarnLimit,
1,476✔
573
                wh.config.IdentityMaxLength(domainName),
1,476✔
574
                metrics.CadenceErrIdentityExceededWarnLimit,
1,476✔
575
                domainName,
1,476✔
576
                wh.GetLogger(),
1,476✔
577
                tag.IDTypeIdentity) {
1,476✔
578
                return nil, validate.ErrIdentityTooLong
×
579
        }
×
580

581
        if err := wh.validateTaskList(pollRequest.TaskList, scope, domainName); err != nil {
1,476✔
582
                return nil, err
×
583
        }
×
584

585
        domainEntry, err := wh.GetDomainCache().GetDomain(domainName)
1,476✔
586
        if err != nil {
1,737✔
587
                return nil, err
261✔
588
        }
261✔
589
        domainID := domainEntry.GetInfo().ID
1,215✔
590

1,215✔
591
        wh.GetLogger().Debug("Poll for decision.", tag.WorkflowDomainName(domainName), tag.WorkflowDomainID(domainID))
1,215✔
592
        if err := wh.checkBadBinary(domainEntry, pollRequest.GetBinaryChecksum()); err != nil {
1,215✔
593
                return nil, err
×
594
        }
×
595

596
        isolationGroup := wh.getIsolationGroup(ctx, domainName)
1,215✔
597
        if !wh.waitUntilIsolationGroupHealthy(ctx, domainName, isolationGroup) {
1,216✔
598
                return &types.PollForDecisionTaskResponse{}, nil
1✔
599
        }
1✔
600
        // it is possible that we wait for a very long time and the remaining time is not long enough for a long poll
601
        // in this case, return an empty response
602
        if err := common.ValidateLongPollContextTimeout(
1,214✔
603
                ctx,
1,214✔
604
                "PollForDecisionTask",
1,214✔
605
                wh.GetThrottledLogger().WithTags(tag.WorkflowDomainName(domainName), tag.WorkflowTaskListName(pollRequest.GetTaskList().GetName())),
1,214✔
606
        ); err != nil {
1,214✔
607
                return &types.PollForDecisionTaskResponse{}, nil
×
608
        }
×
609

610
        pollerID := uuid.New().String()
1,214✔
611
        var matchingResp *types.MatchingPollForDecisionTaskResponse
1,214✔
612
        op := func() error {
2,428✔
613
                matchingResp, err = wh.GetMatchingClient().PollForDecisionTask(ctx, &types.MatchingPollForDecisionTaskRequest{
1,214✔
614
                        DomainUUID:     domainID,
1,214✔
615
                        PollerID:       pollerID,
1,214✔
616
                        PollRequest:    pollRequest,
1,214✔
617
                        IsolationGroup: isolationGroup,
1,214✔
618
                })
1,214✔
619
                return err
1,214✔
620
        }
1,214✔
621

622
        err = wh.throttleRetry.Do(ctx, op)
1,214✔
623
        if err != nil {
1,280✔
624
                err = wh.cancelOutstandingPoll(ctx, err, domainID, persistence.TaskListTypeDecision, pollRequest.TaskList, pollerID)
66✔
625
                if err != nil {
66✔
626
                        // For all other errors log an error and return it back to client.
×
627
                        ctxTimeout := "not-set"
×
628
                        ctxDeadline, ok := ctx.Deadline()
×
629
                        if ok {
×
630
                                ctxTimeout = ctxDeadline.Sub(callTime).String()
×
631
                        }
×
632
                        wh.GetLogger().Error("PollForDecisionTask failed.",
×
633
                                tag.WorkflowTaskListName(pollRequest.GetTaskList().GetName()),
×
634
                                tag.Value(ctxTimeout),
×
635
                                tag.Error(err))
×
636
                        return nil, err
×
637
                }
638

639
                // Must be cancellation error.  Does'nt matter what we return here.  Client already went away.
640
                return nil, nil
66✔
641
        }
642

643
        tags = append(tags, []tag.Tag{tag.WorkflowID(
1,148✔
644
                matchingResp.GetWorkflowExecution().GetWorkflowID()),
1,148✔
645
                tag.WorkflowRunID(matchingResp.GetWorkflowExecution().GetRunID())}...)
1,148✔
646
        resp, err = wh.createPollForDecisionTaskResponse(ctx, scope, domainID, matchingResp, matchingResp.GetBranchToken())
1,148✔
647
        if err != nil {
1,148✔
648
                return nil, err
×
649
        }
×
650
        return resp, nil
1,148✔
651
}
652

653
func (wh *WorkflowHandler) getIsolationGroup(ctx context.Context, domainName string) string {
2,896✔
654
        return partition.IsolationGroupFromContext(ctx)
2,896✔
655
}
2,896✔
656

657
func (wh *WorkflowHandler) getPartitionConfig(ctx context.Context, domainName string) map[string]string {
511✔
658
        return partition.ConfigFromContext(ctx)
511✔
659
}
511✔
660

661
func (wh *WorkflowHandler) isIsolationGroupHealthy(ctx context.Context, domainName, isolationGroup string) bool {
1,242✔
662
        if wh.GetIsolationGroupState() != nil && wh.config.EnableTasklistIsolation(domainName) {
1,244✔
663
                isDrained, err := wh.GetIsolationGroupState().IsDrained(ctx, domainName, isolationGroup)
2✔
664
                if err != nil {
2✔
665
                        wh.GetLogger().Error("Failed to check if an isolation group is drained, assume it's healthy", tag.Error(err))
×
666
                        return true
×
667
                }
×
668
                return !isDrained
2✔
669
        }
670
        return true
1,240✔
671
}
672

673
func (wh *WorkflowHandler) waitUntilIsolationGroupHealthy(ctx context.Context, domainName, isolationGroup string) bool {
1,654✔
674
        if wh.GetIsolationGroupState() != nil && wh.config.EnableTasklistIsolation(domainName) {
1,656✔
675
                ticker := time.NewTicker(time.Second * 30)
2✔
676
                defer ticker.Stop()
2✔
677
                childCtx, cancel := common.CreateChildContext(ctx, 0.05)
2✔
678
                defer cancel()
2✔
679
                for {
4✔
680
                        isDrained, err := wh.GetIsolationGroupState().IsDrained(childCtx, domainName, isolationGroup)
2✔
681
                        if err != nil {
2✔
682
                                wh.GetLogger().Error("Failed to check if an isolation group is drained, assume it's healthy", tag.Error(err))
×
683
                                return true
×
684
                        }
×
685
                        if !isDrained {
2✔
686
                                break
×
687
                        }
688
                        select {
2✔
689
                        case <-childCtx.Done():
2✔
690
                                return false
2✔
691
                        case <-ticker.C:
×
692
                        }
693
                }
694
        }
695
        return true
1,652✔
696
}
697

698
func (wh *WorkflowHandler) checkBadBinary(domainEntry *cache.DomainCacheEntry, binaryChecksum string) error {
1,215✔
699
        if domainEntry.GetConfig().BadBinaries.Binaries != nil {
2,429✔
700
                badBinaries := domainEntry.GetConfig().BadBinaries.Binaries
1,214✔
701
                _, ok := badBinaries[binaryChecksum]
1,214✔
702
                if ok {
1,214✔
703
                        wh.GetMetricsClient().IncCounter(metrics.FrontendPollForDecisionTaskScope, metrics.CadenceErrBadBinaryCounter)
×
704
                        return &types.BadRequestError{
×
705
                                Message: fmt.Sprintf("binary %v already marked as bad deployment", binaryChecksum),
×
706
                        }
×
707
                }
×
708
        }
709
        return nil
1,215✔
710
}
711

712
func (wh *WorkflowHandler) cancelOutstandingPoll(ctx context.Context, err error, domainID string, taskListType int32,
713
        taskList *types.TaskList, pollerID string) error {
132✔
714
        // First check if this err is due to context cancellation.  This means client connection to frontend is closed.
132✔
715
        if ctx.Err() == context.Canceled {
264✔
716
                // Our rpc stack does not propagates context cancellation to the other service.  Lets make an explicit
132✔
717
                // call to matching to notify this poller is gone to prevent any tasks being dispatched to zombie pollers.
132✔
718
                err = wh.GetMatchingClient().CancelOutstandingPoll(context.Background(), &types.CancelOutstandingPollRequest{
132✔
719
                        DomainUUID:   domainID,
132✔
720
                        TaskListType: common.Int32Ptr(taskListType),
132✔
721
                        TaskList:     taskList,
132✔
722
                        PollerID:     pollerID,
132✔
723
                })
132✔
724
                // We can not do much if this call fails.  Just log the error and move on
132✔
725
                if err != nil {
132✔
726
                        wh.GetLogger().Warn("Failed to cancel outstanding poller.",
×
727
                                tag.WorkflowTaskListName(taskList.GetName()), tag.Error(err))
×
728
                }
×
729

730
                // Clear error as we don't want to report context cancellation error to count against our SLA
731
                return nil
132✔
732
        }
733

734
        return err
×
735
}
736

737
// RecordActivityTaskHeartbeat - Record Activity Task Heart beat.
738
func (wh *WorkflowHandler) RecordActivityTaskHeartbeat(
739
        ctx context.Context,
740
        heartbeatRequest *types.RecordActivityTaskHeartbeatRequest,
741
) (resp *types.RecordActivityTaskHeartbeatResponse, retError error) {
384✔
742
        if wh.isShuttingDown() {
384✔
743
                return nil, validate.ErrShuttingDown
×
744
        }
×
745

746
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
384✔
747
                return nil, err
×
748
        }
×
749

750
        if heartbeatRequest == nil {
385✔
751
                return nil, validate.ErrRequestNotSet
1✔
752
        }
1✔
753

754
        wh.GetLogger().Debug("Received RecordActivityTaskHeartbeat")
383✔
755
        if heartbeatRequest.TaskToken == nil {
384✔
756
                return nil, validate.ErrTaskTokenNotSet
1✔
757
        }
1✔
758
        taskToken, err := wh.tokenSerializer.Deserialize(heartbeatRequest.TaskToken)
382✔
759
        if err != nil {
382✔
760
                return nil, err
×
761
        }
×
762
        if taskToken.DomainID == "" {
382✔
763
                return nil, validate.ErrDomainNotSet
×
764
        }
×
765

766
        domainName, err := wh.GetDomainCache().GetDomainName(taskToken.DomainID)
382✔
767
        if err != nil {
382✔
768
                return nil, err
×
769
        }
×
770

771
        dw := domainWrapper{
382✔
772
                domain: domainName,
382✔
773
        }
382✔
774
        scope := getMetricsScopeWithDomain(metrics.FrontendRecordActivityTaskHeartbeatScope, dw, wh.GetMetricsClient()).Tagged(metrics.GetContextTags(ctx)...)
382✔
775
        sizeLimitError := wh.config.BlobSizeLimitError(domainName)
382✔
776
        sizeLimitWarn := wh.config.BlobSizeLimitWarn(domainName)
382✔
777

382✔
778
        if err := common.CheckEventBlobSizeLimit(
382✔
779
                len(heartbeatRequest.Details),
382✔
780
                sizeLimitWarn,
382✔
781
                sizeLimitError,
382✔
782
                taskToken.DomainID,
382✔
783
                taskToken.WorkflowID,
382✔
784
                taskToken.RunID,
382✔
785
                scope,
382✔
786
                wh.GetThrottledLogger(),
382✔
787
                tag.BlobSizeViolationOperation("RecordActivityTaskHeartbeat"),
382✔
788
        ); err != nil {
382✔
789
                // heartbeat details exceed size limit, we would fail the activity immediately with explicit error reason
×
790
                failRequest := &types.RespondActivityTaskFailedRequest{
×
791
                        TaskToken: heartbeatRequest.TaskToken,
×
792
                        Reason:    common.StringPtr(common.FailureReasonHeartbeatExceedsLimit),
×
793
                        Details:   heartbeatRequest.Details[0:sizeLimitError],
×
794
                        Identity:  heartbeatRequest.Identity,
×
795
                }
×
796
                err = wh.GetHistoryClient().RespondActivityTaskFailed(ctx, &types.HistoryRespondActivityTaskFailedRequest{
×
797
                        DomainUUID:    taskToken.DomainID,
×
798
                        FailedRequest: failRequest,
×
799
                })
×
800
                if err != nil {
×
801
                        return nil, wh.normalizeVersionedErrors(ctx, err)
×
802
                }
×
803
                resp = &types.RecordActivityTaskHeartbeatResponse{CancelRequested: true}
×
804
        } else {
382✔
805
                resp, err = wh.GetHistoryClient().RecordActivityTaskHeartbeat(ctx, &types.HistoryRecordActivityTaskHeartbeatRequest{
382✔
806
                        DomainUUID:       taskToken.DomainID,
382✔
807
                        HeartbeatRequest: heartbeatRequest,
382✔
808
                })
382✔
809
                if err != nil {
382✔
810
                        return nil, wh.normalizeVersionedErrors(ctx, err)
×
811
                }
×
812
        }
813

814
        return resp, nil
382✔
815
}
816

817
// RecordActivityTaskHeartbeatByID - Record Activity Task Heart beat.
818
func (wh *WorkflowHandler) RecordActivityTaskHeartbeatByID(
819
        ctx context.Context,
820
        heartbeatRequest *types.RecordActivityTaskHeartbeatByIDRequest,
821
) (resp *types.RecordActivityTaskHeartbeatResponse, retError error) {
3✔
822
        if wh.isShuttingDown() {
3✔
823
                return nil, validate.ErrShuttingDown
×
824
        }
×
825

826
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
3✔
827
                return nil, err
×
828
        }
×
829

830
        if heartbeatRequest == nil {
4✔
831
                return nil, validate.ErrRequestNotSet
1✔
832
        }
1✔
833

834
        domainName := heartbeatRequest.GetDomain()
2✔
835
        if domainName == "" {
3✔
836
                return nil, validate.ErrDomainNotSet
1✔
837
        }
1✔
838

839
        wh.GetLogger().Debug("Received RecordActivityTaskHeartbeatByID")
1✔
840
        domainID, err := wh.GetDomainCache().GetDomainID(domainName)
1✔
841
        if err != nil {
1✔
842
                return nil, err
×
843
        }
×
844
        workflowID := heartbeatRequest.GetWorkflowID()
1✔
845
        runID := heartbeatRequest.GetRunID() // runID is optional so can be empty
1✔
846
        activityID := heartbeatRequest.GetActivityID()
1✔
847

1✔
848
        if domainID == "" {
1✔
849
                return nil, validate.ErrDomainNotSet
×
850
        }
×
851
        if workflowID == "" {
1✔
852
                return nil, validate.ErrWorkflowIDNotSet
×
853
        }
×
854
        if activityID == "" {
1✔
855
                return nil, validate.ErrActivityIDNotSet
×
856
        }
×
857

858
        taskToken := &common.TaskToken{
1✔
859
                DomainID:   domainID,
1✔
860
                RunID:      runID,
1✔
861
                WorkflowID: workflowID,
1✔
862
                ScheduleID: common.EmptyEventID,
1✔
863
                ActivityID: activityID,
1✔
864
        }
1✔
865
        token, err := wh.tokenSerializer.Serialize(taskToken)
1✔
866
        if err != nil {
1✔
867
                return nil, err
×
868
        }
×
869

870
        scope := getMetricsScopeWithDomain(metrics.FrontendRecordActivityTaskHeartbeatByIDScope, heartbeatRequest, wh.GetMetricsClient()).Tagged(metrics.GetContextTags(ctx)...)
1✔
871
        sizeLimitError := wh.config.BlobSizeLimitError(domainName)
1✔
872
        sizeLimitWarn := wh.config.BlobSizeLimitWarn(domainName)
1✔
873

1✔
874
        if err := common.CheckEventBlobSizeLimit(
1✔
875
                len(heartbeatRequest.Details),
1✔
876
                sizeLimitWarn,
1✔
877
                sizeLimitError,
1✔
878
                taskToken.DomainID,
1✔
879
                taskToken.WorkflowID,
1✔
880
                taskToken.RunID,
1✔
881
                scope,
1✔
882
                wh.GetThrottledLogger(),
1✔
883
                tag.BlobSizeViolationOperation("RecordActivityTaskHeartbeatByID"),
1✔
884
        ); err != nil {
1✔
885
                // heartbeat details exceed size limit, we would fail the activity immediately with explicit error reason
×
886
                failRequest := &types.RespondActivityTaskFailedRequest{
×
887
                        TaskToken: token,
×
888
                        Reason:    common.StringPtr(common.FailureReasonHeartbeatExceedsLimit),
×
889
                        Details:   heartbeatRequest.Details[0:sizeLimitError],
×
890
                        Identity:  heartbeatRequest.Identity,
×
891
                }
×
892
                err = wh.GetHistoryClient().RespondActivityTaskFailed(ctx, &types.HistoryRespondActivityTaskFailedRequest{
×
893
                        DomainUUID:    taskToken.DomainID,
×
894
                        FailedRequest: failRequest,
×
895
                })
×
896
                if err != nil {
×
897
                        return nil, wh.normalizeVersionedErrors(ctx, err)
×
898
                }
×
899
                resp = &types.RecordActivityTaskHeartbeatResponse{CancelRequested: true}
×
900
        } else {
1✔
901
                req := &types.RecordActivityTaskHeartbeatRequest{
1✔
902
                        TaskToken: token,
1✔
903
                        Details:   heartbeatRequest.Details,
1✔
904
                        Identity:  heartbeatRequest.Identity,
1✔
905
                }
1✔
906

1✔
907
                resp, err = wh.GetHistoryClient().RecordActivityTaskHeartbeat(ctx, &types.HistoryRecordActivityTaskHeartbeatRequest{
1✔
908
                        DomainUUID:       taskToken.DomainID,
1✔
909
                        HeartbeatRequest: req,
1✔
910
                })
1✔
911
                if err != nil {
1✔
912
                        return nil, wh.normalizeVersionedErrors(ctx, err)
×
913
                }
×
914
        }
915

916
        return resp, nil
1✔
917
}
918

919
// RespondActivityTaskCompleted - response to an activity task
920
func (wh *WorkflowHandler) RespondActivityTaskCompleted(
921
        ctx context.Context,
922
        completeRequest *types.RespondActivityTaskCompletedRequest,
923
) (retError error) {
247✔
924
        if wh.isShuttingDown() {
247✔
925
                return validate.ErrShuttingDown
×
926
        }
×
927

928
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
247✔
929
                return err
×
930
        }
×
931

932
        if completeRequest == nil {
247✔
933
                return validate.ErrRequestNotSet
×
934
        }
×
935

936
        if completeRequest.TaskToken == nil {
247✔
937
                return validate.ErrTaskTokenNotSet
×
938
        }
×
939
        taskToken, err := wh.tokenSerializer.Deserialize(completeRequest.TaskToken)
247✔
940
        if err != nil {
247✔
941
                return err
×
942
        }
×
943
        if taskToken.DomainID == "" {
247✔
944
                return validate.ErrDomainNotSet
×
945
        }
×
946

947
        domainName, err := wh.GetDomainCache().GetDomainName(taskToken.DomainID)
247✔
948
        if err != nil {
247✔
949
                return err
×
950
        }
×
951

952
        dw := domainWrapper{
247✔
953
                domain: domainName,
247✔
954
        }
247✔
955
        scope := getMetricsScopeWithDomain(metrics.FrontendRespondActivityTaskCompletedScope, dw, wh.GetMetricsClient()).Tagged(metrics.GetContextTags(ctx)...)
247✔
956
        if !common.IsValidIDLength(
247✔
957
                completeRequest.GetIdentity(),
247✔
958
                scope,
247✔
959
                wh.config.MaxIDLengthWarnLimit(),
247✔
960
                wh.config.IdentityMaxLength(domainName),
247✔
961
                metrics.CadenceErrIdentityExceededWarnLimit,
247✔
962
                domainName,
247✔
963
                wh.GetLogger(),
247✔
964
                tag.IDTypeIdentity) {
247✔
965
                return validate.ErrIdentityTooLong
×
966
        }
×
967

968
        sizeLimitError := wh.config.BlobSizeLimitError(domainName)
247✔
969
        sizeLimitWarn := wh.config.BlobSizeLimitWarn(domainName)
247✔
970

247✔
971
        if err := common.CheckEventBlobSizeLimit(
247✔
972
                len(completeRequest.Result),
247✔
973
                sizeLimitWarn,
247✔
974
                sizeLimitError,
247✔
975
                taskToken.DomainID,
247✔
976
                taskToken.WorkflowID,
247✔
977
                taskToken.RunID,
247✔
978
                scope,
247✔
979
                wh.GetThrottledLogger(),
247✔
980
                tag.BlobSizeViolationOperation("RespondActivityTaskCompleted"),
247✔
981
        ); err != nil {
247✔
982
                // result exceeds blob size limit, we would record it as failure
×
983
                failRequest := &types.RespondActivityTaskFailedRequest{
×
984
                        TaskToken: completeRequest.TaskToken,
×
985
                        Reason:    common.StringPtr(common.FailureReasonCompleteResultExceedsLimit),
×
986
                        Details:   completeRequest.Result[0:sizeLimitError],
×
987
                        Identity:  completeRequest.Identity,
×
988
                }
×
989
                err = wh.GetHistoryClient().RespondActivityTaskFailed(ctx, &types.HistoryRespondActivityTaskFailedRequest{
×
990
                        DomainUUID:    taskToken.DomainID,
×
991
                        FailedRequest: failRequest,
×
992
                })
×
993
                if err != nil {
×
994
                        return wh.normalizeVersionedErrors(ctx, err)
×
995
                }
×
996
        } else {
247✔
997
                err = wh.GetHistoryClient().RespondActivityTaskCompleted(ctx, &types.HistoryRespondActivityTaskCompletedRequest{
247✔
998
                        DomainUUID:      taskToken.DomainID,
247✔
999
                        CompleteRequest: completeRequest,
247✔
1000
                })
247✔
1001
                if err != nil {
292✔
1002
                        return wh.normalizeVersionedErrors(ctx, err)
45✔
1003
                }
45✔
1004
        }
1005

1006
        return nil
202✔
1007
}
1008

1009
// RespondActivityTaskCompletedByID - response to an activity task
1010
func (wh *WorkflowHandler) RespondActivityTaskCompletedByID(
1011
        ctx context.Context,
1012
        completeRequest *types.RespondActivityTaskCompletedByIDRequest,
1013
) (retError error) {
76✔
1014
        if wh.isShuttingDown() {
76✔
1015
                return validate.ErrShuttingDown
×
1016
        }
×
1017

1018
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
76✔
1019
                return err
×
1020
        }
×
1021

1022
        if completeRequest == nil {
76✔
1023
                return validate.ErrRequestNotSet
×
1024
        }
×
1025

1026
        domainName := completeRequest.GetDomain()
76✔
1027
        if domainName == "" {
76✔
1028
                return validate.ErrDomainNotSet
×
1029
        }
×
1030
        domainID, err := wh.GetDomainCache().GetDomainID(domainName)
76✔
1031
        if err != nil {
76✔
1032
                return err
×
1033
        }
×
1034
        workflowID := completeRequest.GetWorkflowID()
76✔
1035
        runID := completeRequest.GetRunID() // runID is optional so can be empty
76✔
1036
        activityID := completeRequest.GetActivityID()
76✔
1037

76✔
1038
        if domainID == "" {
76✔
1039
                return validate.ErrDomainNotSet
×
1040
        }
×
1041
        if workflowID == "" {
76✔
1042
                return validate.ErrWorkflowIDNotSet
×
1043
        }
×
1044
        if activityID == "" {
76✔
1045
                return validate.ErrActivityIDNotSet
×
1046
        }
×
1047

1048
        scope := getMetricsScopeWithDomain(metrics.FrontendRespondActivityTaskCompletedByIDScope, completeRequest, wh.GetMetricsClient()).Tagged(metrics.GetContextTags(ctx)...)
76✔
1049
        if !common.IsValidIDLength(
76✔
1050
                completeRequest.GetIdentity(),
76✔
1051
                scope,
76✔
1052
                wh.config.MaxIDLengthWarnLimit(),
76✔
1053
                wh.config.IdentityMaxLength(domainName),
76✔
1054
                metrics.CadenceErrIdentityExceededWarnLimit,
76✔
1055
                domainName,
76✔
1056
                wh.GetLogger(),
76✔
1057
                tag.IDTypeIdentity) {
76✔
1058
                return validate.ErrIdentityTooLong
×
1059
        }
×
1060

1061
        taskToken := &common.TaskToken{
76✔
1062
                DomainID:   domainID,
76✔
1063
                RunID:      runID,
76✔
1064
                WorkflowID: workflowID,
76✔
1065
                ScheduleID: common.EmptyEventID,
76✔
1066
                ActivityID: activityID,
76✔
1067
        }
76✔
1068
        token, err := wh.tokenSerializer.Serialize(taskToken)
76✔
1069
        if err != nil {
76✔
1070
                return err
×
1071
        }
×
1072

1073
        sizeLimitError := wh.config.BlobSizeLimitError(domainName)
76✔
1074
        sizeLimitWarn := wh.config.BlobSizeLimitWarn(domainName)
76✔
1075

76✔
1076
        if err := common.CheckEventBlobSizeLimit(
76✔
1077
                len(completeRequest.Result),
76✔
1078
                sizeLimitWarn,
76✔
1079
                sizeLimitError,
76✔
1080
                taskToken.DomainID,
76✔
1081
                taskToken.WorkflowID,
76✔
1082
                taskToken.RunID,
76✔
1083
                scope,
76✔
1084
                wh.GetThrottledLogger(),
76✔
1085
                tag.BlobSizeViolationOperation("RespondActivityTaskCompletedByID"),
76✔
1086
        ); err != nil {
76✔
1087
                // result exceeds blob size limit, we would record it as failure
×
1088
                failRequest := &types.RespondActivityTaskFailedRequest{
×
1089
                        TaskToken: token,
×
1090
                        Reason:    common.StringPtr(common.FailureReasonCompleteResultExceedsLimit),
×
1091
                        Details:   completeRequest.Result[0:sizeLimitError],
×
1092
                        Identity:  completeRequest.Identity,
×
1093
                }
×
1094
                err = wh.GetHistoryClient().RespondActivityTaskFailed(ctx, &types.HistoryRespondActivityTaskFailedRequest{
×
1095
                        DomainUUID:    taskToken.DomainID,
×
1096
                        FailedRequest: failRequest,
×
1097
                })
×
1098
                if err != nil {
×
1099
                        return wh.normalizeVersionedErrors(ctx, err)
×
1100
                }
×
1101
        } else {
76✔
1102
                req := &types.RespondActivityTaskCompletedRequest{
76✔
1103
                        TaskToken: token,
76✔
1104
                        Result:    completeRequest.Result,
76✔
1105
                        Identity:  completeRequest.Identity,
76✔
1106
                }
76✔
1107

76✔
1108
                err = wh.GetHistoryClient().RespondActivityTaskCompleted(ctx, &types.HistoryRespondActivityTaskCompletedRequest{
76✔
1109
                        DomainUUID:      taskToken.DomainID,
76✔
1110
                        CompleteRequest: req,
76✔
1111
                })
76✔
1112
                if err != nil {
76✔
1113
                        return wh.normalizeVersionedErrors(ctx, err)
×
1114
                }
×
1115
        }
1116

1117
        return nil
76✔
1118
}
1119

1120
// RespondActivityTaskFailed - response to an activity task failure
1121
func (wh *WorkflowHandler) RespondActivityTaskFailed(
1122
        ctx context.Context,
1123
        failedRequest *types.RespondActivityTaskFailedRequest,
1124
) (retError error) {
12✔
1125
        if wh.isShuttingDown() {
12✔
1126
                return validate.ErrShuttingDown
×
1127
        }
×
1128

1129
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
12✔
1130
                return err
×
1131
        }
×
1132

1133
        if failedRequest == nil {
12✔
1134
                return validate.ErrRequestNotSet
×
1135
        }
×
1136

1137
        if failedRequest.TaskToken == nil {
12✔
1138
                return validate.ErrTaskTokenNotSet
×
1139
        }
×
1140
        taskToken, err := wh.tokenSerializer.Deserialize(failedRequest.TaskToken)
12✔
1141
        if err != nil {
12✔
1142
                return err
×
1143
        }
×
1144
        if taskToken.DomainID == "" {
12✔
1145
                return validate.ErrDomainNotSet
×
1146
        }
×
1147

1148
        domainName, err := wh.GetDomainCache().GetDomainName(taskToken.DomainID)
12✔
1149
        if err != nil {
12✔
1150
                return err
×
1151
        }
×
1152

1153
        dw := domainWrapper{
12✔
1154
                domain: domainName,
12✔
1155
        }
12✔
1156
        scope := getMetricsScopeWithDomain(metrics.FrontendRespondActivityTaskFailedScope, dw, wh.GetMetricsClient()).Tagged(metrics.GetContextTags(ctx)...)
12✔
1157
        if !common.IsValidIDLength(
12✔
1158
                failedRequest.GetIdentity(),
12✔
1159
                scope,
12✔
1160
                wh.config.MaxIDLengthWarnLimit(),
12✔
1161
                wh.config.IdentityMaxLength(domainName),
12✔
1162
                metrics.CadenceErrIdentityExceededWarnLimit,
12✔
1163
                domainName,
12✔
1164
                wh.GetLogger(),
12✔
1165
                tag.IDTypeIdentity) {
12✔
1166
                return validate.ErrIdentityTooLong
×
1167
        }
×
1168

1169
        sizeLimitError := wh.config.BlobSizeLimitError(domainName)
12✔
1170
        sizeLimitWarn := wh.config.BlobSizeLimitWarn(domainName)
12✔
1171

12✔
1172
        if err := common.CheckEventBlobSizeLimit(
12✔
1173
                len(failedRequest.Details),
12✔
1174
                sizeLimitWarn,
12✔
1175
                sizeLimitError,
12✔
1176
                taskToken.DomainID,
12✔
1177
                taskToken.WorkflowID,
12✔
1178
                taskToken.RunID,
12✔
1179
                scope,
12✔
1180
                wh.GetThrottledLogger(),
12✔
1181
                tag.BlobSizeViolationOperation("RespondActivityTaskFailed"),
12✔
1182
        ); err != nil {
12✔
1183
                // details exceeds blob size limit, we would truncate the details and put a specific error reason
×
1184
                failedRequest.Reason = common.StringPtr(common.FailureReasonFailureDetailsExceedsLimit)
×
1185
                failedRequest.Details = failedRequest.Details[0:sizeLimitError]
×
1186
        }
×
1187

1188
        err = wh.GetHistoryClient().RespondActivityTaskFailed(ctx, &types.HistoryRespondActivityTaskFailedRequest{
12✔
1189
                DomainUUID:    taskToken.DomainID,
12✔
1190
                FailedRequest: failedRequest,
12✔
1191
        })
12✔
1192
        if err != nil {
12✔
1193
                return wh.normalizeVersionedErrors(ctx, err)
×
1194
        }
×
1195
        return nil
12✔
1196
}
1197

1198
// RespondActivityTaskFailedByID - response to an activity task failure
1199
func (wh *WorkflowHandler) RespondActivityTaskFailedByID(
1200
        ctx context.Context,
1201
        failedRequest *types.RespondActivityTaskFailedByIDRequest,
1202
) (retError error) {
×
1203
        if wh.isShuttingDown() {
×
1204
                return validate.ErrShuttingDown
×
1205
        }
×
1206

1207
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
×
1208
                return err
×
1209
        }
×
1210

1211
        if failedRequest == nil {
×
1212
                return validate.ErrRequestNotSet
×
1213
        }
×
1214

1215
        domainName := failedRequest.GetDomain()
×
1216

×
1217
        if domainName == "" {
×
1218
                return validate.ErrDomainNotSet
×
1219
        }
×
1220
        domainID, err := wh.GetDomainCache().GetDomainID(domainName)
×
1221
        if err != nil {
×
1222
                return err
×
1223
        }
×
1224
        workflowID := failedRequest.GetWorkflowID()
×
1225
        runID := failedRequest.GetRunID() // runID is optional so can be empty
×
1226
        activityID := failedRequest.GetActivityID()
×
1227

×
1228
        if domainID == "" {
×
1229
                return validate.ErrDomainNotSet
×
1230
        }
×
1231
        if workflowID == "" {
×
1232
                return validate.ErrWorkflowIDNotSet
×
1233
        }
×
1234
        if activityID == "" {
×
1235
                return validate.ErrActivityIDNotSet
×
1236
        }
×
1237

1238
        scope := getMetricsScopeWithDomain(metrics.FrontendRespondActivityTaskFailedByIDScope, failedRequest, wh.GetMetricsClient()).Tagged(metrics.GetContextTags(ctx)...)
×
1239
        if !common.IsValidIDLength(
×
1240
                failedRequest.GetIdentity(),
×
1241
                scope,
×
1242
                wh.config.MaxIDLengthWarnLimit(),
×
1243
                wh.config.IdentityMaxLength(failedRequest.GetDomain()),
×
1244
                metrics.CadenceErrIdentityExceededWarnLimit,
×
1245
                domainName,
×
1246
                wh.GetLogger(),
×
1247
                tag.IDTypeIdentity) {
×
1248
                return validate.ErrIdentityTooLong
×
1249
        }
×
1250

1251
        taskToken := &common.TaskToken{
×
1252
                DomainID:   domainID,
×
1253
                RunID:      runID,
×
1254
                WorkflowID: workflowID,
×
1255
                ScheduleID: common.EmptyEventID,
×
1256
                ActivityID: activityID,
×
1257
        }
×
1258
        token, err := wh.tokenSerializer.Serialize(taskToken)
×
1259
        if err != nil {
×
1260
                return err
×
1261
        }
×
1262

1263
        sizeLimitError := wh.config.BlobSizeLimitError(domainName)
×
1264
        sizeLimitWarn := wh.config.BlobSizeLimitWarn(domainName)
×
1265

×
1266
        if err := common.CheckEventBlobSizeLimit(
×
1267
                len(failedRequest.Details),
×
1268
                sizeLimitWarn,
×
1269
                sizeLimitError,
×
1270
                taskToken.DomainID,
×
1271
                taskToken.WorkflowID,
×
1272
                taskToken.RunID,
×
1273
                scope,
×
1274
                wh.GetThrottledLogger(),
×
1275
                tag.BlobSizeViolationOperation("RespondActivityTaskFailedByID"),
×
1276
        ); err != nil {
×
1277
                // details exceeds blob size limit, we would truncate the details and put a specific error reason
×
1278
                failedRequest.Reason = common.StringPtr(common.FailureReasonFailureDetailsExceedsLimit)
×
1279
                failedRequest.Details = failedRequest.Details[0:sizeLimitError]
×
1280
        }
×
1281

1282
        req := &types.RespondActivityTaskFailedRequest{
×
1283
                TaskToken: token,
×
1284
                Reason:    failedRequest.Reason,
×
1285
                Details:   failedRequest.Details,
×
1286
                Identity:  failedRequest.Identity,
×
1287
        }
×
1288

×
1289
        err = wh.GetHistoryClient().RespondActivityTaskFailed(ctx, &types.HistoryRespondActivityTaskFailedRequest{
×
1290
                DomainUUID:    taskToken.DomainID,
×
1291
                FailedRequest: req,
×
1292
        })
×
1293
        if err != nil {
×
1294
                return wh.normalizeVersionedErrors(ctx, err)
×
1295
        }
×
1296
        return nil
×
1297
}
1298

1299
// RespondActivityTaskCanceled - called to cancel an activity task
1300
func (wh *WorkflowHandler) RespondActivityTaskCanceled(
1301
        ctx context.Context,
1302
        cancelRequest *types.RespondActivityTaskCanceledRequest,
1303
) (retError error) {
×
1304
        if wh.isShuttingDown() {
×
1305
                return validate.ErrShuttingDown
×
1306
        }
×
1307

1308
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
×
1309
                return err
×
1310
        }
×
1311

1312
        if cancelRequest == nil {
×
1313
                return validate.ErrRequestNotSet
×
1314
        }
×
1315

1316
        if cancelRequest.TaskToken == nil {
×
1317
                return validate.ErrTaskTokenNotSet
×
1318
        }
×
1319

1320
        taskToken, err := wh.tokenSerializer.Deserialize(cancelRequest.TaskToken)
×
1321
        if err != nil {
×
1322
                return err
×
1323
        }
×
1324

1325
        if taskToken.DomainID == "" {
×
1326
                return validate.ErrDomainNotSet
×
1327
        }
×
1328

1329
        domainName, err := wh.GetDomainCache().GetDomainName(taskToken.DomainID)
×
1330
        if err != nil {
×
1331
                return err
×
1332
        }
×
1333

1334
        dw := domainWrapper{
×
1335
                domain: domainName,
×
1336
        }
×
1337
        scope := getMetricsScopeWithDomain(metrics.FrontendRespondActivityTaskCanceledScope, dw, wh.GetMetricsClient()).Tagged(metrics.GetContextTags(ctx)...)
×
1338
        if !common.IsValidIDLength(
×
1339
                cancelRequest.GetIdentity(),
×
1340
                scope,
×
1341
                wh.config.MaxIDLengthWarnLimit(),
×
1342
                wh.config.IdentityMaxLength(domainName),
×
1343
                metrics.CadenceErrIdentityExceededWarnLimit,
×
1344
                domainName,
×
1345
                wh.GetLogger(),
×
1346
                tag.IDTypeIdentity) {
×
1347
                return validate.ErrIdentityTooLong
×
1348
        }
×
1349

1350
        sizeLimitError := wh.config.BlobSizeLimitError(domainName)
×
1351
        sizeLimitWarn := wh.config.BlobSizeLimitWarn(domainName)
×
1352

×
1353
        if err := common.CheckEventBlobSizeLimit(
×
1354
                len(cancelRequest.Details),
×
1355
                sizeLimitWarn,
×
1356
                sizeLimitError,
×
1357
                taskToken.DomainID,
×
1358
                taskToken.WorkflowID,
×
1359
                taskToken.RunID,
×
1360
                scope,
×
1361
                wh.GetThrottledLogger(),
×
1362
                tag.BlobSizeViolationOperation("RespondActivityTaskCanceled"),
×
1363
        ); err != nil {
×
1364
                // details exceeds blob size limit, we would record it as failure
×
1365
                failRequest := &types.RespondActivityTaskFailedRequest{
×
1366
                        TaskToken: cancelRequest.TaskToken,
×
1367
                        Reason:    common.StringPtr(common.FailureReasonCancelDetailsExceedsLimit),
×
1368
                        Details:   cancelRequest.Details[0:sizeLimitError],
×
1369
                        Identity:  cancelRequest.Identity,
×
1370
                }
×
1371
                err = wh.GetHistoryClient().RespondActivityTaskFailed(ctx, &types.HistoryRespondActivityTaskFailedRequest{
×
1372
                        DomainUUID:    taskToken.DomainID,
×
1373
                        FailedRequest: failRequest,
×
1374
                })
×
1375
                if err != nil {
×
1376
                        return wh.normalizeVersionedErrors(ctx, err)
×
1377
                }
×
1378
        } else {
×
1379
                err = wh.GetHistoryClient().RespondActivityTaskCanceled(ctx, &types.HistoryRespondActivityTaskCanceledRequest{
×
1380
                        DomainUUID:    taskToken.DomainID,
×
1381
                        CancelRequest: cancelRequest,
×
1382
                })
×
1383
                if err != nil {
×
1384
                        return wh.normalizeVersionedErrors(ctx, err)
×
1385
                }
×
1386
        }
1387

1388
        return nil
×
1389
}
1390

1391
// RespondActivityTaskCanceledByID - called to cancel an activity task
1392
func (wh *WorkflowHandler) RespondActivityTaskCanceledByID(
1393
        ctx context.Context,
1394
        cancelRequest *types.RespondActivityTaskCanceledByIDRequest,
1395
) (retError error) {
×
1396
        if wh.isShuttingDown() {
×
1397
                return validate.ErrShuttingDown
×
1398
        }
×
1399

1400
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
×
1401
                return err
×
1402
        }
×
1403

1404
        if cancelRequest == nil {
×
1405
                return validate.ErrRequestNotSet
×
1406
        }
×
1407

1408
        domainName := cancelRequest.GetDomain()
×
1409
        if domainName == "" {
×
1410
                return validate.ErrDomainNotSet
×
1411
        }
×
1412
        domainID, err := wh.GetDomainCache().GetDomainID(domainName)
×
1413
        if err != nil {
×
1414
                return err
×
1415
        }
×
1416
        workflowID := cancelRequest.GetWorkflowID()
×
1417
        runID := cancelRequest.GetRunID() // runID is optional so can be empty
×
1418
        activityID := cancelRequest.GetActivityID()
×
1419

×
1420
        if domainID == "" {
×
1421
                return validate.ErrDomainNotSet
×
1422
        }
×
1423
        if workflowID == "" {
×
1424
                return validate.ErrWorkflowIDNotSet
×
1425
        }
×
1426
        if activityID == "" {
×
1427
                return validate.ErrActivityIDNotSet
×
1428
        }
×
1429

1430
        scope := getMetricsScopeWithDomain(metrics.FrontendRespondActivityTaskCanceledByIDScope, cancelRequest, wh.GetMetricsClient()).Tagged(metrics.GetContextTags(ctx)...)
×
1431
        if !common.IsValidIDLength(
×
1432
                cancelRequest.GetIdentity(),
×
1433
                scope,
×
1434
                wh.config.MaxIDLengthWarnLimit(),
×
1435
                wh.config.IdentityMaxLength(cancelRequest.GetDomain()),
×
1436
                metrics.CadenceErrIdentityExceededWarnLimit,
×
1437
                domainName,
×
1438
                wh.GetLogger(),
×
1439
                tag.IDTypeIdentity) {
×
1440
                return validate.ErrIdentityTooLong
×
1441
        }
×
1442

1443
        taskToken := &common.TaskToken{
×
1444
                DomainID:   domainID,
×
1445
                RunID:      runID,
×
1446
                WorkflowID: workflowID,
×
1447
                ScheduleID: common.EmptyEventID,
×
1448
                ActivityID: activityID,
×
1449
        }
×
1450
        token, err := wh.tokenSerializer.Serialize(taskToken)
×
1451
        if err != nil {
×
1452
                return err
×
1453
        }
×
1454

1455
        sizeLimitError := wh.config.BlobSizeLimitError(domainName)
×
1456
        sizeLimitWarn := wh.config.BlobSizeLimitWarn(domainName)
×
1457

×
1458
        if err := common.CheckEventBlobSizeLimit(
×
1459
                len(cancelRequest.Details),
×
1460
                sizeLimitWarn,
×
1461
                sizeLimitError,
×
1462
                taskToken.DomainID,
×
1463
                taskToken.WorkflowID,
×
1464
                taskToken.RunID,
×
1465
                scope,
×
1466
                wh.GetThrottledLogger(),
×
1467
                tag.BlobSizeViolationOperation("RespondActivityTaskCanceledByID"),
×
1468
        ); err != nil {
×
1469
                // details exceeds blob size limit, we would record it as failure
×
1470
                failRequest := &types.RespondActivityTaskFailedRequest{
×
1471
                        TaskToken: token,
×
1472
                        Reason:    common.StringPtr(common.FailureReasonCancelDetailsExceedsLimit),
×
1473
                        Details:   cancelRequest.Details[0:sizeLimitError],
×
1474
                        Identity:  cancelRequest.Identity,
×
1475
                }
×
1476
                err = wh.GetHistoryClient().RespondActivityTaskFailed(ctx, &types.HistoryRespondActivityTaskFailedRequest{
×
1477
                        DomainUUID:    taskToken.DomainID,
×
1478
                        FailedRequest: failRequest,
×
1479
                })
×
1480
                if err != nil {
×
1481
                        return wh.normalizeVersionedErrors(ctx, err)
×
1482
                }
×
1483
        } else {
×
1484
                req := &types.RespondActivityTaskCanceledRequest{
×
1485
                        TaskToken: token,
×
1486
                        Details:   cancelRequest.Details,
×
1487
                        Identity:  cancelRequest.Identity,
×
1488
                }
×
1489

×
1490
                err = wh.GetHistoryClient().RespondActivityTaskCanceled(ctx, &types.HistoryRespondActivityTaskCanceledRequest{
×
1491
                        DomainUUID:    taskToken.DomainID,
×
1492
                        CancelRequest: req,
×
1493
                })
×
1494
                if err != nil {
×
1495
                        return wh.normalizeVersionedErrors(ctx, err)
×
1496
                }
×
1497
        }
1498

1499
        return nil
×
1500
}
1501

1502
// RespondDecisionTaskCompleted - response to a decision task
1503
func (wh *WorkflowHandler) RespondDecisionTaskCompleted(
1504
        ctx context.Context,
1505
        completeRequest *types.RespondDecisionTaskCompletedRequest,
1506
) (resp *types.RespondDecisionTaskCompletedResponse, retError error) {
925✔
1507
        if wh.isShuttingDown() {
925✔
1508
                return nil, validate.ErrShuttingDown
×
1509
        }
×
1510

1511
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
925✔
1512
                return nil, err
×
1513
        }
×
1514

1515
        if completeRequest == nil {
925✔
1516
                return nil, validate.ErrRequestNotSet
×
1517
        }
×
1518

1519
        if completeRequest.TaskToken == nil {
925✔
1520
                return nil, validate.ErrTaskTokenNotSet
×
1521
        }
×
1522
        taskToken, err := wh.tokenSerializer.Deserialize(completeRequest.TaskToken)
925✔
1523
        if err != nil {
925✔
1524
                return nil, err
×
1525
        }
×
1526
        if taskToken.DomainID == "" {
925✔
1527
                return nil, validate.ErrDomainNotSet
×
1528
        }
×
1529

1530
        domainName, err := wh.GetDomainCache().GetDomainName(taskToken.DomainID)
925✔
1531
        if err != nil {
925✔
1532
                return nil, err
×
1533
        }
×
1534

1535
        dw := domainWrapper{
925✔
1536
                domain: domainName,
925✔
1537
        }
925✔
1538
        scope := getMetricsScopeWithDomain(metrics.FrontendRespondDecisionTaskCompletedScope, dw, wh.GetMetricsClient()).Tagged(metrics.GetContextTags(ctx)...)
925✔
1539
        if !common.IsValidIDLength(
925✔
1540
                completeRequest.GetIdentity(),
925✔
1541
                scope,
925✔
1542
                wh.config.MaxIDLengthWarnLimit(),
925✔
1543
                wh.config.IdentityMaxLength(domainName),
925✔
1544
                metrics.CadenceErrIdentityExceededWarnLimit,
925✔
1545
                domainName,
925✔
1546
                wh.GetLogger(),
925✔
1547
                tag.IDTypeIdentity) {
925✔
1548
                return nil, validate.ErrIdentityTooLong
×
1549
        }
×
1550

1551
        if err := common.CheckDecisionResultLimit(
925✔
1552
                len(completeRequest.Decisions),
925✔
1553
                wh.config.DecisionResultCountLimit(domainName),
925✔
1554
                scope); err != nil {
925✔
1555
                return nil, err
×
1556
        }
×
1557

1558
        histResp, err := wh.GetHistoryClient().RespondDecisionTaskCompleted(ctx, &types.HistoryRespondDecisionTaskCompletedRequest{
925✔
1559
                DomainUUID:      taskToken.DomainID,
925✔
1560
                CompleteRequest: completeRequest},
925✔
1561
        )
925✔
1562
        if err != nil {
934✔
1563
                return nil, wh.normalizeVersionedErrors(ctx, err)
9✔
1564
        }
9✔
1565

1566
        completedResp := &types.RespondDecisionTaskCompletedResponse{}
916✔
1567
        completedResp.ActivitiesToDispatchLocally = histResp.ActivitiesToDispatchLocally
916✔
1568
        if completeRequest.GetReturnNewDecisionTask() && histResp != nil && histResp.StartedResponse != nil {
976✔
1569
                taskToken := &common.TaskToken{
60✔
1570
                        DomainID:        taskToken.DomainID,
60✔
1571
                        WorkflowID:      taskToken.WorkflowID,
60✔
1572
                        RunID:           taskToken.RunID,
60✔
1573
                        ScheduleID:      histResp.StartedResponse.GetScheduledEventID(),
60✔
1574
                        ScheduleAttempt: histResp.StartedResponse.GetAttempt(),
60✔
1575
                }
60✔
1576
                token, _ := wh.tokenSerializer.Serialize(taskToken)
60✔
1577
                workflowExecution := &types.WorkflowExecution{
60✔
1578
                        WorkflowID: taskToken.WorkflowID,
60✔
1579
                        RunID:      taskToken.RunID,
60✔
1580
                }
60✔
1581
                matchingResp := common.CreateMatchingPollForDecisionTaskResponse(histResp.StartedResponse, workflowExecution, token)
60✔
1582

60✔
1583
                newDecisionTask, err := wh.createPollForDecisionTaskResponse(ctx, scope, taskToken.DomainID, matchingResp, matchingResp.GetBranchToken())
60✔
1584
                if err != nil {
60✔
1585
                        return nil, err
×
1586
                }
×
1587
                completedResp.DecisionTask = newDecisionTask
60✔
1588
        }
1589

1590
        return completedResp, nil
916✔
1591
}
1592

1593
// RespondDecisionTaskFailed - failed response to a decision task
1594
func (wh *WorkflowHandler) RespondDecisionTaskFailed(
1595
        ctx context.Context,
1596
        failedRequest *types.RespondDecisionTaskFailedRequest,
1597
) (retError error) {
159✔
1598
        if wh.isShuttingDown() {
159✔
1599
                return validate.ErrShuttingDown
×
1600
        }
×
1601

1602
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
159✔
1603
                return err
×
1604
        }
×
1605

1606
        if failedRequest == nil {
159✔
1607
                return validate.ErrRequestNotSet
×
1608
        }
×
1609

1610
        if failedRequest.TaskToken == nil {
159✔
1611
                return validate.ErrTaskTokenNotSet
×
1612
        }
×
1613
        taskToken, err := wh.tokenSerializer.Deserialize(failedRequest.TaskToken)
159✔
1614
        if err != nil {
159✔
1615
                return err
×
1616
        }
×
1617
        if taskToken.DomainID == "" {
159✔
1618
                return validate.ErrDomainNotSet
×
1619
        }
×
1620

1621
        domainName, err := wh.GetDomainCache().GetDomainName(taskToken.DomainID)
159✔
1622
        if err != nil {
159✔
1623
                return err
×
1624
        }
×
1625

1626
        dw := domainWrapper{
159✔
1627
                domain: domainName,
159✔
1628
        }
159✔
1629
        scope := getMetricsScopeWithDomain(metrics.FrontendRespondDecisionTaskFailedScope, dw, wh.GetMetricsClient()).Tagged(metrics.GetContextTags(ctx)...)
159✔
1630
        if !common.IsValidIDLength(
159✔
1631
                failedRequest.GetIdentity(),
159✔
1632
                scope,
159✔
1633
                wh.config.MaxIDLengthWarnLimit(),
159✔
1634
                wh.config.IdentityMaxLength(domainName),
159✔
1635
                metrics.CadenceErrIdentityExceededWarnLimit,
159✔
1636
                domainName,
159✔
1637
                wh.GetLogger(),
159✔
1638
                tag.IDTypeIdentity) {
159✔
1639
                return validate.ErrIdentityTooLong
×
1640
        }
×
1641

1642
        sizeLimitError := wh.config.BlobSizeLimitError(domainName)
159✔
1643
        sizeLimitWarn := wh.config.BlobSizeLimitWarn(domainName)
159✔
1644

159✔
1645
        if err := common.CheckEventBlobSizeLimit(
159✔
1646
                len(failedRequest.Details),
159✔
1647
                sizeLimitWarn,
159✔
1648
                sizeLimitError,
159✔
1649
                taskToken.DomainID,
159✔
1650
                taskToken.WorkflowID,
159✔
1651
                taskToken.RunID,
159✔
1652
                scope,
159✔
1653
                wh.GetThrottledLogger(),
159✔
1654
                tag.BlobSizeViolationOperation("RespondDecisionTaskFailed"),
159✔
1655
        ); err != nil {
159✔
1656
                // details exceed, we would just truncate the size for decision task failed as the details is not used anywhere by client code
×
1657
                failedRequest.Details = failedRequest.Details[0:sizeLimitError]
×
1658
        }
×
1659

1660
        err = wh.GetHistoryClient().RespondDecisionTaskFailed(ctx, &types.HistoryRespondDecisionTaskFailedRequest{
159✔
1661
                DomainUUID:    taskToken.DomainID,
159✔
1662
                FailedRequest: failedRequest,
159✔
1663
        })
159✔
1664
        if err != nil {
159✔
1665
                return wh.normalizeVersionedErrors(ctx, err)
×
1666
        }
×
1667
        return nil
159✔
1668
}
1669

1670
// RespondQueryTaskCompleted - response to a query task
1671
func (wh *WorkflowHandler) RespondQueryTaskCompleted(
1672
        ctx context.Context,
1673
        completeRequest *types.RespondQueryTaskCompletedRequest,
1674
) (retError error) {
30✔
1675
        if wh.isShuttingDown() {
30✔
1676
                return validate.ErrShuttingDown
×
1677
        }
×
1678

1679
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
30✔
1680
                return err
×
1681
        }
×
1682

1683
        if completeRequest == nil {
30✔
1684
                return validate.ErrRequestNotSet
×
1685
        }
×
1686

1687
        if completeRequest.TaskToken == nil {
30✔
1688
                return validate.ErrTaskTokenNotSet
×
1689
        }
×
1690
        queryTaskToken, err := wh.tokenSerializer.DeserializeQueryTaskToken(completeRequest.TaskToken)
30✔
1691
        if err != nil {
30✔
1692
                return err
×
1693
        }
×
1694
        if queryTaskToken.DomainID == "" || queryTaskToken.TaskList == "" || queryTaskToken.TaskID == "" {
30✔
1695
                return validate.ErrInvalidTaskToken
×
1696
        }
×
1697

1698
        domainName, err := wh.GetDomainCache().GetDomainName(queryTaskToken.DomainID)
30✔
1699
        if err != nil {
30✔
1700
                return err
×
1701
        }
×
1702

1703
        dw := domainWrapper{
30✔
1704
                domain: domainName,
30✔
1705
        }
30✔
1706
        sizeLimitError := wh.config.BlobSizeLimitError(domainName)
30✔
1707
        sizeLimitWarn := wh.config.BlobSizeLimitWarn(domainName)
30✔
1708

30✔
1709
        scope := getMetricsScopeWithDomain(metrics.FrontendRespondQueryTaskCompletedScope, dw, wh.GetMetricsClient()).Tagged(metrics.GetContextTags(ctx)...)
30✔
1710
        if err := common.CheckEventBlobSizeLimit(
30✔
1711
                len(completeRequest.GetQueryResult()),
30✔
1712
                sizeLimitWarn,
30✔
1713
                sizeLimitError,
30✔
1714
                queryTaskToken.DomainID,
30✔
1715
                "",
30✔
1716
                "",
30✔
1717
                scope,
30✔
1718
                wh.GetThrottledLogger(),
30✔
1719
                tag.BlobSizeViolationOperation("RespondQueryTaskCompleted"),
30✔
1720
        ); err != nil {
30✔
1721
                completeRequest = &types.RespondQueryTaskCompletedRequest{
×
1722
                        TaskToken:     completeRequest.TaskToken,
×
1723
                        CompletedType: types.QueryTaskCompletedTypeFailed.Ptr(),
×
1724
                        QueryResult:   nil,
×
1725
                        ErrorMessage:  err.Error(),
×
1726
                }
×
1727
        }
×
1728

1729
        call := yarpc.CallFromContext(ctx)
30✔
1730

30✔
1731
        completeRequest.WorkerVersionInfo = &types.WorkerVersionInfo{
30✔
1732
                Impl:           call.Header(common.ClientImplHeaderName),
30✔
1733
                FeatureVersion: call.Header(common.FeatureVersionHeaderName),
30✔
1734
        }
30✔
1735
        matchingRequest := &types.MatchingRespondQueryTaskCompletedRequest{
30✔
1736
                DomainUUID:       queryTaskToken.DomainID,
30✔
1737
                TaskList:         &types.TaskList{Name: queryTaskToken.TaskList},
30✔
1738
                TaskID:           queryTaskToken.TaskID,
30✔
1739
                CompletedRequest: completeRequest,
30✔
1740
        }
30✔
1741

30✔
1742
        err = wh.GetMatchingClient().RespondQueryTaskCompleted(ctx, matchingRequest)
30✔
1743
        if err != nil {
30✔
1744
                return err
×
1745
        }
×
1746
        return nil
30✔
1747
}
1748

1749
func (wh *WorkflowHandler) StartWorkflowExecutionAsync(
1750
        ctx context.Context,
1751
        startRequest *types.StartWorkflowExecutionAsyncRequest,
1752
) (resp *types.StartWorkflowExecutionAsyncResponse, retError error) {
3✔
1753
        if wh.isShuttingDown() {
3✔
1754
                return nil, validate.ErrShuttingDown
×
1755
        }
×
1756
        scope := getMetricsScopeWithDomain(metrics.FrontendStartWorkflowExecutionAsyncScope, startRequest, wh.GetMetricsClient()).Tagged(metrics.GetContextTags(ctx)...)
3✔
1757
        // validate request before pushing to queue
3✔
1758
        err := wh.validateStartWorkflowExecutionRequest(ctx, startRequest.StartWorkflowExecutionRequest, scope)
3✔
1759
        if err != nil {
3✔
1760
                return nil, err
×
1761
        }
×
1762

1763
        producer, err := wh.producerManager.GetProducerByDomain(startRequest.GetDomain())
3✔
1764
        if err != nil {
4✔
1765
                return nil, err
1✔
1766
        }
1✔
1767

1768
        // Serialize the message to be sent to the queue.
1769
        // Avoid JSON because json encoding of requests excludes PII fields such as input. JSON encoded request are logged by acccess controlled api layer for audit purposes.
1770
        payload, err := wh.thriftrwEncoder.Encode(thrift.FromStartWorkflowExecutionAsyncRequest(startRequest))
2✔
1771
        if err != nil {
2✔
1772
                return nil, fmt.Errorf("failed to encode StartWorkflowExecutionAsyncRequest: %v", err)
×
1773
        }
×
1774

1775
        // propagate the headers from the context to the message
1776
        header := &shared.Header{
2✔
1777
                Fields: make(map[string][]byte),
2✔
1778
        }
2✔
1779
        for k, v := range yarpc.CallFromContext(ctx).OriginalHeaders() {
2✔
1780
                header.Fields[k] = []byte(v)
×
1781
        }
×
1782
        messageType := sqlblobs.AsyncRequestTypeStartWorkflowExecutionAsyncRequest
2✔
1783
        message := &sqlblobs.AsyncRequestMessage{
2✔
1784
                PartitionKey: common.StringPtr(startRequest.GetWorkflowID()),
2✔
1785
                Type:         &messageType,
2✔
1786
                Header:       header,
2✔
1787
                Encoding:     common.StringPtr(string(common.EncodingTypeThriftRW)),
2✔
1788
                Payload:      payload,
2✔
1789
        }
2✔
1790
        err = producer.Publish(ctx, message)
2✔
1791
        if err != nil {
3✔
1792
                return nil, err
1✔
1793
        }
1✔
1794
        return &types.StartWorkflowExecutionAsyncResponse{}, nil
1✔
1795
}
1796

1797
// StartWorkflowExecution - Creates a new workflow execution
1798
func (wh *WorkflowHandler) StartWorkflowExecution(
1799
        ctx context.Context,
1800
        startRequest *types.StartWorkflowExecutionRequest,
1801
) (resp *types.StartWorkflowExecutionResponse, retError error) {
488✔
1802
        if wh.isShuttingDown() {
488✔
1803
                return nil, validate.ErrShuttingDown
×
1804
        }
×
1805
        scope := getMetricsScopeWithDomain(metrics.FrontendStartWorkflowExecutionScope, startRequest, wh.GetMetricsClient()).Tagged(metrics.GetContextTags(ctx)...)
488✔
1806
        err := wh.validateStartWorkflowExecutionRequest(ctx, startRequest, scope)
488✔
1807
        if err != nil {
499✔
1808
                return nil, err
11✔
1809
        }
11✔
1810
        domainName := startRequest.GetDomain()
477✔
1811
        domainID, err := wh.GetDomainCache().GetDomainID(domainName)
477✔
1812
        if err != nil {
477✔
1813
                return nil, err
×
1814
        }
×
1815
        wh.GetLogger().Debug("Start workflow execution request domainID", tag.WorkflowDomainID(domainID))
477✔
1816
        historyRequest, err := common.CreateHistoryStartWorkflowRequest(
477✔
1817
                domainID, startRequest, time.Now(), wh.getPartitionConfig(ctx, domainName))
477✔
1818
        if err != nil {
477✔
1819
                return nil, err
×
1820
        }
×
1821

1822
        resp, err = wh.GetHistoryClient().StartWorkflowExecution(ctx, historyRequest)
477✔
1823
        if err != nil {
510✔
1824
                return nil, err
33✔
1825
        }
33✔
1826
        return resp, nil
444✔
1827
}
1828

1829
func (wh *WorkflowHandler) validateStartWorkflowExecutionRequest(ctx context.Context, startRequest *types.StartWorkflowExecutionRequest, scope metrics.Scope) error {
491✔
1830
        if startRequest == nil {
492✔
1831
                return validate.ErrRequestNotSet
1✔
1832
        }
1✔
1833
        domainName := startRequest.GetDomain()
490✔
1834
        if domainName == "" {
491✔
1835
                return validate.ErrDomainNotSet
1✔
1836
        }
1✔
1837
        if startRequest.GetWorkflowID() == "" {
490✔
1838
                return validate.ErrWorkflowIDNotSet
1✔
1839
        }
1✔
1840
        if _, err := uuid.Parse(startRequest.RequestID); err != nil {
490✔
1841
                return &types.BadRequestError{Message: fmt.Sprintf("requestId %q is not a valid UUID", startRequest.RequestID)}
2✔
1842
        }
2✔
1843
        if startRequest.WorkflowType == nil || startRequest.WorkflowType.GetName() == "" {
487✔
1844
                return validate.ErrWorkflowTypeNotSet
1✔
1845
        }
1✔
1846
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
485✔
1847
                return err
×
1848
        }
×
1849
        idLengthWarnLimit := wh.config.MaxIDLengthWarnLimit()
485✔
1850
        if !common.IsValidIDLength(
485✔
1851
                domainName,
485✔
1852
                scope,
485✔
1853
                idLengthWarnLimit,
485✔
1854
                wh.config.DomainNameMaxLength(domainName),
485✔
1855
                metrics.CadenceErrDomainNameExceededWarnLimit,
485✔
1856
                domainName,
485✔
1857
                wh.GetLogger(),
485✔
1858
                tag.IDTypeDomainName) {
485✔
1859
                return validate.ErrDomainTooLong
×
1860
        }
×
1861
        if !common.IsValidIDLength(
485✔
1862
                startRequest.GetWorkflowID(),
485✔
1863
                scope,
485✔
1864
                idLengthWarnLimit,
485✔
1865
                wh.config.WorkflowIDMaxLength(domainName),
485✔
1866
                metrics.CadenceErrWorkflowIDExceededWarnLimit,
485✔
1867
                domainName,
485✔
1868
                wh.GetLogger(),
485✔
1869
                tag.IDTypeWorkflowID) {
485✔
1870
                return validate.ErrWorkflowIDTooLong
×
1871
        }
×
1872
        if err := common.ValidateRetryPolicy(startRequest.RetryPolicy); err != nil {
485✔
1873
                return err
×
1874
        }
×
1875
        wh.GetLogger().Debug(
485✔
1876
                "Received StartWorkflowExecution. WorkflowID",
485✔
1877
                tag.WorkflowID(startRequest.GetWorkflowID()))
485✔
1878
        if !common.IsValidIDLength(
485✔
1879
                startRequest.WorkflowType.GetName(),
485✔
1880
                scope,
485✔
1881
                idLengthWarnLimit,
485✔
1882
                wh.config.WorkflowTypeMaxLength(domainName),
485✔
1883
                metrics.CadenceErrWorkflowTypeExceededWarnLimit,
485✔
1884
                domainName,
485✔
1885
                wh.GetLogger(),
485✔
1886
                tag.IDTypeWorkflowType) {
485✔
1887
                return validate.ErrWorkflowTypeTooLong
×
1888
        }
×
1889
        if err := wh.validateTaskList(startRequest.TaskList, scope, domainName); err != nil {
486✔
1890
                return err
1✔
1891
        }
1✔
1892
        if startRequest.GetExecutionStartToCloseTimeoutSeconds() <= 0 {
485✔
1893
                return validate.ErrInvalidExecutionStartToCloseTimeoutSeconds
1✔
1894
        }
1✔
1895
        if startRequest.GetTaskStartToCloseTimeoutSeconds() <= 0 {
484✔
1896
                return validate.ErrInvalidTaskStartToCloseTimeoutSeconds
1✔
1897
        }
1✔
1898
        if startRequest.GetDelayStartSeconds() < 0 {
483✔
1899
                return validate.ErrInvalidDelayStartSeconds
1✔
1900
        }
1✔
1901
        if startRequest.GetJitterStartSeconds() < 0 {
481✔
1902
                return validate.ErrInvalidJitterStartSeconds
×
1903
        }
×
1904
        jitter := startRequest.GetJitterStartSeconds()
481✔
1905
        cron := startRequest.GetCronSchedule()
481✔
1906
        if cron != "" {
499✔
1907
                if _, err := backoff.ValidateSchedule(startRequest.GetCronSchedule()); err != nil {
18✔
1908
                        return err
×
1909
                }
×
1910
        }
1911
        if jitter > 0 && cron != "" {
481✔
1912
                // Calculate the cron duration and ensure that jitter is not greater than the cron duration,
×
1913
                // because that would be confusing to users.
×
1914

×
1915
                // Request using start/end time zero value, which will get us an exact answer (i.e. its not in the
×
1916
                // middle of a minute)
×
1917
                backoffSeconds, err := backoff.GetBackoffForNextScheduleInSeconds(cron, time.Time{}, time.Time{}, jitter)
×
1918
                if err != nil {
×
1919
                        return err
×
1920
                }
×
1921
                if jitter > backoffSeconds {
×
1922
                        return validate.ErrInvalidJitterStartSeconds2
×
1923
                }
×
1924
        }
1925
        if !common.IsValidIDLength(
481✔
1926
                startRequest.GetRequestID(),
481✔
1927
                scope,
481✔
1928
                idLengthWarnLimit,
481✔
1929
                wh.config.RequestIDMaxLength(domainName),
481✔
1930
                metrics.CadenceErrRequestIDExceededWarnLimit,
481✔
1931
                domainName,
481✔
1932
                wh.GetLogger(),
481✔
1933
                tag.IDTypeRequestID) {
481✔
1934
                return validate.ErrRequestIDTooLong
×
1935
        }
×
1936
        if err := wh.searchAttributesValidator.ValidateSearchAttributes(startRequest.SearchAttributes, domainName); err != nil {
481✔
1937
                return err
×
1938
        }
×
1939
        wh.GetLogger().Debug("Start workflow execution request domain", tag.WorkflowDomainName(domainName))
481✔
1940
        domainID, err := wh.GetDomainCache().GetDomainID(domainName)
481✔
1941
        if err != nil {
481✔
1942
                return err
×
1943
        }
×
1944
        sizeLimitError := wh.config.BlobSizeLimitError(domainName)
481✔
1945
        sizeLimitWarn := wh.config.BlobSizeLimitWarn(domainName)
481✔
1946
        actualSize := len(startRequest.Input)
481✔
1947
        if startRequest.Memo != nil {
493✔
1948
                actualSize += common.GetSizeOfMapStringToByteArray(startRequest.Memo.GetFields())
12✔
1949
        }
12✔
1950
        if err := common.CheckEventBlobSizeLimit(
481✔
1951
                actualSize,
481✔
1952
                sizeLimitWarn,
481✔
1953
                sizeLimitError,
481✔
1954
                domainID,
481✔
1955
                startRequest.GetWorkflowID(),
481✔
1956
                "",
481✔
1957
                scope,
481✔
1958
                wh.GetThrottledLogger(),
481✔
1959
                tag.BlobSizeViolationOperation("StartWorkflowExecution"),
481✔
1960
        ); err != nil {
481✔
1961
                return err
×
1962
        }
×
1963
        isolationGroup := wh.getIsolationGroup(ctx, domainName)
481✔
1964
        if !wh.isIsolationGroupHealthy(ctx, domainName, isolationGroup) {
482✔
1965
                return &types.BadRequestError{fmt.Sprintf("Domain %s is drained from isolation group %s.", domainName, isolationGroup)}
1✔
1966
        }
1✔
1967
        return nil
480✔
1968
}
1969

1970
// GetWorkflowExecutionHistory - retrieves the history of workflow execution
1971
func (wh *WorkflowHandler) GetWorkflowExecutionHistory(
1972
        ctx context.Context,
1973
        getRequest *types.GetWorkflowExecutionHistoryRequest,
1974
) (resp *types.GetWorkflowExecutionHistoryResponse, retError error) {
459✔
1975
        if wh.isShuttingDown() {
459✔
1976
                return nil, validate.ErrShuttingDown
×
1977
        }
×
1978

1979
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
459✔
1980
                return nil, err
×
1981
        }
×
1982

1983
        if getRequest == nil {
459✔
1984
                return nil, validate.ErrRequestNotSet
×
1985
        }
×
1986

1987
        domainName := getRequest.GetDomain()
459✔
1988
        wfExecution := getRequest.GetExecution()
459✔
1989

459✔
1990
        if domainName == "" {
459✔
1991
                return nil, validate.ErrDomainNotSet
×
1992
        }
×
1993

1994
        if err := validate.CheckExecution(wfExecution); err != nil {
459✔
1995
                return nil, err
×
1996
        }
×
1997

1998
        domainID, err := wh.GetDomainCache().GetDomainID(domainName)
459✔
1999
        if err != nil {
459✔
2000
                return nil, err
×
2001
        }
×
2002

2003
        if getRequest.GetMaximumPageSize() <= 0 {
786✔
2004
                getRequest.MaximumPageSize = int32(wh.config.HistoryMaxPageSize(getRequest.GetDomain()))
327✔
2005
        }
327✔
2006
        // force limit page size if exceed
2007
        if getRequest.GetMaximumPageSize() > common.GetHistoryMaxPageSize {
459✔
2008
                wh.GetThrottledLogger().Warn("GetHistory page size is larger than threshold",
×
2009
                        tag.WorkflowID(getRequest.Execution.GetWorkflowID()),
×
2010
                        tag.WorkflowRunID(getRequest.Execution.GetRunID()),
×
2011
                        tag.WorkflowDomainID(domainID),
×
2012
                        tag.WorkflowSize(int64(getRequest.GetMaximumPageSize())))
×
2013
                getRequest.MaximumPageSize = common.GetHistoryMaxPageSize
×
2014
        }
×
2015

2016
        scope := getMetricsScopeWithDomain(metrics.FrontendGetWorkflowExecutionHistoryScope, getRequest, wh.GetMetricsClient()).Tagged(metrics.GetContextTags(ctx)...)
459✔
2017
        if !getRequest.GetSkipArchival() {
900✔
2018
                enableArchivalRead := wh.GetArchivalMetadata().GetHistoryConfig().ReadEnabled()
441✔
2019
                historyArchived := wh.historyArchived(ctx, getRequest, domainID)
441✔
2020
                if enableArchivalRead && historyArchived {
463✔
2021
                        return wh.getArchivedHistory(ctx, getRequest, domainID)
22✔
2022
                }
22✔
2023
        }
2024

2025
        // this function return the following 6 things,
2026
        // 1. branch token
2027
        // 2. the workflow run ID
2028
        // 3. the last first event ID (the event ID of the last batch of events in the history)
2029
        // 4. the next event ID
2030
        // 5. whether the workflow is closed
2031
        // 6. error if any
2032
        queryHistory := func(
437✔
2033
                domainUUID string,
437✔
2034
                execution *types.WorkflowExecution,
437✔
2035
                expectedNextEventID int64,
437✔
2036
                currentBranchToken []byte,
437✔
2037
        ) ([]byte, string, int64, int64, bool, error) {
830✔
2038
                response, err := wh.GetHistoryClient().PollMutableState(ctx, &types.PollMutableStateRequest{
393✔
2039
                        DomainUUID:          domainUUID,
393✔
2040
                        Execution:           execution,
393✔
2041
                        ExpectedNextEventID: expectedNextEventID,
393✔
2042
                        CurrentBranchToken:  currentBranchToken,
393✔
2043
                })
393✔
2044

393✔
2045
                if err != nil {
393✔
2046
                        return nil, "", 0, 0, false, err
×
2047
                }
×
2048
                isWorkflowRunning := response.GetWorkflowCloseState() == persistence.WorkflowCloseStatusNone
393✔
2049

393✔
2050
                return response.CurrentBranchToken,
393✔
2051
                        response.Execution.GetRunID(),
393✔
2052
                        response.GetLastFirstEventID(),
393✔
2053
                        response.GetNextEventID(),
393✔
2054
                        isWorkflowRunning,
393✔
2055
                        nil
393✔
2056
        }
2057

2058
        isLongPoll := getRequest.GetWaitForNewEvent()
437✔
2059
        isCloseEventOnly := getRequest.GetHistoryEventFilterType() == types.HistoryEventFilterTypeCloseEvent
437✔
2060
        execution := getRequest.Execution
437✔
2061
        token := &getHistoryContinuationToken{}
437✔
2062

437✔
2063
        var runID string
437✔
2064
        lastFirstEventID := common.FirstEventID
437✔
2065
        var nextEventID int64
437✔
2066
        var isWorkflowRunning bool
437✔
2067

437✔
2068
        // process the token for paging
437✔
2069
        queryNextEventID := common.EndEventID
437✔
2070
        if getRequest.NextPageToken != nil {
481✔
2071
                token, err = deserializeHistoryToken(getRequest.NextPageToken)
44✔
2072
                if err != nil {
44✔
2073
                        return nil, validate.ErrInvalidNextPageToken
×
2074
                }
×
2075
                if execution.RunID != "" && execution.GetRunID() != token.RunID {
44✔
2076
                        return nil, validate.ErrNextPageTokenRunIDMismatch
×
2077
                }
×
2078

2079
                execution.RunID = token.RunID
44✔
2080

44✔
2081
                // we need to update the current next event ID and whether workflow is running
44✔
2082
                if len(token.PersistenceToken) == 0 && isLongPoll && token.IsWorkflowRunning {
44✔
2083
                        logger := wh.GetLogger().WithTags(
×
2084
                                tag.WorkflowDomainName(getRequest.GetDomain()),
×
2085
                                tag.WorkflowID(getRequest.Execution.GetWorkflowID()),
×
2086
                                tag.WorkflowRunID(getRequest.Execution.GetRunID()),
×
2087
                        )
×
2088
                        // TODO: for now we only log the invalid timeout (this is done inside the helper function) in case
×
2089
                        // this change breaks existing customers. Once we are sure no one is calling this API with very short timeout
×
2090
                        // we can return the error.
×
2091
                        _ = common.ValidateLongPollContextTimeout(ctx, "GetWorkflowExecutionHistory", logger)
×
2092

×
2093
                        if !isCloseEventOnly {
×
2094
                                queryNextEventID = token.NextEventID
×
2095
                        }
×
2096
                        token.BranchToken, _, lastFirstEventID, nextEventID, isWorkflowRunning, err =
×
2097
                                queryHistory(domainID, execution, queryNextEventID, token.BranchToken)
×
2098
                        if err != nil {
×
2099
                                return nil, err
×
2100
                        }
×
2101
                        token.FirstEventID = token.NextEventID
×
2102
                        token.NextEventID = nextEventID
×
2103
                        token.IsWorkflowRunning = isWorkflowRunning
×
2104
                }
2105
        } else {
393✔
2106
                if !isCloseEventOnly {
768✔
2107
                        queryNextEventID = common.FirstEventID
375✔
2108
                }
375✔
2109
                token.BranchToken, runID, lastFirstEventID, nextEventID, isWorkflowRunning, err =
393✔
2110
                        queryHistory(domainID, execution, queryNextEventID, nil)
393✔
2111
                if err != nil {
393✔
2112
                        return nil, err
×
2113
                }
×
2114

2115
                execution.RunID = runID
393✔
2116

393✔
2117
                token.RunID = runID
393✔
2118
                token.FirstEventID = common.FirstEventID
393✔
2119
                token.NextEventID = nextEventID
393✔
2120
                token.IsWorkflowRunning = isWorkflowRunning
393✔
2121
                token.PersistenceToken = nil
393✔
2122
        }
2123

2124
        call := yarpc.CallFromContext(ctx)
437✔
2125
        clientFeatureVersion := call.Header(common.FeatureVersionHeaderName)
437✔
2126
        clientImpl := call.Header(common.ClientImplHeaderName)
437✔
2127
        supportsRawHistoryQuery := wh.versionChecker.SupportsRawHistoryQuery(clientImpl, clientFeatureVersion) == nil
437✔
2128
        isRawHistoryEnabled := wh.config.SendRawWorkflowHistory(domainName) && supportsRawHistoryQuery
437✔
2129

437✔
2130
        history := &types.History{}
437✔
2131
        history.Events = []*types.HistoryEvent{}
437✔
2132
        var historyBlob []*types.DataBlob
437✔
2133

437✔
2134
        // helper function to just getHistory
437✔
2135
        getHistory := func(firstEventID, nextEventID int64, nextPageToken []byte) error {
874✔
2136
                if isRawHistoryEnabled {
439✔
2137
                        historyBlob, token.PersistenceToken, err = wh.getRawHistory(
2✔
2138
                                ctx,
2✔
2139
                                scope,
2✔
2140
                                domainID,
2✔
2141
                                domainName,
2✔
2142
                                *execution,
2✔
2143
                                firstEventID,
2✔
2144
                                nextEventID,
2✔
2145
                                getRequest.GetMaximumPageSize(),
2✔
2146
                                nextPageToken,
2✔
2147
                                token.TransientDecision,
2✔
2148
                                token.BranchToken,
2✔
2149
                        )
2✔
2150
                } else {
437✔
2151
                        history, token.PersistenceToken, err = wh.getHistory(
435✔
2152
                                ctx,
435✔
2153
                                scope,
435✔
2154
                                domainID,
435✔
2155
                                domainName,
435✔
2156
                                *execution,
435✔
2157
                                firstEventID,
435✔
2158
                                nextEventID,
435✔
2159
                                getRequest.GetMaximumPageSize(),
435✔
2160
                                nextPageToken,
435✔
2161
                                token.TransientDecision,
435✔
2162
                                token.BranchToken,
435✔
2163
                        )
435✔
2164
                }
435✔
2165
                if err != nil {
437✔
2166
                        return err
×
2167
                }
×
2168
                return nil
437✔
2169
        }
2170

2171
        if isCloseEventOnly {
455✔
2172
                if !isWorkflowRunning {
36✔
2173
                        if err := getHistory(lastFirstEventID, nextEventID, nil); err != nil {
18✔
2174
                                return nil, err
×
2175
                        }
×
2176
                        if isRawHistoryEnabled {
18✔
2177
                                // since getHistory func will not return empty history, so the below is safe
×
2178
                                historyBlob = historyBlob[len(historyBlob)-1:]
×
2179
                        } else {
18✔
2180
                                // since getHistory func will not return empty history, so the below is safe
18✔
2181
                                history.Events = history.Events[len(history.Events)-1:]
18✔
2182
                        }
18✔
2183
                        token = nil
18✔
2184
                } else if isLongPoll {
×
2185
                        // set the persistence token to be nil so next time we will query history for updates
×
2186
                        token.PersistenceToken = nil
×
2187
                } else {
×
2188
                        token = nil
×
2189
                }
×
2190
        } else {
419✔
2191
                // return all events
419✔
2192
                if token.FirstEventID >= token.NextEventID {
419✔
2193
                        // currently there is no new event
×
2194
                        history.Events = []*types.HistoryEvent{}
×
2195
                        if !isWorkflowRunning {
×
2196
                                token = nil
×
2197
                        }
×
2198
                } else {
419✔
2199
                        if err := getHistory(token.FirstEventID, token.NextEventID, token.PersistenceToken); err != nil {
419✔
2200
                                return nil, err
×
2201
                        }
×
2202
                        // here, for long pull on history events, we need to intercept the paging token from cassandra
2203
                        // and do something clever
2204
                        if len(token.PersistenceToken) == 0 && (!token.IsWorkflowRunning || !isLongPoll) {
796✔
2205
                                // meaning, there is no more history to be returned
377✔
2206
                                token = nil
377✔
2207
                        }
377✔
2208
                }
2209
        }
2210

2211
        nextToken, err := serializeHistoryToken(token)
437✔
2212
        if err != nil {
437✔
2213
                return nil, err
×
2214
        }
×
2215
        return &types.GetWorkflowExecutionHistoryResponse{
437✔
2216
                History:       history,
437✔
2217
                RawHistory:    historyBlob,
437✔
2218
                NextPageToken: nextToken,
437✔
2219
                Archived:      false,
437✔
2220
        }, nil
437✔
2221
}
2222

2223
// SignalWorkflowExecution is used to send a signal event to running workflow execution.  This results in
2224
// WorkflowExecutionSignaled event recorded in the history and a decision task being created for the execution.
2225
func (wh *WorkflowHandler) SignalWorkflowExecution(
2226
        ctx context.Context,
2227
        signalRequest *types.SignalWorkflowExecutionRequest,
2228
) (retError error) {
723✔
2229
        if wh.isShuttingDown() {
723✔
2230
                return validate.ErrShuttingDown
×
2231
        }
×
2232

2233
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
723✔
2234
                return err
×
2235
        }
×
2236

2237
        if signalRequest == nil {
723✔
2238
                return validate.ErrRequestNotSet
×
2239
        }
×
2240

2241
        domainName := signalRequest.GetDomain()
723✔
2242
        wfExecution := signalRequest.GetWorkflowExecution()
723✔
2243

723✔
2244
        if domainName == "" {
723✔
2245
                return validate.ErrDomainNotSet
×
2246
        }
×
2247
        if err := validate.CheckExecution(wfExecution); err != nil {
723✔
2248
                return err
×
2249
        }
×
2250

2251
        scope := getMetricsScopeWithDomain(metrics.FrontendSignalWorkflowExecutionScope, signalRequest, wh.GetMetricsClient()).Tagged(metrics.GetContextTags(ctx)...)
723✔
2252
        idLengthWarnLimit := wh.config.MaxIDLengthWarnLimit()
723✔
2253
        if !common.IsValidIDLength(
723✔
2254
                domainName,
723✔
2255
                scope,
723✔
2256
                idLengthWarnLimit,
723✔
2257
                wh.config.DomainNameMaxLength(domainName),
723✔
2258
                metrics.CadenceErrDomainNameExceededWarnLimit,
723✔
2259
                domainName,
723✔
2260
                wh.GetLogger(),
723✔
2261
                tag.IDTypeDomainName) {
723✔
2262
                return validate.ErrDomainTooLong
×
2263
        }
×
2264

2265
        if signalRequest.GetSignalName() == "" {
723✔
2266
                return validate.ErrSignalNameNotSet
×
2267
        }
×
2268

2269
        if !common.IsValidIDLength(
723✔
2270
                signalRequest.GetSignalName(),
723✔
2271
                scope,
723✔
2272
                idLengthWarnLimit,
723✔
2273
                wh.config.SignalNameMaxLength(domainName),
723✔
2274
                metrics.CadenceErrSignalNameExceededWarnLimit,
723✔
2275
                domainName,
723✔
2276
                wh.GetLogger(),
723✔
2277
                tag.IDTypeSignalName) {
723✔
2278
                return validate.ErrSignalNameTooLong
×
2279
        }
×
2280

2281
        if !common.IsValidIDLength(
723✔
2282
                signalRequest.GetRequestID(),
723✔
2283
                scope,
723✔
2284
                idLengthWarnLimit,
723✔
2285
                wh.config.RequestIDMaxLength(domainName),
723✔
2286
                metrics.CadenceErrRequestIDExceededWarnLimit,
723✔
2287
                domainName,
723✔
2288
                wh.GetLogger(),
723✔
2289
                tag.IDTypeRequestID) {
723✔
2290
                return validate.ErrRequestIDTooLong
×
2291
        }
×
2292

2293
        domainID, err := wh.GetDomainCache().GetDomainID(domainName)
723✔
2294
        if err != nil {
723✔
2295
                return err
×
2296
        }
×
2297

2298
        sizeLimitError := wh.config.BlobSizeLimitError(domainName)
723✔
2299
        sizeLimitWarn := wh.config.BlobSizeLimitWarn(domainName)
723✔
2300
        if err := common.CheckEventBlobSizeLimit(
723✔
2301
                len(signalRequest.Input),
723✔
2302
                sizeLimitWarn,
723✔
2303
                sizeLimitError,
723✔
2304
                domainID,
723✔
2305
                signalRequest.GetWorkflowExecution().GetWorkflowID(),
723✔
2306
                signalRequest.GetWorkflowExecution().GetRunID(),
723✔
2307
                scope,
723✔
2308
                wh.GetThrottledLogger(),
723✔
2309
                tag.BlobSizeViolationOperation("SignalWorkflowExecution"),
723✔
2310
        ); err != nil {
723✔
2311
                return err
×
2312
        }
×
2313

2314
        isolationGroup := wh.getIsolationGroup(ctx, domainName)
723✔
2315
        if !wh.isIsolationGroupHealthy(ctx, domainName, isolationGroup) {
723✔
2316
                return &types.BadRequestError{fmt.Sprintf("Domain %s is drained from isolation group %s.", domainName, isolationGroup)}
×
2317
        }
×
2318

2319
        err = wh.GetHistoryClient().SignalWorkflowExecution(ctx, &types.HistorySignalWorkflowExecutionRequest{
723✔
2320
                DomainUUID:    domainID,
723✔
2321
                SignalRequest: signalRequest,
723✔
2322
        })
723✔
2323
        if err != nil {
732✔
2324
                return wh.normalizeVersionedErrors(ctx, err)
9✔
2325
        }
9✔
2326

2327
        return nil
714✔
2328
}
2329

2330
func (wh *WorkflowHandler) SignalWithStartWorkflowExecutionAsync(
2331
        ctx context.Context,
2332
        signalWithStartRequest *types.SignalWithStartWorkflowExecutionAsyncRequest,
2333
) (resp *types.SignalWithStartWorkflowExecutionAsyncResponse, retError error) {
3✔
2334
        if wh.isShuttingDown() {
3✔
2335
                return nil, validate.ErrShuttingDown
×
2336
        }
×
2337
        scope := getMetricsScopeWithDomain(metrics.FrontendSignalWithStartWorkflowExecutionAsyncScope, signalWithStartRequest, wh.GetMetricsClient()).Tagged(metrics.GetContextTags(ctx)...)
3✔
2338
        // validate request before pushing to queue
3✔
2339
        err := wh.validateSignalWithStartWorkflowExecutionRequest(ctx, signalWithStartRequest.SignalWithStartWorkflowExecutionRequest, scope)
3✔
2340
        if err != nil {
3✔
2341
                return nil, err
×
2342
        }
×
2343
        producer, err := wh.producerManager.GetProducerByDomain(signalWithStartRequest.GetDomain())
3✔
2344
        if err != nil {
4✔
2345
                return nil, err
1✔
2346
        }
1✔
2347

2348
        // Serialize the message to be sent to the queue.
2349
        // Avoid JSON because json encoding of requests excludes PII fields such as input. JSON encoded request are logged by acccess controlled api layer for audit purposes.
2350
        payload, err := wh.thriftrwEncoder.Encode(thrift.FromSignalWithStartWorkflowExecutionAsyncRequest(signalWithStartRequest))
2✔
2351
        if err != nil {
2✔
2352
                return nil, fmt.Errorf("failed to encode SignalWithStartWorkflowExecutionAsyncRequest: %v", err)
×
2353
        }
×
2354

2355
        // propagate the headers from the context to the message
2356
        header := &shared.Header{
2✔
2357
                Fields: map[string][]byte{},
2✔
2358
        }
2✔
2359
        for k, v := range yarpc.CallFromContext(ctx).OriginalHeaders() {
2✔
2360
                header.Fields[k] = []byte(v)
×
2361
        }
×
2362
        messageType := sqlblobs.AsyncRequestTypeSignalWithStartWorkflowExecutionAsyncRequest
2✔
2363
        message := &sqlblobs.AsyncRequestMessage{
2✔
2364
                PartitionKey: common.StringPtr(signalWithStartRequest.GetWorkflowID()),
2✔
2365
                Type:         &messageType,
2✔
2366
                Header:       header,
2✔
2367
                Encoding:     common.StringPtr(string(common.EncodingTypeThriftRW)),
2✔
2368
                Payload:      payload,
2✔
2369
        }
2✔
2370
        err = producer.Publish(ctx, message)
2✔
2371
        if err != nil {
3✔
2372
                return nil, err
1✔
2373
        }
1✔
2374
        return &types.SignalWithStartWorkflowExecutionAsyncResponse{}, nil
1✔
2375
}
2376

2377
// SignalWithStartWorkflowExecution is used to ensure sending a signal event to a workflow execution.
2378
// If workflow is running, this results in WorkflowExecutionSignaled event recorded in the history
2379
// and a decision task being created for the execution.
2380
// If workflow is not running or not found, this results in WorkflowExecutionStarted and WorkflowExecutionSignaled
2381
// event recorded in history, and a decision task being created for the execution
2382
func (wh *WorkflowHandler) SignalWithStartWorkflowExecution(
2383
        ctx context.Context,
2384
        signalWithStartRequest *types.SignalWithStartWorkflowExecutionRequest,
2385
) (resp *types.StartWorkflowExecutionResponse, retError error) {
33✔
2386
        if wh.isShuttingDown() {
33✔
2387
                return nil, validate.ErrShuttingDown
×
2388
        }
×
2389

2390
        scope := getMetricsScopeWithDomain(metrics.FrontendSignalWithStartWorkflowExecutionScope, signalWithStartRequest, wh.GetMetricsClient()).Tagged(metrics.GetContextTags(ctx)...)
33✔
2391
        err := wh.validateSignalWithStartWorkflowExecutionRequest(ctx, signalWithStartRequest, scope)
33✔
2392
        if err != nil {
33✔
2393
                return nil, err
×
2394
        }
×
2395

2396
        domainName := signalWithStartRequest.GetDomain()
33✔
2397
        domainID, err := wh.GetDomainCache().GetDomainID(domainName)
33✔
2398
        if err != nil {
33✔
2399
                return nil, err
×
2400
        }
×
2401
        resp, err = wh.GetHistoryClient().SignalWithStartWorkflowExecution(ctx, &types.HistorySignalWithStartWorkflowExecutionRequest{
33✔
2402
                DomainUUID:             domainID,
33✔
2403
                SignalWithStartRequest: signalWithStartRequest,
33✔
2404
                PartitionConfig:        wh.getPartitionConfig(ctx, domainName),
33✔
2405
        })
33✔
2406
        if err != nil {
39✔
2407
                return nil, err
6✔
2408
        }
6✔
2409

2410
        return resp, nil
27✔
2411
}
2412

2413
func (wh *WorkflowHandler) validateSignalWithStartWorkflowExecutionRequest(ctx context.Context, signalWithStartRequest *types.SignalWithStartWorkflowExecutionRequest, scope metrics.Scope) error {
36✔
2414
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
36✔
2415
                return err
×
2416
        }
×
2417

2418
        if signalWithStartRequest == nil {
36✔
2419
                return validate.ErrRequestNotSet
×
2420
        }
×
2421

2422
        domainName := signalWithStartRequest.GetDomain()
36✔
2423
        if domainName == "" {
36✔
2424
                return validate.ErrDomainNotSet
×
2425
        }
×
2426
        if signalWithStartRequest.GetWorkflowID() == "" {
36✔
2427
                return validate.ErrWorkflowIDNotSet
×
2428
        }
×
2429

2430
        idLengthWarnLimit := wh.config.MaxIDLengthWarnLimit()
36✔
2431
        if !common.IsValidIDLength(
36✔
2432
                domainName,
36✔
2433
                scope,
36✔
2434
                idLengthWarnLimit,
36✔
2435
                wh.config.DomainNameMaxLength(domainName),
36✔
2436
                metrics.CadenceErrDomainNameExceededWarnLimit,
36✔
2437
                domainName,
36✔
2438
                wh.GetLogger(),
36✔
2439
                tag.IDTypeDomainName) {
36✔
2440
                return validate.ErrDomainTooLong
×
2441
        }
×
2442

2443
        if !common.IsValidIDLength(
36✔
2444
                signalWithStartRequest.GetWorkflowID(),
36✔
2445
                scope,
36✔
2446
                idLengthWarnLimit,
36✔
2447
                wh.config.WorkflowIDMaxLength(domainName),
36✔
2448
                metrics.CadenceErrWorkflowIDExceededWarnLimit,
36✔
2449
                domainName,
36✔
2450
                wh.GetLogger(),
36✔
2451
                tag.IDTypeWorkflowID) {
36✔
2452
                return validate.ErrWorkflowIDTooLong
×
2453
        }
×
2454

2455
        if signalWithStartRequest.GetSignalName() == "" {
36✔
2456
                return validate.ErrSignalNameNotSet
×
2457
        }
×
2458

2459
        if !common.IsValidIDLength(
36✔
2460
                signalWithStartRequest.GetSignalName(),
36✔
2461
                scope,
36✔
2462
                idLengthWarnLimit,
36✔
2463
                wh.config.SignalNameMaxLength(domainName),
36✔
2464
                metrics.CadenceErrSignalNameExceededWarnLimit,
36✔
2465
                domainName,
36✔
2466
                wh.GetLogger(),
36✔
2467
                tag.IDTypeSignalName) {
36✔
2468
                return validate.ErrSignalNameTooLong
×
2469
        }
×
2470

2471
        if signalWithStartRequest.WorkflowType == nil || signalWithStartRequest.WorkflowType.GetName() == "" {
36✔
2472
                return validate.ErrWorkflowTypeNotSet
×
2473
        }
×
2474

2475
        if !common.IsValidIDLength(
36✔
2476
                signalWithStartRequest.WorkflowType.GetName(),
36✔
2477
                scope,
36✔
2478
                idLengthWarnLimit,
36✔
2479
                wh.config.WorkflowTypeMaxLength(domainName),
36✔
2480
                metrics.CadenceErrWorkflowTypeExceededWarnLimit,
36✔
2481
                domainName,
36✔
2482
                wh.GetLogger(),
36✔
2483
                tag.IDTypeWorkflowType) {
36✔
2484
                return validate.ErrWorkflowTypeTooLong
×
2485
        }
×
2486

2487
        if err := wh.validateTaskList(signalWithStartRequest.TaskList, scope, domainName); err != nil {
36✔
2488
                return err
×
2489
        }
×
2490

2491
        if !common.IsValidIDLength(
36✔
2492
                signalWithStartRequest.GetRequestID(),
36✔
2493
                scope,
36✔
2494
                idLengthWarnLimit,
36✔
2495
                wh.config.RequestIDMaxLength(domainName),
36✔
2496
                metrics.CadenceErrRequestIDExceededWarnLimit,
36✔
2497
                domainName,
36✔
2498
                wh.GetLogger(),
36✔
2499
                tag.IDTypeRequestID) {
36✔
2500
                return validate.ErrRequestIDTooLong
×
2501
        }
×
2502

2503
        if signalWithStartRequest.GetExecutionStartToCloseTimeoutSeconds() <= 0 {
36✔
2504
                return validate.ErrInvalidExecutionStartToCloseTimeoutSeconds
×
2505
        }
×
2506

2507
        if signalWithStartRequest.GetTaskStartToCloseTimeoutSeconds() <= 0 {
36✔
2508
                return validate.ErrInvalidTaskStartToCloseTimeoutSeconds
×
2509
        }
×
2510

2511
        if err := common.ValidateRetryPolicy(signalWithStartRequest.RetryPolicy); err != nil {
36✔
2512
                return err
×
2513
        }
×
2514

2515
        if signalWithStartRequest.GetCronSchedule() != "" {
36✔
2516
                if _, err := backoff.ValidateSchedule(signalWithStartRequest.GetCronSchedule()); err != nil {
×
2517
                        return err
×
2518
                }
×
2519
        }
2520

2521
        if err := wh.searchAttributesValidator.ValidateSearchAttributes(signalWithStartRequest.SearchAttributes, domainName); err != nil {
36✔
2522
                return err
×
2523
        }
×
2524

2525
        domainID, err := wh.GetDomainCache().GetDomainID(domainName)
36✔
2526
        if err != nil {
36✔
2527
                return err
×
2528
        }
×
2529

2530
        sizeLimitError := wh.config.BlobSizeLimitError(domainName)
36✔
2531
        sizeLimitWarn := wh.config.BlobSizeLimitWarn(domainName)
36✔
2532
        if err := common.CheckEventBlobSizeLimit(
36✔
2533
                len(signalWithStartRequest.SignalInput),
36✔
2534
                sizeLimitWarn,
36✔
2535
                sizeLimitError,
36✔
2536
                domainID,
36✔
2537
                signalWithStartRequest.GetWorkflowID(),
36✔
2538
                "",
36✔
2539
                scope,
36✔
2540
                wh.GetThrottledLogger(),
36✔
2541
                tag.BlobSizeViolationOperation("SignalWithStartWorkflowExecution"),
36✔
2542
        ); err != nil {
36✔
2543
                return err
×
2544
        }
×
2545
        actualSize := len(signalWithStartRequest.Input) + common.GetSizeOfMapStringToByteArray(signalWithStartRequest.Memo.GetFields())
36✔
2546
        if err := common.CheckEventBlobSizeLimit(
36✔
2547
                actualSize,
36✔
2548
                sizeLimitWarn,
36✔
2549
                sizeLimitError,
36✔
2550
                domainID,
36✔
2551
                signalWithStartRequest.GetWorkflowID(),
36✔
2552
                "",
36✔
2553
                scope,
36✔
2554
                wh.GetThrottledLogger(),
36✔
2555
                tag.BlobSizeViolationOperation("SignalWithStartWorkflowExecution"),
36✔
2556
        ); err != nil {
36✔
2557
                return err
×
2558
        }
×
2559

2560
        isolationGroup := wh.getIsolationGroup(ctx, domainName)
36✔
2561
        if !wh.isIsolationGroupHealthy(ctx, domainName, isolationGroup) {
36✔
2562
                return &types.BadRequestError{fmt.Sprintf("Domain %s is drained from isolation group %s.", domainName, isolationGroup)}
×
2563
        }
×
2564
        return nil
36✔
2565
}
2566

2567
// TerminateWorkflowExecution terminates an existing workflow execution by recording WorkflowExecutionTerminated event
2568
// in the history and immediately terminating the execution instance.
2569
func (wh *WorkflowHandler) TerminateWorkflowExecution(
2570
        ctx context.Context,
2571
        terminateRequest *types.TerminateWorkflowExecutionRequest,
2572
) (retError error) {
48✔
2573
        if wh.isShuttingDown() {
48✔
2574
                return validate.ErrShuttingDown
×
2575
        }
×
2576

2577
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
48✔
2578
                return err
×
2579
        }
×
2580

2581
        if terminateRequest == nil {
48✔
2582
                return validate.ErrRequestNotSet
×
2583
        }
×
2584

2585
        domainName := terminateRequest.GetDomain()
48✔
2586
        wfExecution := terminateRequest.GetWorkflowExecution()
48✔
2587
        if terminateRequest.GetDomain() == "" {
48✔
2588
                return validate.ErrDomainNotSet
×
2589
        }
×
2590
        if err := validate.CheckExecution(wfExecution); err != nil {
48✔
2591
                return err
×
2592
        }
×
2593

2594
        domainID, err := wh.GetDomainCache().GetDomainID(domainName)
48✔
2595
        if err != nil {
48✔
2596
                return err
×
2597
        }
×
2598

2599
        err = wh.GetHistoryClient().TerminateWorkflowExecution(ctx, &types.HistoryTerminateWorkflowExecutionRequest{
48✔
2600
                DomainUUID:       domainID,
48✔
2601
                TerminateRequest: terminateRequest,
48✔
2602
        })
48✔
2603
        if err != nil {
48✔
2604
                return wh.normalizeVersionedErrors(ctx, err)
×
2605
        }
×
2606

2607
        return nil
48✔
2608
}
2609

2610
// ResetWorkflowExecution reset an existing workflow execution to the nextFirstEventID
2611
// in the history and immediately terminating the current execution instance.
2612
func (wh *WorkflowHandler) ResetWorkflowExecution(
2613
        ctx context.Context,
2614
        resetRequest *types.ResetWorkflowExecutionRequest,
2615
) (resp *types.ResetWorkflowExecutionResponse, retError error) {
15✔
2616
        if wh.isShuttingDown() {
15✔
2617
                return nil, validate.ErrShuttingDown
×
2618
        }
×
2619

2620
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
15✔
2621
                return nil, err
×
2622
        }
×
2623

2624
        if resetRequest == nil {
15✔
2625
                return nil, validate.ErrRequestNotSet
×
2626
        }
×
2627

2628
        domainName := resetRequest.GetDomain()
15✔
2629
        wfExecution := resetRequest.GetWorkflowExecution()
15✔
2630
        if domainName == "" {
15✔
2631
                return nil, validate.ErrDomainNotSet
×
2632
        }
×
2633
        if err := validate.CheckExecution(wfExecution); err != nil {
15✔
2634
                return nil, err
×
2635
        }
×
2636

2637
        domainID, err := wh.GetDomainCache().GetDomainID(resetRequest.GetDomain())
15✔
2638
        if err != nil {
15✔
2639
                return nil, err
×
2640
        }
×
2641

2642
        resp, err = wh.GetHistoryClient().ResetWorkflowExecution(ctx, &types.HistoryResetWorkflowExecutionRequest{
15✔
2643
                DomainUUID:   domainID,
15✔
2644
                ResetRequest: resetRequest,
15✔
2645
        })
15✔
2646
        if err != nil {
15✔
2647
                return nil, err
×
2648
        }
×
2649

2650
        return resp, nil
15✔
2651
}
2652

2653
// RequestCancelWorkflowExecution - requests to cancel a workflow execution
2654
func (wh *WorkflowHandler) RequestCancelWorkflowExecution(
2655
        ctx context.Context,
2656
        cancelRequest *types.RequestCancelWorkflowExecutionRequest,
2657
) (retError error) {
6✔
2658
        if wh.isShuttingDown() {
6✔
2659
                return validate.ErrShuttingDown
×
2660
        }
×
2661

2662
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
6✔
2663
                return err
×
2664
        }
×
2665

2666
        if cancelRequest == nil {
6✔
2667
                return validate.ErrRequestNotSet
×
2668
        }
×
2669

2670
        domainName := cancelRequest.GetDomain()
6✔
2671
        wfExecution := cancelRequest.GetWorkflowExecution()
6✔
2672
        if domainName == "" {
6✔
2673
                return validate.ErrDomainNotSet
×
2674
        }
×
2675
        if err := validate.CheckExecution(wfExecution); err != nil {
6✔
2676
                return err
×
2677
        }
×
2678

2679
        domainID, err := wh.GetDomainCache().GetDomainID(cancelRequest.GetDomain())
6✔
2680
        if err != nil {
6✔
2681
                return err
×
2682
        }
×
2683

2684
        err = wh.GetHistoryClient().RequestCancelWorkflowExecution(ctx, &types.HistoryRequestCancelWorkflowExecutionRequest{
6✔
2685
                DomainUUID:    domainID,
6✔
2686
                CancelRequest: cancelRequest,
6✔
2687
        })
6✔
2688
        if err != nil {
9✔
2689
                return wh.normalizeVersionedErrors(ctx, err)
3✔
2690
        }
3✔
2691

2692
        return nil
3✔
2693
}
2694

2695
// ListOpenWorkflowExecutions - retrieves info for open workflow executions in a domain
2696
func (wh *WorkflowHandler) ListOpenWorkflowExecutions(
2697
        ctx context.Context,
2698
        listRequest *types.ListOpenWorkflowExecutionsRequest,
2699
) (resp *types.ListOpenWorkflowExecutionsResponse, retError error) {
106✔
2700
        if wh.isShuttingDown() {
106✔
2701
                return nil, validate.ErrShuttingDown
×
2702
        }
×
2703

2704
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
106✔
2705
                return nil, err
×
2706
        }
×
2707

2708
        if listRequest == nil {
106✔
2709
                return nil, validate.ErrRequestNotSet
×
2710
        }
×
2711

2712
        if listRequest.GetDomain() == "" {
106✔
2713
                return nil, validate.ErrDomainNotSet
×
2714
        }
×
2715

2716
        if listRequest.StartTimeFilter == nil {
106✔
2717
                return nil, &types.BadRequestError{Message: "StartTimeFilter is required"}
×
2718
        }
×
2719

2720
        if listRequest.StartTimeFilter.EarliestTime == nil {
106✔
2721
                return nil, &types.BadRequestError{Message: "EarliestTime in StartTimeFilter is required"}
×
2722
        }
×
2723

2724
        if listRequest.StartTimeFilter.LatestTime == nil {
106✔
2725
                return nil, &types.BadRequestError{Message: "LatestTime in StartTimeFilter is required"}
×
2726
        }
×
2727

2728
        if listRequest.StartTimeFilter.GetEarliestTime() > listRequest.StartTimeFilter.GetLatestTime() {
106✔
2729
                return nil, &types.BadRequestError{Message: "EarliestTime in StartTimeFilter should not be larger than LatestTime"}
×
2730
        }
×
2731

2732
        if listRequest.ExecutionFilter != nil && listRequest.TypeFilter != nil {
106✔
2733
                return nil, &types.BadRequestError{
×
2734
                        Message: "Only one of ExecutionFilter or TypeFilter is allowed"}
×
2735
        }
×
2736

2737
        if listRequest.GetMaximumPageSize() <= 0 {
167✔
2738
                listRequest.MaximumPageSize = int32(wh.config.VisibilityMaxPageSize(listRequest.GetDomain()))
61✔
2739
        }
61✔
2740

2741
        if wh.isListRequestPageSizeTooLarge(listRequest.GetMaximumPageSize(), listRequest.GetDomain()) {
106✔
2742
                return nil, &types.BadRequestError{
×
2743
                        Message: fmt.Sprintf("Pagesize is larger than allow %d", wh.config.ESIndexMaxResultWindow())}
×
2744
        }
×
2745

2746
        domain := listRequest.GetDomain()
106✔
2747
        domainID, err := wh.GetDomainCache().GetDomainID(domain)
106✔
2748
        if err != nil {
106✔
2749
                return nil, err
×
2750
        }
×
2751

2752
        baseReq := persistence.ListWorkflowExecutionsRequest{
106✔
2753
                DomainUUID:    domainID,
106✔
2754
                Domain:        domain,
106✔
2755
                PageSize:      int(listRequest.GetMaximumPageSize()),
106✔
2756
                NextPageToken: listRequest.NextPageToken,
106✔
2757
                EarliestTime:  listRequest.StartTimeFilter.GetEarliestTime(),
106✔
2758
                LatestTime:    listRequest.StartTimeFilter.GetLatestTime(),
106✔
2759
        }
106✔
2760

106✔
2761
        var persistenceResp *persistence.ListWorkflowExecutionsResponse
106✔
2762
        if listRequest.ExecutionFilter != nil {
206✔
2763
                if wh.config.DisableListVisibilityByFilter(domain) {
101✔
2764
                        err = validate.ErrNoPermission
1✔
2765
                } else {
100✔
2766
                        persistenceResp, err = wh.GetVisibilityManager().ListOpenWorkflowExecutionsByWorkflowID(
99✔
2767
                                ctx,
99✔
2768
                                &persistence.ListWorkflowExecutionsByWorkflowIDRequest{
99✔
2769
                                        ListWorkflowExecutionsRequest: baseReq,
99✔
2770
                                        WorkflowID:                    listRequest.ExecutionFilter.GetWorkflowID(),
99✔
2771
                                })
99✔
2772
                }
99✔
2773
                wh.GetLogger().Debug("List open workflow with filter",
100✔
2774
                        tag.WorkflowDomainName(listRequest.GetDomain()), tag.WorkflowListWorkflowFilterByID)
100✔
2775
        } else if listRequest.TypeFilter != nil {
7✔
2776
                if wh.config.DisableListVisibilityByFilter(domain) {
2✔
2777
                        err = validate.ErrNoPermission
1✔
2778
                } else {
1✔
2779
                        persistenceResp, err = wh.GetVisibilityManager().ListOpenWorkflowExecutionsByType(
×
2780
                                ctx,
×
2781
                                &persistence.ListWorkflowExecutionsByTypeRequest{
×
2782
                                        ListWorkflowExecutionsRequest: baseReq,
×
2783
                                        WorkflowTypeName:              listRequest.TypeFilter.GetName(),
×
2784
                                },
×
2785
                        )
×
2786
                }
×
2787
                wh.GetLogger().Debug("List open workflow with filter",
1✔
2788
                        tag.WorkflowDomainName(listRequest.GetDomain()), tag.WorkflowListWorkflowFilterByType)
1✔
2789
        } else {
5✔
2790
                persistenceResp, err = wh.GetVisibilityManager().ListOpenWorkflowExecutions(ctx, &baseReq)
5✔
2791
        }
5✔
2792

2793
        if err != nil {
108✔
2794
                return nil, err
2✔
2795
        }
2✔
2796

2797
        resp = &types.ListOpenWorkflowExecutionsResponse{}
104✔
2798
        resp.Executions = persistenceResp.Executions
104✔
2799
        resp.NextPageToken = persistenceResp.NextPageToken
104✔
2800
        return resp, nil
104✔
2801
}
2802

2803
// ListArchivedWorkflowExecutions - retrieves archived info for closed workflow executions in a domain
2804
func (wh *WorkflowHandler) ListArchivedWorkflowExecutions(
2805
        ctx context.Context,
2806
        listRequest *types.ListArchivedWorkflowExecutionsRequest,
2807
) (resp *types.ListArchivedWorkflowExecutionsResponse, retError error) {
15✔
2808
        if wh.isShuttingDown() {
15✔
2809
                return nil, validate.ErrShuttingDown
×
2810
        }
×
2811

2812
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
15✔
2813
                return nil, err
×
2814
        }
×
2815

2816
        if listRequest == nil {
15✔
2817
                return nil, validate.ErrRequestNotSet
×
2818
        }
×
2819

2820
        if listRequest.GetDomain() == "" {
16✔
2821
                return nil, validate.ErrDomainNotSet
1✔
2822
        }
1✔
2823

2824
        if listRequest.GetPageSize() <= 0 {
14✔
2825
                listRequest.PageSize = int32(wh.config.VisibilityMaxPageSize(listRequest.GetDomain()))
×
2826
        }
×
2827

2828
        maxPageSize := wh.config.VisibilityArchivalQueryMaxPageSize()
14✔
2829
        if int(listRequest.GetPageSize()) > maxPageSize {
14✔
2830
                return nil, &types.BadRequestError{
×
2831
                        Message: fmt.Sprintf("Pagesize is larger than allowed %d", maxPageSize)}
×
2832
        }
×
2833

2834
        if !wh.GetArchivalMetadata().GetVisibilityConfig().ClusterConfiguredForArchival() {
15✔
2835
                return nil, &types.BadRequestError{Message: "Cluster is not configured for visibility archival"}
1✔
2836
        }
1✔
2837

2838
        if !wh.GetArchivalMetadata().GetVisibilityConfig().ReadEnabled() {
13✔
2839
                return nil, &types.BadRequestError{Message: "Cluster is not configured for reading archived visibility records"}
×
2840
        }
×
2841

2842
        entry, err := wh.GetDomainCache().GetDomain(listRequest.GetDomain())
13✔
2843
        if err != nil {
14✔
2844
                return nil, err
1✔
2845
        }
1✔
2846

2847
        if entry.GetConfig().VisibilityArchivalStatus != types.ArchivalStatusEnabled {
14✔
2848
                return nil, &types.BadRequestError{Message: "Domain is not configured for visibility archival"}
2✔
2849
        }
2✔
2850

2851
        URI, err := archiver.NewURI(entry.GetConfig().VisibilityArchivalURI)
10✔
2852
        if err != nil {
10✔
2853
                return nil, err
×
2854
        }
×
2855

2856
        visibilityArchiver, err := wh.GetArchiverProvider().GetVisibilityArchiver(URI.Scheme(), service.Frontend)
10✔
2857
        if err != nil {
10✔
2858
                return nil, err
×
2859
        }
×
2860

2861
        archiverRequest := &archiver.QueryVisibilityRequest{
10✔
2862
                DomainID:      entry.GetInfo().ID,
10✔
2863
                PageSize:      int(listRequest.GetPageSize()),
10✔
2864
                NextPageToken: listRequest.NextPageToken,
10✔
2865
                Query:         listRequest.GetQuery(),
10✔
2866
        }
10✔
2867

10✔
2868
        archiverResponse, err := visibilityArchiver.Query(ctx, URI, archiverRequest)
10✔
2869
        if err != nil {
10✔
2870
                return nil, err
×
2871
        }
×
2872

2873
        // special handling of ExecutionTime for cron or retry
2874
        for _, execution := range archiverResponse.Executions {
25✔
2875
                if execution.GetExecutionTime() == 0 {
30✔
2876
                        execution.ExecutionTime = common.Int64Ptr(execution.GetStartTime())
15✔
2877
                }
15✔
2878
        }
2879

2880
        return &types.ListArchivedWorkflowExecutionsResponse{
10✔
2881
                Executions:    archiverResponse.Executions,
10✔
2882
                NextPageToken: archiverResponse.NextPageToken,
10✔
2883
        }, nil
10✔
2884
}
2885

2886
// ListClosedWorkflowExecutions - retrieves info for closed workflow executions in a domain
2887
func (wh *WorkflowHandler) ListClosedWorkflowExecutions(
2888
        ctx context.Context,
2889
        listRequest *types.ListClosedWorkflowExecutionsRequest,
2890
) (resp *types.ListClosedWorkflowExecutionsResponse, retError error) {
29✔
2891
        if wh.isShuttingDown() {
29✔
2892
                return nil, validate.ErrShuttingDown
×
2893
        }
×
2894

2895
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
29✔
2896
                return nil, err
×
2897
        }
×
2898

2899
        if listRequest == nil {
29✔
2900
                return nil, validate.ErrRequestNotSet
×
2901
        }
×
2902

2903
        if listRequest.GetDomain() == "" {
29✔
2904
                return nil, validate.ErrDomainNotSet
×
2905
        }
×
2906

2907
        if listRequest.StartTimeFilter == nil {
29✔
2908
                return nil, &types.BadRequestError{Message: "StartTimeFilter is required"}
×
2909
        }
×
2910

2911
        if listRequest.StartTimeFilter.EarliestTime == nil {
29✔
2912
                return nil, &types.BadRequestError{Message: "EarliestTime in StartTimeFilter is required"}
×
2913
        }
×
2914

2915
        if listRequest.StartTimeFilter.LatestTime == nil {
29✔
2916
                return nil, &types.BadRequestError{Message: "LatestTime in StartTimeFilter is required"}
×
2917
        }
×
2918

2919
        if listRequest.StartTimeFilter.GetEarliestTime() > listRequest.StartTimeFilter.GetLatestTime() {
29✔
2920
                return nil, &types.BadRequestError{Message: "EarliestTime in StartTimeFilter should not be larger than LatestTime"}
×
2921
        }
×
2922

2923
        filterCount := 0
29✔
2924
        if listRequest.TypeFilter != nil {
30✔
2925
                filterCount++
1✔
2926
        }
1✔
2927
        if listRequest.StatusFilter != nil {
30✔
2928
                filterCount++
1✔
2929
        }
1✔
2930

2931
        if filterCount > 1 {
29✔
2932
                return nil, &types.BadRequestError{
×
2933
                        Message: "Only one of ExecutionFilter, TypeFilter or StatusFilter is allowed"}
×
2934
        } // If ExecutionFilter is provided with one of TypeFilter or StatusFilter, use ExecutionFilter and ignore other filter
×
2935

2936
        if listRequest.GetMaximumPageSize() <= 0 {
30✔
2937
                listRequest.MaximumPageSize = int32(wh.config.VisibilityMaxPageSize(listRequest.GetDomain()))
1✔
2938
        }
1✔
2939

2940
        if wh.isListRequestPageSizeTooLarge(listRequest.GetMaximumPageSize(), listRequest.GetDomain()) {
29✔
2941
                return nil, &types.BadRequestError{
×
2942
                        Message: fmt.Sprintf("Pagesize is larger than allow %d", wh.config.ESIndexMaxResultWindow())}
×
2943
        }
×
2944

2945
        domain := listRequest.GetDomain()
29✔
2946
        domainID, err := wh.GetDomainCache().GetDomainID(domain)
29✔
2947
        if err != nil {
29✔
2948
                return nil, err
×
2949
        }
×
2950

2951
        baseReq := persistence.ListWorkflowExecutionsRequest{
29✔
2952
                DomainUUID:    domainID,
29✔
2953
                Domain:        domain,
29✔
2954
                PageSize:      int(listRequest.GetMaximumPageSize()),
29✔
2955
                NextPageToken: listRequest.NextPageToken,
29✔
2956
                EarliestTime:  listRequest.StartTimeFilter.GetEarliestTime(),
29✔
2957
                LatestTime:    listRequest.StartTimeFilter.GetLatestTime(),
29✔
2958
        }
29✔
2959

29✔
2960
        var persistenceResp *persistence.ListWorkflowExecutionsResponse
29✔
2961
        if listRequest.ExecutionFilter != nil {
45✔
2962
                if wh.config.DisableListVisibilityByFilter(domain) {
17✔
2963
                        err = validate.ErrNoPermission
1✔
2964
                } else {
16✔
2965
                        persistenceResp, err = wh.GetVisibilityManager().ListClosedWorkflowExecutionsByWorkflowID(
15✔
2966
                                ctx,
15✔
2967
                                &persistence.ListWorkflowExecutionsByWorkflowIDRequest{
15✔
2968
                                        ListWorkflowExecutionsRequest: baseReq,
15✔
2969
                                        WorkflowID:                    listRequest.ExecutionFilter.GetWorkflowID(),
15✔
2970
                                },
15✔
2971
                        )
15✔
2972
                }
15✔
2973
                wh.GetLogger().Debug("List closed workflow with filter",
16✔
2974
                        tag.WorkflowDomainName(listRequest.GetDomain()), tag.WorkflowListWorkflowFilterByID)
16✔
2975
        } else if listRequest.TypeFilter != nil {
14✔
2976
                if wh.config.DisableListVisibilityByFilter(domain) {
2✔
2977
                        err = validate.ErrNoPermission
1✔
2978
                } else {
1✔
2979
                        persistenceResp, err = wh.GetVisibilityManager().ListClosedWorkflowExecutionsByType(
×
2980
                                ctx,
×
2981
                                &persistence.ListWorkflowExecutionsByTypeRequest{
×
2982
                                        ListWorkflowExecutionsRequest: baseReq,
×
2983
                                        WorkflowTypeName:              listRequest.TypeFilter.GetName(),
×
2984
                                },
×
2985
                        )
×
2986
                }
×
2987
                wh.GetLogger().Debug("List closed workflow with filter",
1✔
2988
                        tag.WorkflowDomainName(listRequest.GetDomain()), tag.WorkflowListWorkflowFilterByType)
1✔
2989
        } else if listRequest.StatusFilter != nil {
13✔
2990
                if wh.config.DisableListVisibilityByFilter(domain) {
2✔
2991
                        err = validate.ErrNoPermission
1✔
2992
                } else {
1✔
2993
                        persistenceResp, err = wh.GetVisibilityManager().ListClosedWorkflowExecutionsByStatus(
×
2994
                                ctx,
×
2995
                                &persistence.ListClosedWorkflowExecutionsByStatusRequest{
×
2996
                                        ListWorkflowExecutionsRequest: baseReq,
×
2997
                                        Status:                        listRequest.GetStatusFilter(),
×
2998
                                },
×
2999
                        )
×
3000
                }
×
3001
                wh.GetLogger().Debug("List closed workflow with filter",
1✔
3002
                        tag.WorkflowDomainName(listRequest.GetDomain()), tag.WorkflowListWorkflowFilterByStatus)
1✔
3003
        } else {
11✔
3004
                persistenceResp, err = wh.GetVisibilityManager().ListClosedWorkflowExecutions(ctx, &baseReq)
11✔
3005
        }
11✔
3006

3007
        if err != nil {
32✔
3008
                return nil, err
3✔
3009
        }
3✔
3010

3011
        resp = &types.ListClosedWorkflowExecutionsResponse{}
26✔
3012
        resp.Executions = persistenceResp.Executions
26✔
3013
        resp.NextPageToken = persistenceResp.NextPageToken
26✔
3014
        return resp, nil
26✔
3015
}
3016

3017
// ListWorkflowExecutions - retrieves info for workflow executions in a domain
3018
func (wh *WorkflowHandler) ListWorkflowExecutions(
3019
        ctx context.Context,
3020
        listRequest *types.ListWorkflowExecutionsRequest,
3021
) (resp *types.ListWorkflowExecutionsResponse, retError error) {
146✔
3022
        if wh.isShuttingDown() {
146✔
3023
                return nil, validate.ErrShuttingDown
×
3024
        }
×
3025

3026
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
146✔
3027
                return nil, err
×
3028
        }
×
3029

3030
        if listRequest == nil {
146✔
3031
                return nil, validate.ErrRequestNotSet
×
3032
        }
×
3033

3034
        if listRequest.GetDomain() == "" {
146✔
3035
                return nil, validate.ErrDomainNotSet
×
3036
        }
×
3037

3038
        if listRequest.GetPageSize() <= 0 {
146✔
3039
                listRequest.PageSize = int32(wh.config.VisibilityMaxPageSize(listRequest.GetDomain()))
×
3040
        }
×
3041

3042
        if wh.isListRequestPageSizeTooLarge(listRequest.GetPageSize(), listRequest.GetDomain()) {
146✔
3043
                return nil, &types.BadRequestError{
×
3044
                        Message: fmt.Sprintf("Pagesize is larger than allow %d", wh.config.ESIndexMaxResultWindow())}
×
3045
        }
×
3046

3047
        validatedQuery, err := wh.visibilityQueryValidator.ValidateQuery(listRequest.GetQuery())
146✔
3048
        if err != nil {
149✔
3049
                return nil, err
3✔
3050
        }
3✔
3051

3052
        domain := listRequest.GetDomain()
143✔
3053
        domainID, err := wh.GetDomainCache().GetDomainID(domain)
143✔
3054
        if err != nil {
143✔
3055
                return nil, err
×
3056
        }
×
3057

3058
        req := &persistence.ListWorkflowExecutionsByQueryRequest{
143✔
3059
                DomainUUID:    domainID,
143✔
3060
                Domain:        domain,
143✔
3061
                PageSize:      int(listRequest.GetPageSize()),
143✔
3062
                NextPageToken: listRequest.NextPageToken,
143✔
3063
                Query:         validatedQuery,
143✔
3064
        }
143✔
3065
        persistenceResp, err := wh.GetVisibilityManager().ListWorkflowExecutions(ctx, req)
143✔
3066
        if err != nil {
143✔
3067
                return nil, err
×
3068
        }
×
3069

3070
        resp = &types.ListWorkflowExecutionsResponse{}
143✔
3071
        resp.Executions = persistenceResp.Executions
143✔
3072
        resp.NextPageToken = persistenceResp.NextPageToken
143✔
3073
        return resp, nil
143✔
3074
}
3075

3076
// RestartWorkflowExecution - retrieves info for an existing workflow then restarts it
3077
func (wh *WorkflowHandler) RestartWorkflowExecution(ctx context.Context, request *types.RestartWorkflowExecutionRequest) (resp *types.RestartWorkflowExecutionResponse, retError error) {
2✔
3078
        if wh.isShuttingDown() {
2✔
3079
                return nil, validate.ErrShuttingDown
×
3080
        }
×
3081

3082
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
2✔
3083
                return nil, err
×
3084
        }
×
3085

3086
        if request == nil {
2✔
3087
                return nil, validate.ErrRequestNotSet
×
3088
        }
×
3089

3090
        domainName := request.GetDomain()
2✔
3091
        wfExecution := request.GetWorkflowExecution()
2✔
3092

2✔
3093
        if request.GetDomain() == "" {
2✔
3094
                return nil, validate.ErrDomainNotSet
×
3095
        }
×
3096

3097
        if err := validate.CheckExecution(wfExecution); err != nil {
2✔
3098
                return nil, err
×
3099
        }
×
3100

3101
        domainID, err := wh.GetDomainCache().GetDomainID(domainName)
2✔
3102
        if err != nil {
2✔
3103
                return nil, err
×
3104
        }
×
3105

3106
        isolationGroup := wh.getIsolationGroup(ctx, domainName)
2✔
3107
        if !wh.isIsolationGroupHealthy(ctx, domainName, isolationGroup) {
3✔
3108
                return nil, &types.BadRequestError{fmt.Sprintf("Domain %s is drained from isolation group %s.", domainName, isolationGroup)}
1✔
3109
        }
1✔
3110

3111
        history, err := wh.GetWorkflowExecutionHistory(ctx, &types.GetWorkflowExecutionHistoryRequest{
1✔
3112
                Domain: domainName,
1✔
3113
                Execution: &types.WorkflowExecution{
1✔
3114
                        WorkflowID: wfExecution.WorkflowID,
1✔
3115
                        RunID:      wfExecution.RunID,
1✔
3116
                },
1✔
3117
                SkipArchival: true,
1✔
3118
        })
1✔
3119
        if err != nil {
1✔
3120
                return nil, validate.ErrHistoryNotFound
×
3121
        }
×
3122
        startRequest := constructRestartWorkflowRequest(history.History.Events[0].WorkflowExecutionStartedEventAttributes,
1✔
3123
                domainName, request.Identity, wfExecution.WorkflowID)
1✔
3124
        req, err := common.CreateHistoryStartWorkflowRequest(domainID, startRequest, time.Now(), wh.getPartitionConfig(ctx, domainName))
1✔
3125
        if err != nil {
1✔
3126
                return nil, err
×
3127
        }
×
3128
        startResp, err := wh.GetHistoryClient().StartWorkflowExecution(ctx, req)
1✔
3129
        if err != nil {
1✔
3130
                return nil, wh.normalizeVersionedErrors(ctx, err)
×
3131
        }
×
3132
        resp = &types.RestartWorkflowExecutionResponse{
1✔
3133
                RunID: startResp.RunID,
1✔
3134
        }
1✔
3135

1✔
3136
        return resp, nil
1✔
3137
}
3138

3139
// ScanWorkflowExecutions - retrieves info for large amount of workflow executions in a domain without order
3140
func (wh *WorkflowHandler) ScanWorkflowExecutions(
3141
        ctx context.Context,
3142
        listRequest *types.ListWorkflowExecutionsRequest,
3143
) (resp *types.ListWorkflowExecutionsResponse, retError error) {
26✔
3144
        if wh.isShuttingDown() {
26✔
3145
                return nil, validate.ErrShuttingDown
×
3146
        }
×
3147

3148
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
26✔
3149
                return nil, err
×
3150
        }
×
3151

3152
        if listRequest == nil {
26✔
3153
                return nil, validate.ErrRequestNotSet
×
3154
        }
×
3155

3156
        if listRequest.GetDomain() == "" {
26✔
3157
                return nil, validate.ErrDomainNotSet
×
3158
        }
×
3159

3160
        if listRequest.GetPageSize() <= 0 {
26✔
3161
                listRequest.PageSize = int32(wh.config.VisibilityMaxPageSize(listRequest.GetDomain()))
×
3162
        }
×
3163

3164
        if wh.isListRequestPageSizeTooLarge(listRequest.GetPageSize(), listRequest.GetDomain()) {
26✔
3165
                return nil, &types.BadRequestError{
×
3166
                        Message: fmt.Sprintf("Pagesize is larger than allow %d", wh.config.ESIndexMaxResultWindow())}
×
3167
        }
×
3168

3169
        validatedQuery, err := wh.visibilityQueryValidator.ValidateQuery(listRequest.GetQuery())
26✔
3170
        if err != nil {
27✔
3171
                return nil, err
1✔
3172
        }
1✔
3173

3174
        domain := listRequest.GetDomain()
25✔
3175
        domainID, err := wh.GetDomainCache().GetDomainID(domain)
25✔
3176
        if err != nil {
25✔
3177
                return nil, err
×
3178
        }
×
3179

3180
        req := &persistence.ListWorkflowExecutionsByQueryRequest{
25✔
3181
                DomainUUID:    domainID,
25✔
3182
                Domain:        domain,
25✔
3183
                PageSize:      int(listRequest.GetPageSize()),
25✔
3184
                NextPageToken: listRequest.NextPageToken,
25✔
3185
                Query:         validatedQuery,
25✔
3186
        }
25✔
3187
        persistenceResp, err := wh.GetVisibilityManager().ScanWorkflowExecutions(ctx, req)
25✔
3188
        if err != nil {
25✔
3189
                return nil, err
×
3190
        }
×
3191

3192
        resp = &types.ListWorkflowExecutionsResponse{}
25✔
3193
        resp.Executions = persistenceResp.Executions
25✔
3194
        resp.NextPageToken = persistenceResp.NextPageToken
25✔
3195
        return resp, nil
25✔
3196
}
3197

3198
// CountWorkflowExecutions - count number of workflow executions in a domain
3199
func (wh *WorkflowHandler) CountWorkflowExecutions(
3200
        ctx context.Context,
3201
        countRequest *types.CountWorkflowExecutionsRequest,
3202
) (resp *types.CountWorkflowExecutionsResponse, retError error) {
14✔
3203
        if wh.isShuttingDown() {
14✔
3204
                return nil, validate.ErrShuttingDown
×
3205
        }
×
3206

3207
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
14✔
3208
                return nil, err
×
3209
        }
×
3210

3211
        if countRequest == nil {
14✔
3212
                return nil, validate.ErrRequestNotSet
×
3213
        }
×
3214

3215
        if countRequest.GetDomain() == "" {
14✔
3216
                return nil, validate.ErrDomainNotSet
×
3217
        }
×
3218

3219
        validatedQuery, err := wh.visibilityQueryValidator.ValidateQuery(countRequest.GetQuery())
14✔
3220
        if err != nil {
15✔
3221
                return nil, err
1✔
3222
        }
1✔
3223

3224
        domain := countRequest.GetDomain()
13✔
3225
        domainID, err := wh.GetDomainCache().GetDomainID(domain)
13✔
3226
        if err != nil {
13✔
3227
                return nil, err
×
3228
        }
×
3229

3230
        req := &persistence.CountWorkflowExecutionsRequest{
13✔
3231
                DomainUUID: domainID,
13✔
3232
                Domain:     domain,
13✔
3233
                Query:      validatedQuery,
13✔
3234
        }
13✔
3235
        persistenceResp, err := wh.GetVisibilityManager().CountWorkflowExecutions(ctx, req)
13✔
3236
        if err != nil {
13✔
3237
                return nil, err
×
3238
        }
×
3239

3240
        resp = &types.CountWorkflowExecutionsResponse{
13✔
3241
                Count: persistenceResp.Count,
13✔
3242
        }
13✔
3243
        return resp, nil
13✔
3244
}
3245

3246
// GetSearchAttributes return valid indexed keys
3247
func (wh *WorkflowHandler) GetSearchAttributes(ctx context.Context) (resp *types.GetSearchAttributesResponse, retError error) {
1✔
3248
        if wh.isShuttingDown() {
1✔
3249
                return nil, validate.ErrShuttingDown
×
3250
        }
×
3251

3252
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
1✔
3253
                return nil, err
×
3254
        }
×
3255

3256
        keys := wh.config.ValidSearchAttributes()
1✔
3257
        resp = &types.GetSearchAttributesResponse{
1✔
3258
                Keys: wh.convertIndexedKeyToThrift(keys),
1✔
3259
        }
1✔
3260
        return resp, nil
1✔
3261
}
3262

3263
// ResetStickyTaskList reset the volatile information in mutable state of a given workflow.
3264
func (wh *WorkflowHandler) ResetStickyTaskList(
3265
        ctx context.Context,
3266
        resetRequest *types.ResetStickyTaskListRequest,
3267
) (resp *types.ResetStickyTaskListResponse, retError error) {
3✔
3268
        if wh.isShuttingDown() {
3✔
3269
                return nil, validate.ErrShuttingDown
×
3270
        }
×
3271

3272
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
3✔
3273
                return nil, err
×
3274
        }
×
3275

3276
        if resetRequest == nil {
3✔
3277
                return nil, validate.ErrRequestNotSet
×
3278
        }
×
3279

3280
        domainName := resetRequest.GetDomain()
3✔
3281
        wfExecution := resetRequest.GetExecution()
3✔
3282

3✔
3283
        if domainName == "" {
3✔
3284
                return nil, validate.ErrDomainNotSet
×
3285
        }
×
3286

3287
        if err := validate.CheckExecution(wfExecution); err != nil {
3✔
3288
                return nil, err
×
3289
        }
×
3290

3291
        domainID, err := wh.GetDomainCache().GetDomainID(resetRequest.GetDomain())
3✔
3292
        if err != nil {
3✔
3293
                return nil, err
×
3294
        }
×
3295

3296
        _, err = wh.GetHistoryClient().ResetStickyTaskList(ctx, &types.HistoryResetStickyTaskListRequest{
3✔
3297
                DomainUUID: domainID,
3✔
3298
                Execution:  resetRequest.Execution,
3✔
3299
        })
3✔
3300
        if err != nil {
3✔
3301
                return nil, wh.normalizeVersionedErrors(ctx, err)
×
3302
        }
×
3303
        return &types.ResetStickyTaskListResponse{}, nil
3✔
3304
}
3305

3306
// QueryWorkflow returns query result for a specified workflow execution
3307
func (wh *WorkflowHandler) QueryWorkflow(
3308
        ctx context.Context,
3309
        queryRequest *types.QueryWorkflowRequest,
3310
) (resp *types.QueryWorkflowResponse, retError error) {
45✔
3311
        if wh.isShuttingDown() {
45✔
3312
                return nil, validate.ErrShuttingDown
×
3313
        }
×
3314

3315
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
45✔
3316
                return nil, err
×
3317
        }
×
3318

3319
        if queryRequest == nil {
45✔
3320
                return nil, validate.ErrRequestNotSet
×
3321
        }
×
3322

3323
        domainName := queryRequest.GetDomain()
45✔
3324
        wfExecution := queryRequest.GetExecution()
45✔
3325

45✔
3326
        if domainName == "" {
45✔
3327
                return nil, validate.ErrDomainNotSet
×
3328
        }
×
3329

3330
        if err := validate.CheckExecution(wfExecution); err != nil {
45✔
3331
                return nil, err
×
3332
        }
×
3333

3334
        if wh.config.DisallowQuery(domainName) {
45✔
3335
                return nil, validate.ErrQueryDisallowedForDomain
×
3336
        }
×
3337

3338
        if queryRequest.Query == nil {
45✔
3339
                return nil, validate.ErrQueryNotSet
×
3340
        }
×
3341

3342
        if queryRequest.Query.GetQueryType() == "" {
45✔
3343
                return nil, validate.ErrQueryTypeNotSet
×
3344
        }
×
3345

3346
        domainID, err := wh.GetDomainCache().GetDomainID(domainName)
45✔
3347
        if err != nil {
45✔
3348
                return nil, err
×
3349
        }
×
3350

3351
        sizeLimitError := wh.config.BlobSizeLimitError(domainName)
45✔
3352
        sizeLimitWarn := wh.config.BlobSizeLimitWarn(domainName)
45✔
3353

45✔
3354
        scope := getMetricsScopeWithDomain(metrics.FrontendQueryWorkflowScope, queryRequest, wh.GetMetricsClient()).Tagged(metrics.GetContextTags(ctx)...)
45✔
3355
        if err := common.CheckEventBlobSizeLimit(
45✔
3356
                len(queryRequest.GetQuery().GetQueryArgs()),
45✔
3357
                sizeLimitWarn,
45✔
3358
                sizeLimitError,
45✔
3359
                domainID,
45✔
3360
                queryRequest.GetExecution().GetWorkflowID(),
45✔
3361
                queryRequest.GetExecution().GetRunID(),
45✔
3362
                scope,
45✔
3363
                wh.GetThrottledLogger(),
45✔
3364
                tag.BlobSizeViolationOperation("QueryWorkflow")); err != nil {
45✔
3365
                return nil, err
×
3366
        }
×
3367

3368
        req := &types.HistoryQueryWorkflowRequest{
45✔
3369
                DomainUUID: domainID,
45✔
3370
                Request:    queryRequest,
45✔
3371
        }
45✔
3372
        hResponse, err := wh.GetHistoryClient().QueryWorkflow(ctx, req)
45✔
3373
        if err != nil {
57✔
3374
                return nil, err
12✔
3375
        }
12✔
3376
        return hResponse.GetResponse(), nil
33✔
3377
}
3378

3379
// DescribeWorkflowExecution returns information about the specified workflow execution.
3380
func (wh *WorkflowHandler) DescribeWorkflowExecution(
3381
        ctx context.Context,
3382
        request *types.DescribeWorkflowExecutionRequest,
3383
) (resp *types.DescribeWorkflowExecutionResponse, retError error) {
93✔
3384
        if wh.isShuttingDown() {
93✔
3385
                return nil, validate.ErrShuttingDown
×
3386
        }
×
3387

3388
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
93✔
3389
                return nil, err
×
3390
        }
×
3391

3392
        if request == nil {
93✔
3393
                return nil, validate.ErrRequestNotSet
×
3394
        }
×
3395

3396
        domainName := request.GetDomain()
93✔
3397
        wfExecution := request.GetExecution()
93✔
3398
        if domainName == "" {
93✔
3399
                return nil, validate.ErrDomainNotSet
×
3400
        }
×
3401

3402
        if err := validate.CheckExecution(wfExecution); err != nil {
93✔
3403
                return nil, err
×
3404
        }
×
3405

3406
        domainID, err := wh.GetDomainCache().GetDomainID(request.GetDomain())
93✔
3407
        if err != nil {
93✔
3408
                return nil, err
×
3409
        }
×
3410

3411
        response, err := wh.GetHistoryClient().DescribeWorkflowExecution(ctx, &types.HistoryDescribeWorkflowExecutionRequest{
93✔
3412
                DomainUUID: domainID,
93✔
3413
                Request:    request,
93✔
3414
        })
93✔
3415

93✔
3416
        if err != nil {
93✔
3417
                return nil, err
×
3418
        }
×
3419

3420
        return response, nil
93✔
3421
}
3422

3423
// DescribeTaskList returns information about the target tasklist, right now this API returns the
3424
// pollers which polled this tasklist in last few minutes. If includeTaskListStatus field is true,
3425
// it will also return status of tasklist's ackManager (readLevel, ackLevel, backlogCountHint and taskIDBlock).
3426
func (wh *WorkflowHandler) DescribeTaskList(
3427
        ctx context.Context,
3428
        request *types.DescribeTaskListRequest,
3429
) (resp *types.DescribeTaskListResponse, retError error) {
18✔
3430
        if wh.isShuttingDown() {
18✔
3431
                return nil, validate.ErrShuttingDown
×
3432
        }
×
3433

3434
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
18✔
3435
                return nil, err
×
3436
        }
×
3437

3438
        if request == nil {
18✔
3439
                return nil, validate.ErrRequestNotSet
×
3440
        }
×
3441

3442
        if request.GetDomain() == "" {
18✔
3443
                return nil, validate.ErrDomainNotSet
×
3444
        }
×
3445

3446
        domainID, err := wh.GetDomainCache().GetDomainID(request.GetDomain())
18✔
3447
        if err != nil {
18✔
3448
                return nil, err
×
3449
        }
×
3450

3451
        scope := getMetricsScopeWithDomain(metrics.FrontendDescribeTaskListScope, request, wh.GetMetricsClient()).Tagged(metrics.GetContextTags(ctx)...)
18✔
3452
        if err := wh.validateTaskList(request.TaskList, scope, request.GetDomain()); err != nil {
18✔
3453
                return nil, err
×
3454
        }
×
3455

3456
        if request.TaskListType == nil {
18✔
3457
                return nil, validate.ErrTaskListTypeNotSet
×
3458
        }
×
3459

3460
        response, err := wh.GetMatchingClient().DescribeTaskList(ctx, &types.MatchingDescribeTaskListRequest{
18✔
3461
                DomainUUID:  domainID,
18✔
3462
                DescRequest: request,
18✔
3463
        })
18✔
3464
        if err != nil {
18✔
3465
                return nil, err
×
3466
        }
×
3467

3468
        return response, nil
18✔
3469
}
3470

3471
// ListTaskListPartitions returns all the partition and host for a taskList
3472
func (wh *WorkflowHandler) ListTaskListPartitions(
3473
        ctx context.Context,
3474
        request *types.ListTaskListPartitionsRequest,
3475
) (resp *types.ListTaskListPartitionsResponse, retError error) {
×
3476
        if wh.isShuttingDown() {
×
3477
                return nil, validate.ErrShuttingDown
×
3478
        }
×
3479

3480
        if request == nil {
×
3481
                return nil, validate.ErrRequestNotSet
×
3482
        }
×
3483

3484
        if request.GetDomain() == "" {
×
3485
                return nil, validate.ErrDomainNotSet
×
3486
        }
×
3487

3488
        scope := getMetricsScopeWithDomain(metrics.FrontendListTaskListPartitionsScope, request, wh.GetMetricsClient()).Tagged(metrics.GetContextTags(ctx)...)
×
3489
        if err := wh.validateTaskList(request.TaskList, scope, request.GetDomain()); err != nil {
×
3490
                return nil, err
×
3491
        }
×
3492

3493
        resp, err := wh.GetMatchingClient().ListTaskListPartitions(ctx, &types.MatchingListTaskListPartitionsRequest{
×
3494
                Domain:   request.Domain,
×
3495
                TaskList: request.TaskList,
×
3496
        })
×
3497
        return resp, err
×
3498
}
3499

3500
// GetTaskListsByDomain returns all the partition and host for a taskList
3501
func (wh *WorkflowHandler) GetTaskListsByDomain(
3502
        ctx context.Context,
3503
        request *types.GetTaskListsByDomainRequest,
3504
) (resp *types.GetTaskListsByDomainResponse, retError error) {
×
3505
        if wh.isShuttingDown() {
×
3506
                return nil, validate.ErrShuttingDown
×
3507
        }
×
3508

3509
        if request == nil {
×
3510
                return nil, validate.ErrRequestNotSet
×
3511
        }
×
3512

3513
        if request.GetDomain() == "" {
×
3514
                return nil, validate.ErrDomainNotSet
×
3515
        }
×
3516

3517
        resp, err := wh.GetMatchingClient().GetTaskListsByDomain(ctx, &types.GetTaskListsByDomainRequest{
×
3518
                Domain: request.Domain,
×
3519
        })
×
3520
        return resp, err
×
3521
}
3522

3523
// RefreshWorkflowTasks re-generates the workflow tasks
3524
func (wh *WorkflowHandler) RefreshWorkflowTasks(
3525
        ctx context.Context,
3526
        request *types.RefreshWorkflowTasksRequest,
3527
) (err error) {
×
3528
        if request == nil {
×
3529
                return validate.ErrRequestNotSet
×
3530
        }
×
3531
        if err := validate.CheckExecution(request.Execution); err != nil {
×
3532
                return err
×
3533
        }
×
3534
        domainEntry, err := wh.GetDomainCache().GetDomain(request.GetDomain())
×
3535
        if err != nil {
×
3536
                return err
×
3537
        }
×
3538

3539
        err = wh.GetHistoryClient().RefreshWorkflowTasks(ctx, &types.HistoryRefreshWorkflowTasksRequest{
×
3540
                DomainUIID: domainEntry.GetInfo().ID,
×
3541
                Request:    request,
×
3542
        })
×
3543
        if err != nil {
×
3544
                return err
×
3545
        }
×
3546
        return nil
×
3547
}
3548

3549
func (wh *WorkflowHandler) getRawHistory(
3550
        ctx context.Context,
3551
        scope metrics.Scope,
3552
        domainID string,
3553
        domainName string,
3554
        execution types.WorkflowExecution,
3555
        firstEventID int64,
3556
        nextEventID int64,
3557
        pageSize int32,
3558
        nextPageToken []byte,
3559
        transientDecision *types.TransientDecisionInfo,
3560
        branchToken []byte,
3561
) ([]*types.DataBlob, []byte, error) {
2✔
3562
        rawHistory := []*types.DataBlob{}
2✔
3563
        shardID := common.WorkflowIDToHistoryShard(execution.WorkflowID, wh.config.NumHistoryShards)
2✔
3564

2✔
3565
        resp, err := wh.GetHistoryManager().ReadRawHistoryBranch(ctx, &persistence.ReadHistoryBranchRequest{
2✔
3566
                BranchToken:   branchToken,
2✔
3567
                MinEventID:    firstEventID,
2✔
3568
                MaxEventID:    nextEventID,
2✔
3569
                PageSize:      int(pageSize),
2✔
3570
                NextPageToken: nextPageToken,
2✔
3571
                ShardID:       common.IntPtr(shardID),
2✔
3572
                DomainName:    domainName,
2✔
3573
        })
2✔
3574
        if err != nil {
2✔
3575
                return nil, nil, err
×
3576
        }
×
3577

3578
        var encoding *types.EncodingType
2✔
3579
        for _, data := range resp.HistoryEventBlobs {
4✔
3580
                switch data.Encoding {
2✔
3581
                case common.EncodingTypeJSON:
×
3582
                        encoding = types.EncodingTypeJSON.Ptr()
×
3583
                case common.EncodingTypeThriftRW:
2✔
3584
                        encoding = types.EncodingTypeThriftRW.Ptr()
2✔
3585
                default:
×
3586
                        panic(fmt.Sprintf("Invalid encoding type for raw history, encoding type: %s", data.Encoding))
×
3587
                }
3588
                rawHistory = append(rawHistory, &types.DataBlob{
2✔
3589
                        EncodingType: encoding,
2✔
3590
                        Data:         data.Data,
2✔
3591
                })
2✔
3592
        }
3593

3594
        if len(resp.NextPageToken) == 0 && transientDecision != nil {
4✔
3595
                if err := wh.validateTransientDecisionEvents(nextEventID, transientDecision); err != nil {
2✔
3596
                        scope.IncCounter(metrics.CadenceErrIncompleteHistoryCounter)
×
3597
                        wh.GetLogger().Error("getHistory error",
×
3598
                                tag.WorkflowDomainID(domainID),
×
3599
                                tag.WorkflowID(execution.GetWorkflowID()),
×
3600
                                tag.WorkflowRunID(execution.GetRunID()),
×
3601
                                tag.Error(err))
×
3602
                }
×
3603
                blob, err := wh.GetPayloadSerializer().SerializeBatchEvents(
2✔
3604
                        []*types.HistoryEvent{transientDecision.ScheduledEvent, transientDecision.StartedEvent}, common.EncodingTypeThriftRW)
2✔
3605
                if err != nil {
2✔
3606
                        return nil, nil, err
×
3607
                }
×
3608
                rawHistory = append(rawHistory, &types.DataBlob{
2✔
3609
                        EncodingType: types.EncodingTypeThriftRW.Ptr(),
2✔
3610
                        Data:         blob.Data,
2✔
3611
                })
2✔
3612
        }
3613

3614
        return rawHistory, resp.NextPageToken, nil
2✔
3615
}
3616

3617
func (wh *WorkflowHandler) getHistory(
3618
        ctx context.Context,
3619
        scope metrics.Scope,
3620
        domainID string,
3621
        domainName string,
3622
        execution types.WorkflowExecution,
3623
        firstEventID, nextEventID int64,
3624
        pageSize int32,
3625
        nextPageToken []byte,
3626
        transientDecision *types.TransientDecisionInfo,
3627
        branchToken []byte,
3628
) (*types.History, []byte, error) {
1,589✔
3629

1,589✔
3630
        var size int
1,589✔
3631

1,589✔
3632
        isFirstPage := len(nextPageToken) == 0
1,589✔
3633
        shardID := common.WorkflowIDToHistoryShard(execution.WorkflowID, wh.config.NumHistoryShards)
1,589✔
3634
        var err error
1,589✔
3635
        historyEvents, size, nextPageToken, err := persistenceutils.ReadFullPageV2Events(ctx, wh.GetHistoryManager(), &persistence.ReadHistoryBranchRequest{
1,589✔
3636
                BranchToken:   branchToken,
1,589✔
3637
                MinEventID:    firstEventID,
1,589✔
3638
                MaxEventID:    nextEventID,
1,589✔
3639
                PageSize:      int(pageSize),
1,589✔
3640
                NextPageToken: nextPageToken,
1,589✔
3641
                ShardID:       common.IntPtr(shardID),
1,589✔
3642
                DomainName:    domainName,
1,589✔
3643
        })
1,589✔
3644

1,589✔
3645
        if err != nil {
1,589✔
3646
                return nil, nil, err
×
3647
        }
×
3648

3649
        scope.RecordTimer(metrics.HistorySize, time.Duration(size))
1,589✔
3650

1,589✔
3651
        isLastPage := len(nextPageToken) == 0
1,589✔
3652
        if err := verifyHistoryIsComplete(
1,589✔
3653
                historyEvents,
1,589✔
3654
                firstEventID,
1,589✔
3655
                nextEventID-1,
1,589✔
3656
                isFirstPage,
1,589✔
3657
                isLastPage,
1,589✔
3658
                int(pageSize)); err != nil {
1,589✔
3659
                scope.IncCounter(metrics.CadenceErrIncompleteHistoryCounter)
×
3660
                wh.GetLogger().Error("getHistory: incomplete history",
×
3661
                        tag.WorkflowDomainID(domainID),
×
3662
                        tag.WorkflowID(execution.GetWorkflowID()),
×
3663
                        tag.WorkflowRunID(execution.GetRunID()),
×
3664
                        tag.Error(err))
×
3665
                return nil, nil, err
×
3666
        }
×
3667

3668
        if len(nextPageToken) == 0 && transientDecision != nil {
1,760✔
3669
                if err := wh.validateTransientDecisionEvents(nextEventID, transientDecision); err != nil {
171✔
3670
                        scope.IncCounter(metrics.CadenceErrIncompleteHistoryCounter)
×
3671
                        wh.GetLogger().Error("getHistory error",
×
3672
                                tag.WorkflowDomainID(domainID),
×
3673
                                tag.WorkflowID(execution.GetWorkflowID()),
×
3674
                                tag.WorkflowRunID(execution.GetRunID()),
×
3675
                                tag.Error(err))
×
3676
                }
×
3677
                // Append the transient decision events once we are done enumerating everything from the events table
3678
                historyEvents = append(historyEvents, transientDecision.ScheduledEvent, transientDecision.StartedEvent)
171✔
3679
        }
3680

3681
        executionHistory := &types.History{}
1,589✔
3682
        executionHistory.Events = historyEvents
1,589✔
3683
        return executionHistory, nextPageToken, nil
1,589✔
3684
}
3685

3686
func (wh *WorkflowHandler) validateTransientDecisionEvents(
3687
        expectedNextEventID int64,
3688
        decision *types.TransientDecisionInfo,
3689
) error {
173✔
3690

173✔
3691
        if decision.ScheduledEvent.ID == expectedNextEventID &&
173✔
3692
                decision.StartedEvent.ID == expectedNextEventID+1 {
346✔
3693
                return nil
173✔
3694
        }
173✔
3695

3696
        return fmt.Errorf(
×
3697
                "invalid transient decision: "+
×
3698
                        "expectedScheduledEventID=%v expectedStartedEventID=%v but have scheduledEventID=%v startedEventID=%v",
×
3699
                expectedNextEventID,
×
3700
                expectedNextEventID+1,
×
3701
                decision.ScheduledEvent.ID,
×
3702
                decision.StartedEvent.ID)
×
3703
}
3704

3705
func (wh *WorkflowHandler) validateTaskList(t *types.TaskList, scope metrics.Scope, domain string) error {
2,716✔
3706
        if t == nil || t.GetName() == "" {
2,717✔
3707
                return validate.ErrTaskListNotSet
1✔
3708
        }
1✔
3709

3710
        if !common.IsValidIDLength(
2,715✔
3711
                t.GetName(),
2,715✔
3712
                scope,
2,715✔
3713
                wh.config.MaxIDLengthWarnLimit(),
2,715✔
3714
                wh.config.TaskListNameMaxLength(domain),
2,715✔
3715
                metrics.CadenceErrTaskListNameExceededWarnLimit,
2,715✔
3716
                domain,
2,715✔
3717
                wh.GetLogger(),
2,715✔
3718
                tag.IDTypeTaskListName) {
2,715✔
3719
                return validate.ErrTaskListTooLong
×
3720
        }
×
3721
        return nil
2,715✔
3722
}
3723

3724
func (wh *WorkflowHandler) createPollForDecisionTaskResponse(
3725
        ctx context.Context,
3726
        scope metrics.Scope,
3727
        domainID string,
3728
        matchingResp *types.MatchingPollForDecisionTaskResponse,
3729
        branchToken []byte,
3730
) (*types.PollForDecisionTaskResponse, error) {
1,208✔
3731

1,208✔
3732
        if matchingResp.WorkflowExecution == nil {
1,257✔
3733
                // this will happen if there is no decision task to be send to worker / caller
49✔
3734
                return &types.PollForDecisionTaskResponse{}, nil
49✔
3735
        }
49✔
3736

3737
        var history *types.History
1,159✔
3738
        var continuation []byte
1,159✔
3739
        var err error
1,159✔
3740

1,159✔
3741
        if matchingResp.GetStickyExecutionEnabled() && matchingResp.Query != nil {
1,165✔
3742
                // meaning sticky query, we should not return any events to worker
6✔
3743
                // since query task only check the current status
6✔
3744
                history = &types.History{
6✔
3745
                        Events: []*types.HistoryEvent{},
6✔
3746
                }
6✔
3747
        } else {
1,159✔
3748
                // here we have 3 cases:
1,153✔
3749
                // 1. sticky && non query task
1,153✔
3750
                // 2. non sticky &&  non query task
1,153✔
3751
                // 3. non sticky && query task
1,153✔
3752
                // for 1, partial history have to be send back
1,153✔
3753
                // for 2 and 3, full history have to be send back
1,153✔
3754

1,153✔
3755
                var persistenceToken []byte
1,153✔
3756

1,153✔
3757
                firstEventID := common.FirstEventID
1,153✔
3758
                nextEventID := matchingResp.GetNextEventID()
1,153✔
3759
                if matchingResp.GetStickyExecutionEnabled() {
1,249✔
3760
                        firstEventID = matchingResp.GetPreviousStartedEventID() + 1
96✔
3761
                }
96✔
3762
                domainName, dErr := wh.GetDomainCache().GetDomainName(domainID)
1,153✔
3763
                if dErr != nil {
1,153✔
3764
                        return nil, dErr
×
3765
                }
×
3766
                scope = scope.Tagged(metrics.DomainTag(domainName))
1,153✔
3767
                history, persistenceToken, err = wh.getHistory(
1,153✔
3768
                        ctx,
1,153✔
3769
                        scope,
1,153✔
3770
                        domainID,
1,153✔
3771
                        domainName,
1,153✔
3772
                        *matchingResp.WorkflowExecution,
1,153✔
3773
                        firstEventID,
1,153✔
3774
                        nextEventID,
1,153✔
3775
                        int32(wh.config.HistoryMaxPageSize(domainName)),
1,153✔
3776
                        nil,
1,153✔
3777
                        matchingResp.DecisionInfo,
1,153✔
3778
                        branchToken,
1,153✔
3779
                )
1,153✔
3780
                if err != nil {
1,153✔
3781
                        return nil, err
×
3782
                }
×
3783

3784
                if len(persistenceToken) != 0 {
1,153✔
3785
                        continuation, err = serializeHistoryToken(&getHistoryContinuationToken{
×
3786
                                RunID:             matchingResp.WorkflowExecution.GetRunID(),
×
3787
                                FirstEventID:      firstEventID,
×
3788
                                NextEventID:       nextEventID,
×
3789
                                PersistenceToken:  persistenceToken,
×
3790
                                TransientDecision: matchingResp.DecisionInfo,
×
3791
                                BranchToken:       branchToken,
×
3792
                        })
×
3793
                        if err != nil {
×
3794
                                return nil, err
×
3795
                        }
×
3796
                }
3797
        }
3798

3799
        resp := &types.PollForDecisionTaskResponse{
1,159✔
3800
                TaskToken:                 matchingResp.TaskToken,
1,159✔
3801
                WorkflowExecution:         matchingResp.WorkflowExecution,
1,159✔
3802
                WorkflowType:              matchingResp.WorkflowType,
1,159✔
3803
                PreviousStartedEventID:    matchingResp.PreviousStartedEventID,
1,159✔
3804
                StartedEventID:            matchingResp.StartedEventID, // this field is not set for query tasks as there's no decision task started event
1,159✔
3805
                Query:                     matchingResp.Query,
1,159✔
3806
                BacklogCountHint:          matchingResp.BacklogCountHint,
1,159✔
3807
                Attempt:                   matchingResp.Attempt,
1,159✔
3808
                History:                   history,
1,159✔
3809
                NextPageToken:             continuation,
1,159✔
3810
                WorkflowExecutionTaskList: matchingResp.WorkflowExecutionTaskList,
1,159✔
3811
                ScheduledTimestamp:        matchingResp.ScheduledTimestamp,
1,159✔
3812
                StartedTimestamp:          matchingResp.StartedTimestamp,
1,159✔
3813
                Queries:                   matchingResp.Queries,
1,159✔
3814
                NextEventID:               matchingResp.NextEventID,
1,159✔
3815
                TotalHistoryBytes:         matchingResp.TotalHistoryBytes,
1,159✔
3816
        }
1,159✔
3817

1,159✔
3818
        return resp, nil
1,159✔
3819
}
3820

3821
func verifyHistoryIsComplete(
3822
        events []*types.HistoryEvent,
3823
        expectedFirstEventID int64,
3824
        expectedLastEventID int64,
3825
        isFirstPage bool,
3826
        isLastPage bool,
3827
        pageSize int,
3828
) error {
1,608✔
3829

1,608✔
3830
        nEvents := len(events)
1,608✔
3831
        if nEvents == 0 {
1,620✔
3832
                if isLastPage {
24✔
3833
                        // we seem to be returning a non-nil pageToken on the lastPage which
12✔
3834
                        // in turn cases the client to call getHistory again - only to find
12✔
3835
                        // there are no more events to consume - bail out if this is the case here
12✔
3836
                        return nil
12✔
3837
                }
12✔
3838
                return fmt.Errorf("invalid history: contains zero events")
×
3839
        }
3840

3841
        firstEventID := events[0].ID
1,596✔
3842
        lastEventID := events[nEvents-1].ID
1,596✔
3843

1,596✔
3844
        if !isFirstPage { // atleast one page of history has been read previously
1,632✔
3845
                if firstEventID <= expectedFirstEventID {
36✔
3846
                        // not first page and no events have been read in the previous pages - not possible
×
3847
                        return &types.InternalServiceError{
×
3848
                                Message: fmt.Sprintf(
×
3849
                                        "invalid history: expected first eventID to be > %v but got %v", expectedFirstEventID, firstEventID),
×
3850
                        }
×
3851
                }
×
3852
                expectedFirstEventID = firstEventID
36✔
3853
        }
3854

3855
        if !isLastPage {
1,645✔
3856
                // estimate lastEventID based on pageSize. This is a lower bound
49✔
3857
                // since the persistence layer counts "batch of events" as a single page
49✔
3858
                expectedLastEventID = expectedFirstEventID + int64(pageSize) - 1
49✔
3859
        }
49✔
3860

3861
        nExpectedEvents := expectedLastEventID - expectedFirstEventID + 1
1,596✔
3862

1,596✔
3863
        if firstEventID == expectedFirstEventID &&
1,596✔
3864
                ((isLastPage && lastEventID == expectedLastEventID && int64(nEvents) == nExpectedEvents) ||
1,596✔
3865
                        (!isLastPage && lastEventID >= expectedLastEventID && int64(nEvents) >= nExpectedEvents)) {
3,180✔
3866
                return nil
1,584✔
3867
        }
1,584✔
3868

3869
        return &types.InternalServiceError{
12✔
3870
                Message: fmt.Sprintf(
12✔
3871
                        "incomplete history: "+
12✔
3872
                                "expected events [%v-%v] but got events [%v-%v] of length %v:"+
12✔
3873
                                "isFirstPage=%v,isLastPage=%v,pageSize=%v",
12✔
3874
                        expectedFirstEventID,
12✔
3875
                        expectedLastEventID,
12✔
3876
                        firstEventID,
12✔
3877
                        lastEventID,
12✔
3878
                        nEvents,
12✔
3879
                        isFirstPage,
12✔
3880
                        isLastPage,
12✔
3881
                        pageSize),
12✔
3882
        }
12✔
3883
}
3884

3885
func deserializeHistoryToken(bytes []byte) (*getHistoryContinuationToken, error) {
44✔
3886
        token := &getHistoryContinuationToken{}
44✔
3887
        err := json.Unmarshal(bytes, token)
44✔
3888
        return token, err
44✔
3889
}
44✔
3890

3891
func serializeHistoryToken(token *getHistoryContinuationToken) ([]byte, error) {
437✔
3892
        if token == nil {
832✔
3893
                return nil, nil
395✔
3894
        }
395✔
3895

3896
        bytes, err := json.Marshal(token)
42✔
3897
        return bytes, err
42✔
3898
}
3899

3900
func isFailoverRequest(updateRequest *types.UpdateDomainRequest) bool {
9✔
3901
        return updateRequest.ActiveClusterName != nil
9✔
3902
}
9✔
3903

3904
func isGraceFailoverRequest(updateRequest *types.UpdateDomainRequest) bool {
9✔
3905
        return updateRequest.FailoverTimeoutInSeconds != nil
9✔
3906
}
9✔
3907

3908
func (wh *WorkflowHandler) checkOngoingFailover(
3909
        ctx context.Context,
3910
        domainName *string,
3911
) error {
1✔
3912

1✔
3913
        enabledClusters := wh.GetClusterMetadata().GetEnabledClusterInfo()
1✔
3914
        respChan := make(chan *types.DescribeDomainResponse, len(enabledClusters))
1✔
3915

1✔
3916
        g := &errgroup.Group{}
1✔
3917
        for clusterName := range enabledClusters {
3✔
3918
                frontendClient := wh.GetRemoteFrontendClient(clusterName)
2✔
3919
                g.Go(func() (e error) {
4✔
3920
                        defer func() { log.CapturePanic(recover(), wh.GetLogger(), &e) }()
4✔
3921

3922
                        resp, _ := frontendClient.DescribeDomain(ctx, &types.DescribeDomainRequest{Name: domainName})
2✔
3923
                        respChan <- resp
2✔
3924
                        return nil
2✔
3925
                })
3926
        }
3927
        g.Wait()
1✔
3928
        close(respChan)
1✔
3929

1✔
3930
        var failoverVersion *int64
1✔
3931
        for resp := range respChan {
3✔
3932
                if resp == nil {
2✔
3933
                        return &types.InternalServiceError{
×
3934
                                Message: "Failed to verify failover version from all clusters",
×
3935
                        }
×
3936
                }
×
3937
                if failoverVersion == nil {
3✔
3938
                        failoverVersion = &resp.FailoverVersion
1✔
3939
                }
1✔
3940
                if *failoverVersion != resp.GetFailoverVersion() {
2✔
3941
                        return &types.BadRequestError{
×
3942
                                Message: "Concurrent failover is not allow.",
×
3943
                        }
×
3944
                }
×
3945
        }
3946
        return nil
1✔
3947
}
3948

3949
func (wh *WorkflowHandler) historyArchived(ctx context.Context, request *types.GetWorkflowExecutionHistoryRequest, domainID string) bool {
446✔
3950
        if request.GetExecution() == nil || request.GetExecution().GetRunID() == "" {
481✔
3951
                return false
35✔
3952
        }
35✔
3953
        getMutableStateRequest := &types.GetMutableStateRequest{
411✔
3954
                DomainUUID: domainID,
411✔
3955
                Execution:  request.Execution,
411✔
3956
        }
411✔
3957
        _, err := wh.GetHistoryClient().GetMutableState(ctx, getMutableStateRequest)
411✔
3958
        if err == nil {
798✔
3959
                return false
387✔
3960
        }
387✔
3961
        switch err.(type) {
24✔
3962
        case *types.EntityNotExistsError:
23✔
3963
                // the only case in which history is assumed to be archived is if getting mutable state returns entity not found error
23✔
3964
                return true
23✔
3965
        }
3966
        return false
1✔
3967
}
3968

3969
func (wh *WorkflowHandler) getArchivedHistory(
3970
        ctx context.Context,
3971
        request *types.GetWorkflowExecutionHistoryRequest,
3972
        domainID string,
3973
) (*types.GetWorkflowExecutionHistoryResponse, error) {
26✔
3974
        entry, err := wh.GetDomainCache().GetDomainByID(domainID)
26✔
3975
        if err != nil {
27✔
3976
                return nil, err
1✔
3977
        }
1✔
3978

3979
        URIString := entry.GetConfig().HistoryArchivalURI
25✔
3980
        if URIString == "" {
26✔
3981
                // if URI is empty, it means the domain has never enabled for archival.
1✔
3982
                // the error is not "workflow has passed retention period", because
1✔
3983
                // we have no way to tell if the requested workflow exists or not.
1✔
3984
                return nil, validate.ErrHistoryNotFound
1✔
3985
        }
1✔
3986

3987
        URI, err := archiver.NewURI(URIString)
24✔
3988
        if err != nil {
25✔
3989
                return nil, err
1✔
3990
        }
1✔
3991

3992
        historyArchiver, err := wh.GetArchiverProvider().GetHistoryArchiver(URI.Scheme(), service.Frontend)
23✔
3993
        if err != nil {
23✔
3994
                return nil, err
×
3995
        }
×
3996

3997
        resp, err := historyArchiver.Get(ctx, URI, &archiver.GetHistoryRequest{
23✔
3998
                DomainID:      domainID,
23✔
3999
                WorkflowID:    request.GetExecution().GetWorkflowID(),
23✔
4000
                RunID:         request.GetExecution().GetRunID(),
23✔
4001
                NextPageToken: request.GetNextPageToken(),
23✔
4002
                PageSize:      int(request.GetMaximumPageSize()),
23✔
4003
        })
23✔
4004
        if err != nil {
24✔
4005
                return nil, err
1✔
4006
        }
1✔
4007

4008
        history := &types.History{}
22✔
4009
        for _, batch := range resp.HistoryBatches {
279✔
4010
                history.Events = append(history.Events, batch.Events...)
257✔
4011
        }
257✔
4012
        return &types.GetWorkflowExecutionHistoryResponse{
22✔
4013
                History:       history,
22✔
4014
                NextPageToken: resp.NextPageToken,
22✔
4015
                Archived:      true,
22✔
4016
        }, nil
22✔
4017
}
4018

4019
func (wh *WorkflowHandler) convertIndexedKeyToThrift(keys map[string]interface{}) map[string]types.IndexedValueType {
3✔
4020
        converted := make(map[string]types.IndexedValueType)
3✔
4021
        for k, v := range keys {
51✔
4022
                converted[k] = common.ConvertIndexedValueTypeToInternalType(v, wh.GetLogger())
48✔
4023
        }
48✔
4024
        return converted
2✔
4025
}
4026

4027
func (wh *WorkflowHandler) isListRequestPageSizeTooLarge(pageSize int32, domain string) bool {
307✔
4028
        return common.IsAdvancedVisibilityReadingEnabled(wh.config.EnableReadVisibilityFromES(domain), wh.config.IsAdvancedVisConfigExist) &&
307✔
4029
                pageSize > int32(wh.config.ESIndexMaxResultWindow())
307✔
4030
}
307✔
4031

4032
// GetClusterInfo return information about cadence deployment
4033
func (wh *WorkflowHandler) GetClusterInfo(
4034
        ctx context.Context,
4035
) (resp *types.ClusterInfo, err error) {
×
4036
        return &types.ClusterInfo{
×
4037
                SupportedClientVersions: &types.SupportedClientVersions{
×
4038
                        GoSdk:   client.SupportedGoSDKVersion,
×
4039
                        JavaSdk: client.SupportedJavaSDKVersion,
×
4040
                },
×
4041
        }, nil
×
4042
}
×
4043

4044
func checkFailOverPermission(config *config.Config, domainName string) error {
2✔
4045
        if config.Lockdown(domainName) {
3✔
4046
                return validate.ErrDomainInLockdown
1✔
4047
        }
1✔
4048
        return nil
1✔
4049
}
4050

4051
// domainWrapper adapts a plain domain name so it can be passed wherever a
// value exposing GetDomain is expected.
type domainWrapper struct {
	// domain is the wrapped domain name.
	domain string
}

// GetDomain returns the wrapped domain name.
func (w domainWrapper) GetDomain() string {
	name := w.domain
	return name
}
1,755✔
4058

4059
func (hs HealthStatus) String() string {
2✔
4060
        switch hs {
2✔
4061
        case HealthStatusOK:
1✔
4062
                return "OK"
1✔
4063
        case HealthStatusWarmingUp:
1✔
4064
                return "WarmingUp"
1✔
4065
        case HealthStatusShuttingDown:
×
4066
                return "ShuttingDown"
×
4067
        default:
×
4068
                return "unknown"
×
4069
        }
4070
}
4071

4072
func getDomainWfIDRunIDTags(
4073
        domainName string,
4074
        wf *types.WorkflowExecution,
4075
) []tag.Tag {
1,478✔
4076
        tags := []tag.Tag{tag.WorkflowDomainName(domainName)}
1,478✔
4077
        if wf == nil {
2,956✔
4078
                return tags
1,478✔
4079
        }
1,478✔
4080
        return append(
×
4081
                tags,
×
4082
                tag.WorkflowID(wf.GetWorkflowID()),
×
4083
                tag.WorkflowRunID(wf.GetRunID()),
×
4084
        )
×
4085
}
4086

4087
// checkRequiredDomainDataKVs verifies that every key of requiredDomainDataKeys
// is present in domainData. It returns an error naming the first missing key
// it encounters (map iteration order is unspecified) together with the full
// set of required keys, or nil when nothing is missing. Only key presence is
// checked; the values on either side are not inspected.
func checkRequiredDomainDataKVs(requiredDomainDataKeys map[string]interface{}, domainData map[string]string) error {
	for required := range requiredDomainDataKeys {
		if _, present := domainData[required]; present {
			continue
		}
		return fmt.Errorf("domain data error, missing required key %v . All required keys: %v", required, requiredDomainDataKeys)
	}
	return nil
}
4097

4098
// Some error types are introduced later that some clients might not support
4099
// To make them backward compatible, we continue returning the legacy error types
4100
// for older clients
4101
func (wh *WorkflowHandler) normalizeVersionedErrors(ctx context.Context, err error) error {
66✔
4102
        switch err.(type) {
66✔
4103
        case *types.WorkflowExecutionAlreadyCompletedError:
31✔
4104
                call := yarpc.CallFromContext(ctx)
31✔
4105
                clientFeatureVersion := call.Header(common.FeatureVersionHeaderName)
31✔
4106
                clientImpl := call.Header(common.ClientImplHeaderName)
31✔
4107
                featureFlags := client.GetFeatureFlagsFromHeader(call)
31✔
4108

31✔
4109
                vErr := wh.versionChecker.SupportsWorkflowAlreadyCompletedError(clientImpl, clientFeatureVersion, featureFlags)
31✔
4110
                if vErr == nil {
34✔
4111
                        return err
3✔
4112
                }
3✔
4113
                return &types.EntityNotExistsError{Message: "Workflow execution already completed."}
28✔
4114
        default:
35✔
4115
                return err
35✔
4116
        }
4117
}
4118
func constructRestartWorkflowRequest(w *types.WorkflowExecutionStartedEventAttributes, domain string, identity string, workflowID string) *types.StartWorkflowExecutionRequest {
1✔
4119

1✔
4120
        startRequest := &types.StartWorkflowExecutionRequest{
1✔
4121
                RequestID:  uuid.New().String(),
1✔
4122
                Domain:     domain,
1✔
4123
                WorkflowID: workflowID,
1✔
4124
                WorkflowType: &types.WorkflowType{
1✔
4125
                        Name: w.WorkflowType.Name,
1✔
4126
                },
1✔
4127
                TaskList: &types.TaskList{
1✔
4128
                        Name: w.TaskList.Name,
1✔
4129
                },
1✔
4130
                Input:                               w.Input,
1✔
4131
                ExecutionStartToCloseTimeoutSeconds: w.ExecutionStartToCloseTimeoutSeconds,
1✔
4132
                TaskStartToCloseTimeoutSeconds:      w.TaskStartToCloseTimeoutSeconds,
1✔
4133
                Identity:                            identity,
1✔
4134
                WorkflowIDReusePolicy:               types.WorkflowIDReusePolicyTerminateIfRunning.Ptr(),
1✔
4135
        }
1✔
4136
        startRequest.CronSchedule = w.CronSchedule
1✔
4137
        startRequest.RetryPolicy = w.RetryPolicy
1✔
4138
        startRequest.DelayStartSeconds = w.FirstDecisionTaskBackoffSeconds
1✔
4139
        startRequest.Header = w.Header
1✔
4140
        startRequest.Memo = w.Memo
1✔
4141
        startRequest.SearchAttributes = w.SearchAttributes
1✔
4142

1✔
4143
        return startRequest
1✔
4144
}
1✔
4145

4146
func getMetricsScopeWithDomain(
4147
        scope int,
4148
        d domainGetter,
4149
        metricsClient metrics.Client,
4150
) metrics.Scope {
5,785✔
4151
        var metricsScope metrics.Scope
5,785✔
4152
        if d != nil {
11,570✔
4153
                metricsScope = metricsClient.Scope(scope).Tagged(metrics.DomainTag(d.GetDomain()))
5,785✔
4154
        } else {
5,785✔
4155
                metricsScope = metricsClient.Scope(scope).Tagged(metrics.DomainUnknownTag())
×
4156
        }
×
4157
        return metricsScope
5,785✔
4158
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc