• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

uber / cadence / 018d80e7-5bee-4baa-80ad-40642f0b7456

07 Feb 2024 12:10AM UTC coverage: 62.642% (-0.04%) from 62.683%
018d80e7-5bee-4baa-80ad-40642f0b7456

Pull #5646

buildkite

bowenxia
log msg change
Pull Request #5646: Added 2 more tags in log for comparator to use.

0 of 66 new or added lines in 2 files covered. (0.0%)

99 existing lines in 20 files now uncovered.

92159 of 147120 relevant lines covered (62.64%)

2308.37 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

61.71
/service/frontend/api/handler.go
1
// Copyright (c) 2017-2020 Uber Technologies, Inc.
2
//
3
// Permission is hereby granted, free of charge, to any person obtaining a copy
4
// of this software and associated documentation files (the "Software"), to deal
5
// in the Software without restriction, including without limitation the rights
6
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
7
// copies of the Software, and to permit persons to whom the Software is
8
// furnished to do so, subject to the following conditions:
9
//
10
// The above copyright notice and this permission notice shall be included in
11
// all copies or substantial portions of the Software.
12
//
13
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
14
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
18
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
19
// THE SOFTWARE.
20

21
package api
22

23
import (
24
        "context"
25
        "encoding/json"
26
        "fmt"
27
        "sync/atomic"
28
        "time"
29

30
        "github.com/google/uuid"
31
        "go.uber.org/yarpc"
32
        "golang.org/x/sync/errgroup"
33

34
        "github.com/uber/cadence/.gen/go/shared"
35
        "github.com/uber/cadence/.gen/go/sqlblobs"
36
        "github.com/uber/cadence/common"
37
        "github.com/uber/cadence/common/archiver"
38
        "github.com/uber/cadence/common/backoff"
39
        "github.com/uber/cadence/common/cache"
40
        "github.com/uber/cadence/common/client"
41
        "github.com/uber/cadence/common/domain"
42
        "github.com/uber/cadence/common/elasticsearch/validator"
43
        "github.com/uber/cadence/common/log"
44
        "github.com/uber/cadence/common/log/tag"
45
        "github.com/uber/cadence/common/metrics"
46
        "github.com/uber/cadence/common/partition"
47
        "github.com/uber/cadence/common/persistence"
48
        persistenceutils "github.com/uber/cadence/common/persistence/persistence-utils"
49
        "github.com/uber/cadence/common/resource"
50
        "github.com/uber/cadence/common/service"
51
        "github.com/uber/cadence/common/types"
52
        "github.com/uber/cadence/service/frontend/config"
53
        "github.com/uber/cadence/service/frontend/validate"
54
)
55

56
const (
57
        // HealthStatusOK is used when this node is healthy and rpc requests are allowed
58
        HealthStatusOK HealthStatus = iota + 1
59
        // HealthStatusWarmingUp is used when the rpc handler is warming up
60
        HealthStatusWarmingUp
61
        // HealthStatusShuttingDown is used when the rpc handler is shutting down
62
        HealthStatusShuttingDown
63
)
64

65
var _ Handler = (*WorkflowHandler)(nil)
66

67
type (
68
        // WorkflowHandler - Thrift handler interface for workflow service
69
        WorkflowHandler struct {
70
                resource.Resource
71

72
                shuttingDown              int32
73
                healthStatus              int32
74
                tokenSerializer           common.TaskTokenSerializer
75
                config                    *config.Config
76
                versionChecker            client.VersionChecker
77
                domainHandler             domain.Handler
78
                visibilityQueryValidator  *validator.VisibilityQueryValidator
79
                searchAttributesValidator *validator.SearchAttributesValidator
80
                throttleRetry             *backoff.ThrottleRetry
81
        }
82

83
        getHistoryContinuationToken struct {
84
                RunID             string
85
                FirstEventID      int64
86
                NextEventID       int64
87
                IsWorkflowRunning bool
88
                PersistenceToken  []byte
89
                TransientDecision *types.TransientDecisionInfo
90
                BranchToken       []byte
91
        }
92

93
        domainGetter interface {
94
                GetDomain() string
95
        }
96

97
        // HealthStatus is an enum that refers to the rpc handler health status
98
        HealthStatus int32
99
)
100

101
var (
102
        frontendServiceRetryPolicy = common.CreateFrontendServiceRetryPolicy()
103
)
104

105
// NewWorkflowHandler creates a thrift handler for the cadence service
106
func NewWorkflowHandler(
107
        resource resource.Resource,
108
        config *config.Config,
109
        versionChecker client.VersionChecker,
110
        domainHandler domain.Handler,
111
) *WorkflowHandler {
81✔
112
        return &WorkflowHandler{
81✔
113
                Resource:        resource,
81✔
114
                config:          config,
81✔
115
                healthStatus:    int32(HealthStatusWarmingUp),
81✔
116
                tokenSerializer: common.NewJSONTaskTokenSerializer(),
81✔
117
                versionChecker:  versionChecker,
81✔
118
                domainHandler:   domainHandler,
81✔
119
                visibilityQueryValidator: validator.NewQueryValidator(
81✔
120
                        config.ValidSearchAttributes,
81✔
121
                        config.EnableQueryAttributeValidation,
81✔
122
                ),
81✔
123
                searchAttributesValidator: validator.NewSearchAttributesValidator(
81✔
124
                        resource.GetLogger(),
81✔
125
                        config.EnableQueryAttributeValidation,
81✔
126
                        config.ValidSearchAttributes,
81✔
127
                        config.SearchAttributesNumberOfKeysLimit,
81✔
128
                        config.SearchAttributesSizeOfValueLimit,
81✔
129
                        config.SearchAttributesTotalSizeLimit,
81✔
130
                ),
81✔
131
                throttleRetry: backoff.NewThrottleRetry(
81✔
132
                        backoff.WithRetryPolicy(frontendServiceRetryPolicy),
81✔
133
                        backoff.WithRetryableError(common.IsServiceTransientError),
81✔
134
                ),
81✔
135
        }
81✔
136
}
81✔
137

138
// Start starts the handler
139
func (wh *WorkflowHandler) Start() {
15✔
140
        // TODO: Get warmup duration from config. Even better, run proactive checks such as probing downstream connections.
15✔
141
        const warmUpDuration = 30 * time.Second
15✔
142

15✔
143
        warmupTimer := time.NewTimer(warmUpDuration)
15✔
144
        go func() {
30✔
145
                <-warmupTimer.C
15✔
146
                wh.GetLogger().Warn("Service warmup duration has elapsed.")
15✔
147
                if atomic.CompareAndSwapInt32(&wh.healthStatus, int32(HealthStatusWarmingUp), int32(HealthStatusOK)) {
26✔
148
                        wh.GetLogger().Warn("Warmup time has elapsed. Service is healthy.")
11✔
149
                } else {
11✔
150
                        status := HealthStatus(atomic.LoadInt32(&wh.healthStatus))
×
151
                        wh.GetLogger().Warn(fmt.Sprintf("Warmup time has elapsed. Service status is: %v", status.String()))
×
152
                }
×
153
        }()
154
}
155

156
// Stop stops the handler
157
func (wh *WorkflowHandler) Stop() {
15✔
158
        atomic.StoreInt32(&wh.shuttingDown, 1)
15✔
159
}
15✔
160

161
// UpdateHealthStatus sets the health status for this rpc handler.
162
// This health status will be used within the rpc health check handler
163
func (wh *WorkflowHandler) UpdateHealthStatus(status HealthStatus) {
16✔
164
        atomic.StoreInt32(&wh.healthStatus, int32(status))
16✔
165
}
16✔
166

167
func (wh *WorkflowHandler) isShuttingDown() bool {
6,456✔
168
        return atomic.LoadInt32(&wh.shuttingDown) != 0
6,456✔
169
}
6,456✔
170

171
// Health is for health check
172
func (wh *WorkflowHandler) Health(ctx context.Context) (*types.HealthStatus, error) {
2✔
173
        status := HealthStatus(atomic.LoadInt32(&wh.healthStatus))
2✔
174
        msg := status.String()
2✔
175

2✔
176
        if status != HealthStatusOK {
3✔
177
                wh.GetLogger().Warn(fmt.Sprintf("Service status is: %v", msg))
1✔
178
        }
1✔
179

180
        return &types.HealthStatus{
2✔
181
                Ok:  status == HealthStatusOK,
2✔
182
                Msg: msg,
2✔
183
        }, nil
2✔
184
}
185

186
// RegisterDomain creates a new domain which can be used as a container for all resources.  Domain is a top level
187
// entity within Cadence, used as a container for all resources like workflow executions, tasklists, etc.  Domain
188
// acts as a sandbox and provides isolation for all resources within the domain.  All resources belongs to exactly one
189
// domain.
190
func (wh *WorkflowHandler) RegisterDomain(ctx context.Context, registerRequest *types.RegisterDomainRequest) (retError error) {
45✔
191
        if wh.isShuttingDown() {
45✔
192
                return validate.ErrShuttingDown
×
193
        }
×
194

195
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
45✔
196
                return err
×
197
        }
×
198

199
        if registerRequest == nil {
45✔
200
                return validate.ErrRequestNotSet
×
201
        }
×
202

203
        if registerRequest.GetWorkflowExecutionRetentionPeriodInDays() > int32(wh.config.DomainConfig.MaxRetentionDays()) {
45✔
204
                return validate.ErrInvalidRetention
×
205
        }
×
206

207
        if err := validate.CheckPermission(wh.config, registerRequest.SecurityToken); err != nil {
45✔
208
                return err
×
209
        }
×
210

211
        if err := checkRequiredDomainDataKVs(wh.config.DomainConfig.RequiredDomainDataKeys(), registerRequest.GetData()); err != nil {
46✔
212
                return err
1✔
213
        }
1✔
214

215
        if registerRequest.GetName() == "" {
44✔
216
                return validate.ErrDomainNotSet
×
217
        }
×
218

219
        return wh.domainHandler.RegisterDomain(ctx, registerRequest)
44✔
220
}
221

222
// ListDomains returns the information and configuration for a registered domain.
223
func (wh *WorkflowHandler) ListDomains(
224
        ctx context.Context,
225
        listRequest *types.ListDomainsRequest,
226
) (response *types.ListDomainsResponse, retError error) {
2✔
227
        if wh.isShuttingDown() {
2✔
228
                return nil, validate.ErrShuttingDown
×
229
        }
×
230

231
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
2✔
232
                return nil, err
×
233
        }
×
234

235
        if listRequest == nil {
3✔
236
                return nil, validate.ErrRequestNotSet
1✔
237
        }
1✔
238

239
        return wh.domainHandler.ListDomains(ctx, listRequest)
1✔
240
}
241

242
// DescribeDomain returns the information and configuration for a registered domain.
243
func (wh *WorkflowHandler) DescribeDomain(
244
        ctx context.Context,
245
        describeRequest *types.DescribeDomainRequest,
246
) (response *types.DescribeDomainResponse, retError error) {
134✔
247
        if wh.isShuttingDown() {
134✔
248
                return nil, validate.ErrShuttingDown
×
249
        }
×
250

251
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
134✔
252
                return nil, err
×
253
        }
×
254

255
        if describeRequest == nil {
134✔
256
                return nil, validate.ErrRequestNotSet
×
257
        }
×
258

259
        if describeRequest.GetName() == "" && describeRequest.GetUUID() == "" {
134✔
260
                return nil, validate.ErrDomainNotSet
×
261
        }
×
262

263
        resp, err := wh.domainHandler.DescribeDomain(ctx, describeRequest)
134✔
264
        if err != nil {
134✔
265
                return nil, err
×
266
        }
×
267

268
        if resp.GetFailoverInfo() != nil && resp.GetFailoverInfo().GetFailoverExpireTimestamp() > 0 {
134✔
269
                // fetch ongoing failover info from history service
×
270
                failoverResp, err := wh.GetHistoryClient().GetFailoverInfo(ctx, &types.GetFailoverInfoRequest{
×
271
                        DomainID: resp.GetDomainInfo().UUID,
×
272
                })
×
273
                if err != nil {
×
274
                        // despite the error from history, return describe domain response
×
275
                        wh.GetLogger().Error(
×
276
                                fmt.Sprintf("Failed to get failover info for domain %s", resp.DomainInfo.GetName()),
×
277
                                tag.Error(err),
×
278
                        )
×
279
                        return resp, nil
×
280
                }
×
281
                resp.FailoverInfo.CompletedShardCount = failoverResp.GetCompletedShardCount()
×
282
                resp.FailoverInfo.PendingShards = failoverResp.GetPendingShards()
×
283
        }
284
        return resp, nil
134✔
285
}
286

287
// UpdateDomain is used to update the information and configuration for a registered domain.
288
func (wh *WorkflowHandler) UpdateDomain(
289
        ctx context.Context,
290
        updateRequest *types.UpdateDomainRequest,
291
) (resp *types.UpdateDomainResponse, retError error) {
9✔
292
        domainName := ""
9✔
293
        if updateRequest != nil {
18✔
294
                domainName = updateRequest.GetName()
9✔
295
        }
9✔
296

297
        logger := wh.GetLogger().WithTags(
9✔
298
                tag.WorkflowDomainName(domainName),
9✔
299
                tag.OperationName("DomainUpdate"))
9✔
300

9✔
301
        if updateRequest == nil {
9✔
302
                logger.Error("Nil domain update request.",
×
303
                        tag.Error(validate.ErrRequestNotSet))
×
304
                return nil, validate.ErrRequestNotSet
×
305
        }
×
306

307
        isFailover := isFailoverRequest(updateRequest)
9✔
308
        isGraceFailover := isGraceFailoverRequest(updateRequest)
9✔
309
        logger.Info(fmt.Sprintf(
9✔
310
                "Domain Update requested. isFailover: %v, isGraceFailover: %v, Request: %#v.",
9✔
311
                isFailover,
9✔
312
                isGraceFailover,
9✔
313
                updateRequest))
9✔
314

9✔
315
        if wh.isShuttingDown() {
9✔
316
                logger.Error("Won't apply the domain update since workflowHandler is shutting down.",
×
317
                        tag.Error(validate.ErrShuttingDown))
×
318
                return nil, validate.ErrShuttingDown
×
319
        }
×
320

321
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
9✔
322
                logger.Error("Won't apply the domain update since client version is not supported.",
×
323
                        tag.Error(err))
×
324
                return nil, err
×
325
        }
×
326

327
        // don't require permission for failover request
328
        if isFailover {
11✔
329
                // reject the failover if the cluster is in lockdown
2✔
330
                if err := checkFailOverPermission(wh.config, updateRequest.GetName()); err != nil {
3✔
331
                        logger.Error("Domain failover request rejected since domain is in lockdown.",
1✔
332
                                tag.Error(err))
1✔
333
                        return nil, err
1✔
334
                }
1✔
335
        } else {
7✔
336
                if err := validate.CheckPermission(wh.config, updateRequest.SecurityToken); err != nil {
7✔
337
                        logger.Error("Domain update request rejected due to failing permissions.",
×
338
                                tag.Error(err))
×
339
                        return nil, err
×
340
                }
×
341
        }
342

343
        if isGraceFailover {
9✔
344
                if err := wh.checkOngoingFailover(
1✔
345
                        ctx,
1✔
346
                        &updateRequest.Name,
1✔
347
                ); err != nil {
1✔
348
                        logger.Error("Graceful domain failover request failed. Not able to check ongoing failovers.",
×
349
                                tag.Error(err))
×
350
                        return nil, err
×
351
                }
×
352
        }
353

354
        if updateRequest.GetName() == "" {
8✔
355
                logger.Error("Domain not set on request.",
×
356
                        tag.Error(validate.ErrDomainNotSet))
×
357
                return nil, validate.ErrDomainNotSet
×
358
        }
×
359
        // TODO: call remote clusters to verify domain data
360
        resp, err := wh.domainHandler.UpdateDomain(ctx, updateRequest)
8✔
361
        if err != nil {
10✔
362
                logger.Error("Domain update operation failed.",
2✔
363
                        tag.Error(err))
2✔
364
                return nil, err
2✔
365
        }
2✔
366
        logger.Info("Domain update operation succeeded.")
6✔
367
        return resp, nil
6✔
368
}
369

370
// DeprecateDomain us used to update status of a registered domain to DEPRECATED. Once the domain is deprecated
371
// it cannot be used to start new workflow executions.  Existing workflow executions will continue to run on
372
// deprecated domains.
373
func (wh *WorkflowHandler) DeprecateDomain(ctx context.Context, deprecateRequest *types.DeprecateDomainRequest) (retError error) {
×
374
        if wh.isShuttingDown() {
×
375
                return validate.ErrShuttingDown
×
376
        }
×
377

378
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
×
379
                return err
×
380
        }
×
381

382
        if deprecateRequest == nil {
×
383
                return validate.ErrRequestNotSet
×
384
        }
×
385

386
        if err := validate.CheckPermission(wh.config, deprecateRequest.SecurityToken); err != nil {
×
387
                return err
×
388
        }
×
389

390
        if deprecateRequest.GetName() == "" {
×
391
                return validate.ErrDomainNotSet
×
392
        }
×
393

394
        return wh.domainHandler.DeprecateDomain(ctx, deprecateRequest)
×
395
}
396

397
// PollForActivityTask - Poll for an activity task.
398
func (wh *WorkflowHandler) PollForActivityTask(
399
        ctx context.Context,
400
        pollRequest *types.PollForActivityTaskRequest,
401
) (resp *types.PollForActivityTaskResponse, retError error) {
710✔
402
        callTime := time.Now()
710✔
403

710✔
404
        if wh.isShuttingDown() {
710✔
405
                return nil, validate.ErrShuttingDown
×
406
        }
×
407

408
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
710✔
409
                return nil, err
×
410
        }
×
411

412
        if pollRequest == nil {
710✔
413
                return nil, validate.ErrRequestNotSet
×
414
        }
×
415

416
        domainName := pollRequest.GetDomain()
710✔
417
        if domainName == "" {
710✔
418
                return nil, validate.ErrDomainNotSet
×
419
        }
×
420

421
        scope := getMetricsScopeWithDomain(metrics.FrontendPollForActivityTaskScope, pollRequest, wh.GetMetricsClient()).Tagged(metrics.GetContextTags(ctx)...)
710✔
422
        wh.GetLogger().Debug("Received PollForActivityTask")
710✔
423
        if err := common.ValidateLongPollContextTimeout(
710✔
424
                ctx,
710✔
425
                "PollForActivityTask",
710✔
426
                wh.GetThrottledLogger(),
710✔
427
        ); err != nil {
712✔
428
                return nil, err
2✔
429
        }
2✔
430

431
        idLengthWarnLimit := wh.config.MaxIDLengthWarnLimit()
708✔
432
        if !common.IsValidIDLength(
708✔
433
                domainName,
708✔
434
                scope,
708✔
435
                idLengthWarnLimit,
708✔
436
                wh.config.DomainNameMaxLength(domainName),
708✔
437
                metrics.CadenceErrDomainNameExceededWarnLimit,
708✔
438
                domainName,
708✔
439
                wh.GetLogger(),
708✔
440
                tag.IDTypeDomainName) {
708✔
441
                return nil, validate.ErrDomainTooLong
×
442
        }
×
443

444
        if err := wh.validateTaskList(pollRequest.TaskList, scope, domainName); err != nil {
708✔
445
                return nil, err
×
446
        }
×
447

448
        if !common.IsValidIDLength(
708✔
449
                pollRequest.GetIdentity(),
708✔
450
                scope,
708✔
451
                idLengthWarnLimit,
708✔
452
                wh.config.IdentityMaxLength(domainName),
708✔
453
                metrics.CadenceErrIdentityExceededWarnLimit,
708✔
454
                domainName,
708✔
455
                wh.GetLogger(),
708✔
456
                tag.IDTypeIdentity) {
708✔
457
                return nil, validate.ErrIdentityTooLong
×
458
        }
×
459

460
        domainID, err := wh.GetDomainCache().GetDomainID(domainName)
708✔
461
        if err != nil {
977✔
462
                return nil, err
269✔
463
        }
269✔
464

465
        isolationGroup := wh.getIsolationGroup(ctx, domainName)
439✔
466
        if !wh.waitUntilIsolationGroupHealthy(ctx, domainName, isolationGroup) {
440✔
467
                return &types.PollForActivityTaskResponse{}, nil
1✔
468
        }
1✔
469
        // it is possible that we wait for a very long time and the remaining time is not long enough for a long poll
470
        // in this case, return an empty response
471
        if err := common.ValidateLongPollContextTimeout(
438✔
472
                ctx,
438✔
473
                "PollForActivityTask",
438✔
474
                wh.GetThrottledLogger(),
438✔
475
        ); err != nil {
438✔
476
                return &types.PollForActivityTaskResponse{}, nil
×
477
        }
×
478
        pollerID := uuid.New().String()
438✔
479
        op := func() error {
876✔
480
                resp, err = wh.GetMatchingClient().PollForActivityTask(ctx, &types.MatchingPollForActivityTaskRequest{
438✔
481
                        DomainUUID:     domainID,
438✔
482
                        PollerID:       pollerID,
438✔
483
                        PollRequest:    pollRequest,
438✔
484
                        IsolationGroup: isolationGroup,
438✔
485
                })
438✔
486
                return err
438✔
487
        }
438✔
488

489
        err = wh.throttleRetry.Do(ctx, op)
438✔
490
        if err != nil {
504✔
491
                err = wh.cancelOutstandingPoll(ctx, err, domainID, persistence.TaskListTypeActivity, pollRequest.TaskList, pollerID)
66✔
492
                if err != nil {
66✔
493
                        // For all other errors log an error and return it back to client.
×
494
                        ctxTimeout := "not-set"
×
495
                        ctxDeadline, ok := ctx.Deadline()
×
496
                        if ok {
×
497
                                ctxTimeout = ctxDeadline.Sub(callTime).String()
×
498
                        }
×
499
                        wh.GetLogger().Error("PollForActivityTask failed.",
×
500
                                tag.WorkflowTaskListName(pollRequest.GetTaskList().GetName()),
×
501
                                tag.Value(ctxTimeout),
×
502
                                tag.Error(err))
×
503
                        return nil, err
×
504
                }
505
        }
506
        return resp, nil
438✔
507
}
508

509
// PollForDecisionTask - Poll for a decision task.
510
func (wh *WorkflowHandler) PollForDecisionTask(
511
        ctx context.Context,
512
        pollRequest *types.PollForDecisionTaskRequest,
513
) (resp *types.PollForDecisionTaskResponse, retError error) {
1,487✔
514
        callTime := time.Now()
1,487✔
515

1,487✔
516
        if wh.isShuttingDown() {
1,487✔
517
                return nil, validate.ErrShuttingDown
×
518
        }
×
519

520
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
1,487✔
521
                return nil, err
×
522
        }
×
523

524
        if pollRequest == nil {
1,487✔
525
                return nil, validate.ErrRequestNotSet
×
526
        }
×
527

528
        domainName := pollRequest.GetDomain()
1,487✔
529
        tags := getDomainWfIDRunIDTags(domainName, nil)
1,487✔
530

1,487✔
531
        if domainName == "" {
1,487✔
532
                return nil, validate.ErrDomainNotSet
×
533
        }
×
534

535
        scope := getMetricsScopeWithDomain(metrics.FrontendPollForDecisionTaskScope, pollRequest, wh.GetMetricsClient()).Tagged(metrics.GetContextTags(ctx)...)
1,487✔
536
        wh.GetLogger().Debug("Received PollForDecisionTask")
1,487✔
537
        if err := common.ValidateLongPollContextTimeout(
1,487✔
538
                ctx,
1,487✔
539
                "PollForDecisionTask",
1,487✔
540
                wh.GetThrottledLogger(),
1,487✔
541
        ); err != nil {
1,489✔
542
                return nil, err
2✔
543
        }
2✔
544

545
        idLengthWarnLimit := wh.config.MaxIDLengthWarnLimit()
1,485✔
546
        if !common.IsValidIDLength(
1,485✔
547
                domainName,
1,485✔
548
                scope,
1,485✔
549
                idLengthWarnLimit,
1,485✔
550
                wh.config.DomainNameMaxLength(domainName),
1,485✔
551
                metrics.CadenceErrDomainNameExceededWarnLimit,
1,485✔
552
                domainName,
1,485✔
553
                wh.GetLogger(),
1,485✔
554
                tag.IDTypeDomainName) {
1,485✔
555
                return nil, validate.ErrDomainTooLong
×
556
        }
×
557

558
        if !common.IsValidIDLength(
1,485✔
559
                pollRequest.GetIdentity(),
1,485✔
560
                scope,
1,485✔
561
                idLengthWarnLimit,
1,485✔
562
                wh.config.IdentityMaxLength(domainName),
1,485✔
563
                metrics.CadenceErrIdentityExceededWarnLimit,
1,485✔
564
                domainName,
1,485✔
565
                wh.GetLogger(),
1,485✔
566
                tag.IDTypeIdentity) {
1,485✔
567
                return nil, validate.ErrIdentityTooLong
×
568
        }
×
569

570
        if err := wh.validateTaskList(pollRequest.TaskList, scope, domainName); err != nil {
1,485✔
571
                return nil, err
×
572
        }
×
573

574
        domainEntry, err := wh.GetDomainCache().GetDomain(domainName)
1,485✔
575
        if err != nil {
1,753✔
576
                return nil, err
268✔
577
        }
268✔
578
        domainID := domainEntry.GetInfo().ID
1,217✔
579

1,217✔
580
        wh.GetLogger().Debug("Poll for decision.", tag.WorkflowDomainName(domainName), tag.WorkflowDomainID(domainID))
1,217✔
581
        if err := wh.checkBadBinary(domainEntry, pollRequest.GetBinaryChecksum()); err != nil {
1,217✔
582
                return nil, err
×
583
        }
×
584

585
        isolationGroup := wh.getIsolationGroup(ctx, domainName)
1,217✔
586
        if !wh.waitUntilIsolationGroupHealthy(ctx, domainName, isolationGroup) {
1,218✔
587
                return &types.PollForDecisionTaskResponse{}, nil
1✔
588
        }
1✔
589
        // it is possible that we wait for a very long time and the remaining time is not long enough for a long poll
590
        // in this case, return an empty response
591
        if err := common.ValidateLongPollContextTimeout(
1,216✔
592
                ctx,
1,216✔
593
                "PollForDecisionTask",
1,216✔
594
                wh.GetThrottledLogger(),
1,216✔
595
        ); err != nil {
1,216✔
596
                return &types.PollForDecisionTaskResponse{}, nil
×
597
        }
×
598

599
        pollerID := uuid.New().String()
1,216✔
600
        var matchingResp *types.MatchingPollForDecisionTaskResponse
1,216✔
601
        op := func() error {
2,432✔
602
                matchingResp, err = wh.GetMatchingClient().PollForDecisionTask(ctx, &types.MatchingPollForDecisionTaskRequest{
1,216✔
603
                        DomainUUID:     domainID,
1,216✔
604
                        PollerID:       pollerID,
1,216✔
605
                        PollRequest:    pollRequest,
1,216✔
606
                        IsolationGroup: isolationGroup,
1,216✔
607
                })
1,216✔
608
                return err
1,216✔
609
        }
1,216✔
610

611
        err = wh.throttleRetry.Do(ctx, op)
1,216✔
612
        if err != nil {
1,282✔
613
                err = wh.cancelOutstandingPoll(ctx, err, domainID, persistence.TaskListTypeDecision, pollRequest.TaskList, pollerID)
66✔
614
                if err != nil {
66✔
615
                        // For all other errors log an error and return it back to client.
×
616
                        ctxTimeout := "not-set"
×
617
                        ctxDeadline, ok := ctx.Deadline()
×
618
                        if ok {
×
619
                                ctxTimeout = ctxDeadline.Sub(callTime).String()
×
620
                        }
×
621
                        wh.GetLogger().Error("PollForDecisionTask failed.",
×
622
                                tag.WorkflowTaskListName(pollRequest.GetTaskList().GetName()),
×
623
                                tag.Value(ctxTimeout),
×
624
                                tag.Error(err))
×
625
                        return nil, err
×
626
                }
627

628
                // Must be cancellation error.  Does'nt matter what we return here.  Client already went away.
629
                return nil, nil
66✔
630
        }
631

632
        tags = append(tags, []tag.Tag{tag.WorkflowID(
1,150✔
633
                matchingResp.GetWorkflowExecution().GetWorkflowID()),
1,150✔
634
                tag.WorkflowRunID(matchingResp.GetWorkflowExecution().GetRunID())}...)
1,150✔
635
        resp, err = wh.createPollForDecisionTaskResponse(ctx, scope, domainID, matchingResp, matchingResp.GetBranchToken())
1,150✔
636
        if err != nil {
1,150✔
637
                return nil, err
×
638
        }
×
639
        return resp, nil
1,150✔
640
}
641

642
func (wh *WorkflowHandler) getIsolationGroup(ctx context.Context, domainName string) string {
2,862✔
643
        return partition.IsolationGroupFromContext(ctx)
2,862✔
644
}
2,862✔
645

646
func (wh *WorkflowHandler) getPartitionConfig(ctx context.Context, domainName string) map[string]string {
478✔
647
        return partition.ConfigFromContext(ctx)
478✔
648
}
478✔
649

650
func (wh *WorkflowHandler) isIsolationGroupHealthy(ctx context.Context, domainName, isolationGroup string) bool {
1,206✔
651
        if wh.GetIsolationGroupState() != nil && wh.config.EnableTasklistIsolation(domainName) {
1,208✔
652
                isDrained, err := wh.GetIsolationGroupState().IsDrained(ctx, domainName, isolationGroup)
2✔
653
                if err != nil {
2✔
654
                        wh.GetLogger().Error("Failed to check if an isolation group is drained, assume it's healthy", tag.Error(err))
×
655
                        return true
×
656
                }
×
657
                return !isDrained
2✔
658
        }
659
        return true
1,204✔
660
}
661

662
func (wh *WorkflowHandler) waitUntilIsolationGroupHealthy(ctx context.Context, domainName, isolationGroup string) bool {
1,656✔
663
        if wh.GetIsolationGroupState() != nil && wh.config.EnableTasklistIsolation(domainName) {
1,658✔
664
                ticker := time.NewTicker(time.Second * 30)
2✔
665
                defer ticker.Stop()
2✔
666
                childCtx, cancel := common.CreateChildContext(ctx, 0.05)
2✔
667
                defer cancel()
2✔
668
                for {
4✔
669
                        isDrained, err := wh.GetIsolationGroupState().IsDrained(childCtx, domainName, isolationGroup)
2✔
670
                        if err != nil {
2✔
671
                                wh.GetLogger().Error("Failed to check if an isolation group is drained, assume it's healthy", tag.Error(err))
×
672
                                return true
×
673
                        }
×
674
                        if !isDrained {
2✔
675
                                break
×
676
                        }
677
                        select {
2✔
678
                        case <-childCtx.Done():
2✔
679
                                return false
2✔
680
                        case <-ticker.C:
×
681
                        }
682
                }
683
        }
684
        return true
1,654✔
685
}
686

687
func (wh *WorkflowHandler) checkBadBinary(domainEntry *cache.DomainCacheEntry, binaryChecksum string) error {
1,217✔
688
        if domainEntry.GetConfig().BadBinaries.Binaries != nil {
2,433✔
689
                badBinaries := domainEntry.GetConfig().BadBinaries.Binaries
1,216✔
690
                _, ok := badBinaries[binaryChecksum]
1,216✔
691
                if ok {
1,216✔
692
                        wh.GetMetricsClient().IncCounter(metrics.FrontendPollForDecisionTaskScope, metrics.CadenceErrBadBinaryCounter)
×
693
                        return &types.BadRequestError{
×
694
                                Message: fmt.Sprintf("binary %v already marked as bad deployment", binaryChecksum),
×
695
                        }
×
696
                }
×
697
        }
698
        return nil
1,217✔
699
}
700

701
func (wh *WorkflowHandler) cancelOutstandingPoll(ctx context.Context, err error, domainID string, taskListType int32,
702
        taskList *types.TaskList, pollerID string) error {
132✔
703
        // First check if this err is due to context cancellation.  This means client connection to frontend is closed.
132✔
704
        if ctx.Err() == context.Canceled {
264✔
705
                // Our rpc stack does not propagates context cancellation to the other service.  Lets make an explicit
132✔
706
                // call to matching to notify this poller is gone to prevent any tasks being dispatched to zombie pollers.
132✔
707
                err = wh.GetMatchingClient().CancelOutstandingPoll(context.Background(), &types.CancelOutstandingPollRequest{
132✔
708
                        DomainUUID:   domainID,
132✔
709
                        TaskListType: common.Int32Ptr(taskListType),
132✔
710
                        TaskList:     taskList,
132✔
711
                        PollerID:     pollerID,
132✔
712
                })
132✔
713
                // We can not do much if this call fails.  Just log the error and move on
132✔
714
                if err != nil {
132✔
715
                        wh.GetLogger().Warn("Failed to cancel outstanding poller.",
×
716
                                tag.WorkflowTaskListName(taskList.GetName()), tag.Error(err))
×
717
                }
×
718

719
                // Clear error as we don't want to report context cancellation error to count against our SLA
720
                return nil
132✔
721
        }
722

723
        return err
×
724
}
725

726
// RecordActivityTaskHeartbeat - Record Activity Task Heart beat.
727
func (wh *WorkflowHandler) RecordActivityTaskHeartbeat(
728
        ctx context.Context,
729
        heartbeatRequest *types.RecordActivityTaskHeartbeatRequest,
730
) (resp *types.RecordActivityTaskHeartbeatResponse, retError error) {
384✔
731
        if wh.isShuttingDown() {
384✔
732
                return nil, validate.ErrShuttingDown
×
733
        }
×
734

735
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
384✔
736
                return nil, err
×
737
        }
×
738

739
        if heartbeatRequest == nil {
385✔
740
                return nil, validate.ErrRequestNotSet
1✔
741
        }
1✔
742

743
        wh.GetLogger().Debug("Received RecordActivityTaskHeartbeat")
383✔
744
        if heartbeatRequest.TaskToken == nil {
384✔
745
                return nil, validate.ErrTaskTokenNotSet
1✔
746
        }
1✔
747
        taskToken, err := wh.tokenSerializer.Deserialize(heartbeatRequest.TaskToken)
382✔
748
        if err != nil {
382✔
749
                return nil, err
×
750
        }
×
751
        if taskToken.DomainID == "" {
382✔
752
                return nil, validate.ErrDomainNotSet
×
753
        }
×
754

755
        domainName, err := wh.GetDomainCache().GetDomainName(taskToken.DomainID)
382✔
756
        if err != nil {
382✔
757
                return nil, err
×
758
        }
×
759

760
        dw := domainWrapper{
382✔
761
                domain: domainName,
382✔
762
        }
382✔
763
        scope := getMetricsScopeWithDomain(metrics.FrontendRecordActivityTaskHeartbeatScope, dw, wh.GetMetricsClient()).Tagged(metrics.GetContextTags(ctx)...)
382✔
764
        sizeLimitError := wh.config.BlobSizeLimitError(domainName)
382✔
765
        sizeLimitWarn := wh.config.BlobSizeLimitWarn(domainName)
382✔
766

382✔
767
        if err := common.CheckEventBlobSizeLimit(
382✔
768
                len(heartbeatRequest.Details),
382✔
769
                sizeLimitWarn,
382✔
770
                sizeLimitError,
382✔
771
                taskToken.DomainID,
382✔
772
                taskToken.WorkflowID,
382✔
773
                taskToken.RunID,
382✔
774
                scope,
382✔
775
                wh.GetThrottledLogger(),
382✔
776
                tag.BlobSizeViolationOperation("RecordActivityTaskHeartbeat"),
382✔
777
        ); err != nil {
382✔
778
                // heartbeat details exceed size limit, we would fail the activity immediately with explicit error reason
×
779
                failRequest := &types.RespondActivityTaskFailedRequest{
×
780
                        TaskToken: heartbeatRequest.TaskToken,
×
781
                        Reason:    common.StringPtr(common.FailureReasonHeartbeatExceedsLimit),
×
782
                        Details:   heartbeatRequest.Details[0:sizeLimitError],
×
783
                        Identity:  heartbeatRequest.Identity,
×
784
                }
×
785
                err = wh.GetHistoryClient().RespondActivityTaskFailed(ctx, &types.HistoryRespondActivityTaskFailedRequest{
×
786
                        DomainUUID:    taskToken.DomainID,
×
787
                        FailedRequest: failRequest,
×
788
                })
×
789
                if err != nil {
×
790
                        return nil, wh.normalizeVersionedErrors(ctx, err)
×
791
                }
×
792
                resp = &types.RecordActivityTaskHeartbeatResponse{CancelRequested: true}
×
793
        } else {
382✔
794
                resp, err = wh.GetHistoryClient().RecordActivityTaskHeartbeat(ctx, &types.HistoryRecordActivityTaskHeartbeatRequest{
382✔
795
                        DomainUUID:       taskToken.DomainID,
382✔
796
                        HeartbeatRequest: heartbeatRequest,
382✔
797
                })
382✔
798
                if err != nil {
382✔
799
                        return nil, wh.normalizeVersionedErrors(ctx, err)
×
800
                }
×
801
        }
802

803
        return resp, nil
382✔
804
}
805

806
// RecordActivityTaskHeartbeatByID - Record Activity Task Heart beat.
807
func (wh *WorkflowHandler) RecordActivityTaskHeartbeatByID(
808
        ctx context.Context,
809
        heartbeatRequest *types.RecordActivityTaskHeartbeatByIDRequest,
810
) (resp *types.RecordActivityTaskHeartbeatResponse, retError error) {
3✔
811
        if wh.isShuttingDown() {
3✔
812
                return nil, validate.ErrShuttingDown
×
813
        }
×
814

815
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
3✔
816
                return nil, err
×
817
        }
×
818

819
        if heartbeatRequest == nil {
4✔
820
                return nil, validate.ErrRequestNotSet
1✔
821
        }
1✔
822

823
        domainName := heartbeatRequest.GetDomain()
2✔
824
        if domainName == "" {
3✔
825
                return nil, validate.ErrDomainNotSet
1✔
826
        }
1✔
827

828
        wh.GetLogger().Debug("Received RecordActivityTaskHeartbeatByID")
1✔
829
        domainID, err := wh.GetDomainCache().GetDomainID(domainName)
1✔
830
        if err != nil {
1✔
831
                return nil, err
×
832
        }
×
833
        workflowID := heartbeatRequest.GetWorkflowID()
1✔
834
        runID := heartbeatRequest.GetRunID() // runID is optional so can be empty
1✔
835
        activityID := heartbeatRequest.GetActivityID()
1✔
836

1✔
837
        if domainID == "" {
1✔
838
                return nil, validate.ErrDomainNotSet
×
839
        }
×
840
        if workflowID == "" {
1✔
841
                return nil, validate.ErrWorkflowIDNotSet
×
842
        }
×
843
        if activityID == "" {
1✔
844
                return nil, validate.ErrActivityIDNotSet
×
845
        }
×
846

847
        taskToken := &common.TaskToken{
1✔
848
                DomainID:   domainID,
1✔
849
                RunID:      runID,
1✔
850
                WorkflowID: workflowID,
1✔
851
                ScheduleID: common.EmptyEventID,
1✔
852
                ActivityID: activityID,
1✔
853
        }
1✔
854
        token, err := wh.tokenSerializer.Serialize(taskToken)
1✔
855
        if err != nil {
1✔
856
                return nil, err
×
857
        }
×
858

859
        scope := getMetricsScopeWithDomain(metrics.FrontendRecordActivityTaskHeartbeatByIDScope, heartbeatRequest, wh.GetMetricsClient()).Tagged(metrics.GetContextTags(ctx)...)
1✔
860
        sizeLimitError := wh.config.BlobSizeLimitError(domainName)
1✔
861
        sizeLimitWarn := wh.config.BlobSizeLimitWarn(domainName)
1✔
862

1✔
863
        if err := common.CheckEventBlobSizeLimit(
1✔
864
                len(heartbeatRequest.Details),
1✔
865
                sizeLimitWarn,
1✔
866
                sizeLimitError,
1✔
867
                taskToken.DomainID,
1✔
868
                taskToken.WorkflowID,
1✔
869
                taskToken.RunID,
1✔
870
                scope,
1✔
871
                wh.GetThrottledLogger(),
1✔
872
                tag.BlobSizeViolationOperation("RecordActivityTaskHeartbeatByID"),
1✔
873
        ); err != nil {
1✔
874
                // heartbeat details exceed size limit, we would fail the activity immediately with explicit error reason
×
875
                failRequest := &types.RespondActivityTaskFailedRequest{
×
876
                        TaskToken: token,
×
877
                        Reason:    common.StringPtr(common.FailureReasonHeartbeatExceedsLimit),
×
878
                        Details:   heartbeatRequest.Details[0:sizeLimitError],
×
879
                        Identity:  heartbeatRequest.Identity,
×
880
                }
×
881
                err = wh.GetHistoryClient().RespondActivityTaskFailed(ctx, &types.HistoryRespondActivityTaskFailedRequest{
×
882
                        DomainUUID:    taskToken.DomainID,
×
883
                        FailedRequest: failRequest,
×
884
                })
×
885
                if err != nil {
×
886
                        return nil, wh.normalizeVersionedErrors(ctx, err)
×
887
                }
×
888
                resp = &types.RecordActivityTaskHeartbeatResponse{CancelRequested: true}
×
889
        } else {
1✔
890
                req := &types.RecordActivityTaskHeartbeatRequest{
1✔
891
                        TaskToken: token,
1✔
892
                        Details:   heartbeatRequest.Details,
1✔
893
                        Identity:  heartbeatRequest.Identity,
1✔
894
                }
1✔
895

1✔
896
                resp, err = wh.GetHistoryClient().RecordActivityTaskHeartbeat(ctx, &types.HistoryRecordActivityTaskHeartbeatRequest{
1✔
897
                        DomainUUID:       taskToken.DomainID,
1✔
898
                        HeartbeatRequest: req,
1✔
899
                })
1✔
900
                if err != nil {
1✔
901
                        return nil, wh.normalizeVersionedErrors(ctx, err)
×
902
                }
×
903
        }
904

905
        return resp, nil
1✔
906
}
907

908
// RespondActivityTaskCompleted - response to an activity task
909
func (wh *WorkflowHandler) RespondActivityTaskCompleted(
910
        ctx context.Context,
911
        completeRequest *types.RespondActivityTaskCompletedRequest,
912
) (retError error) {
247✔
913
        if wh.isShuttingDown() {
247✔
914
                return validate.ErrShuttingDown
×
915
        }
×
916

917
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
247✔
918
                return err
×
919
        }
×
920

921
        if completeRequest == nil {
247✔
922
                return validate.ErrRequestNotSet
×
923
        }
×
924

925
        if completeRequest.TaskToken == nil {
247✔
926
                return validate.ErrTaskTokenNotSet
×
927
        }
×
928
        taskToken, err := wh.tokenSerializer.Deserialize(completeRequest.TaskToken)
247✔
929
        if err != nil {
247✔
930
                return err
×
931
        }
×
932
        if taskToken.DomainID == "" {
247✔
933
                return validate.ErrDomainNotSet
×
934
        }
×
935

936
        domainName, err := wh.GetDomainCache().GetDomainName(taskToken.DomainID)
247✔
937
        if err != nil {
247✔
938
                return err
×
939
        }
×
940

941
        dw := domainWrapper{
247✔
942
                domain: domainName,
247✔
943
        }
247✔
944
        scope := getMetricsScopeWithDomain(metrics.FrontendRespondActivityTaskCompletedScope, dw, wh.GetMetricsClient()).Tagged(metrics.GetContextTags(ctx)...)
247✔
945
        if !common.IsValidIDLength(
247✔
946
                completeRequest.GetIdentity(),
247✔
947
                scope,
247✔
948
                wh.config.MaxIDLengthWarnLimit(),
247✔
949
                wh.config.IdentityMaxLength(domainName),
247✔
950
                metrics.CadenceErrIdentityExceededWarnLimit,
247✔
951
                domainName,
247✔
952
                wh.GetLogger(),
247✔
953
                tag.IDTypeIdentity) {
247✔
954
                return validate.ErrIdentityTooLong
×
955
        }
×
956

957
        sizeLimitError := wh.config.BlobSizeLimitError(domainName)
247✔
958
        sizeLimitWarn := wh.config.BlobSizeLimitWarn(domainName)
247✔
959

247✔
960
        if err := common.CheckEventBlobSizeLimit(
247✔
961
                len(completeRequest.Result),
247✔
962
                sizeLimitWarn,
247✔
963
                sizeLimitError,
247✔
964
                taskToken.DomainID,
247✔
965
                taskToken.WorkflowID,
247✔
966
                taskToken.RunID,
247✔
967
                scope,
247✔
968
                wh.GetThrottledLogger(),
247✔
969
                tag.BlobSizeViolationOperation("RespondActivityTaskCompleted"),
247✔
970
        ); err != nil {
247✔
971
                // result exceeds blob size limit, we would record it as failure
×
972
                failRequest := &types.RespondActivityTaskFailedRequest{
×
973
                        TaskToken: completeRequest.TaskToken,
×
974
                        Reason:    common.StringPtr(common.FailureReasonCompleteResultExceedsLimit),
×
975
                        Details:   completeRequest.Result[0:sizeLimitError],
×
976
                        Identity:  completeRequest.Identity,
×
977
                }
×
978
                err = wh.GetHistoryClient().RespondActivityTaskFailed(ctx, &types.HistoryRespondActivityTaskFailedRequest{
×
979
                        DomainUUID:    taskToken.DomainID,
×
980
                        FailedRequest: failRequest,
×
981
                })
×
982
                if err != nil {
×
983
                        return wh.normalizeVersionedErrors(ctx, err)
×
984
                }
×
985
        } else {
247✔
986
                err = wh.GetHistoryClient().RespondActivityTaskCompleted(ctx, &types.HistoryRespondActivityTaskCompletedRequest{
247✔
987
                        DomainUUID:      taskToken.DomainID,
247✔
988
                        CompleteRequest: completeRequest,
247✔
989
                })
247✔
990
                if err != nil {
292✔
991
                        return wh.normalizeVersionedErrors(ctx, err)
45✔
992
                }
45✔
993
        }
994

995
        return nil
202✔
996
}
997

998
// RespondActivityTaskCompletedByID - response to an activity task
999
func (wh *WorkflowHandler) RespondActivityTaskCompletedByID(
1000
        ctx context.Context,
1001
        completeRequest *types.RespondActivityTaskCompletedByIDRequest,
1002
) (retError error) {
76✔
1003
        if wh.isShuttingDown() {
76✔
1004
                return validate.ErrShuttingDown
×
1005
        }
×
1006

1007
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
76✔
1008
                return err
×
1009
        }
×
1010

1011
        if completeRequest == nil {
76✔
1012
                return validate.ErrRequestNotSet
×
1013
        }
×
1014

1015
        domainName := completeRequest.GetDomain()
76✔
1016
        if domainName == "" {
76✔
1017
                return validate.ErrDomainNotSet
×
1018
        }
×
1019
        domainID, err := wh.GetDomainCache().GetDomainID(domainName)
76✔
1020
        if err != nil {
76✔
1021
                return err
×
1022
        }
×
1023
        workflowID := completeRequest.GetWorkflowID()
76✔
1024
        runID := completeRequest.GetRunID() // runID is optional so can be empty
76✔
1025
        activityID := completeRequest.GetActivityID()
76✔
1026

76✔
1027
        if domainID == "" {
76✔
1028
                return validate.ErrDomainNotSet
×
1029
        }
×
1030
        if workflowID == "" {
76✔
1031
                return validate.ErrWorkflowIDNotSet
×
1032
        }
×
1033
        if activityID == "" {
76✔
1034
                return validate.ErrActivityIDNotSet
×
1035
        }
×
1036

1037
        scope := getMetricsScopeWithDomain(metrics.FrontendRespondActivityTaskCompletedByIDScope, completeRequest, wh.GetMetricsClient()).Tagged(metrics.GetContextTags(ctx)...)
76✔
1038
        if !common.IsValidIDLength(
76✔
1039
                completeRequest.GetIdentity(),
76✔
1040
                scope,
76✔
1041
                wh.config.MaxIDLengthWarnLimit(),
76✔
1042
                wh.config.IdentityMaxLength(domainName),
76✔
1043
                metrics.CadenceErrIdentityExceededWarnLimit,
76✔
1044
                domainName,
76✔
1045
                wh.GetLogger(),
76✔
1046
                tag.IDTypeIdentity) {
76✔
1047
                return validate.ErrIdentityTooLong
×
1048
        }
×
1049

1050
        taskToken := &common.TaskToken{
76✔
1051
                DomainID:   domainID,
76✔
1052
                RunID:      runID,
76✔
1053
                WorkflowID: workflowID,
76✔
1054
                ScheduleID: common.EmptyEventID,
76✔
1055
                ActivityID: activityID,
76✔
1056
        }
76✔
1057
        token, err := wh.tokenSerializer.Serialize(taskToken)
76✔
1058
        if err != nil {
76✔
1059
                return err
×
1060
        }
×
1061

1062
        sizeLimitError := wh.config.BlobSizeLimitError(domainName)
76✔
1063
        sizeLimitWarn := wh.config.BlobSizeLimitWarn(domainName)
76✔
1064

76✔
1065
        if err := common.CheckEventBlobSizeLimit(
76✔
1066
                len(completeRequest.Result),
76✔
1067
                sizeLimitWarn,
76✔
1068
                sizeLimitError,
76✔
1069
                taskToken.DomainID,
76✔
1070
                taskToken.WorkflowID,
76✔
1071
                taskToken.RunID,
76✔
1072
                scope,
76✔
1073
                wh.GetThrottledLogger(),
76✔
1074
                tag.BlobSizeViolationOperation("RespondActivityTaskCompletedByID"),
76✔
1075
        ); err != nil {
76✔
1076
                // result exceeds blob size limit, we would record it as failure
×
1077
                failRequest := &types.RespondActivityTaskFailedRequest{
×
1078
                        TaskToken: token,
×
1079
                        Reason:    common.StringPtr(common.FailureReasonCompleteResultExceedsLimit),
×
1080
                        Details:   completeRequest.Result[0:sizeLimitError],
×
1081
                        Identity:  completeRequest.Identity,
×
1082
                }
×
1083
                err = wh.GetHistoryClient().RespondActivityTaskFailed(ctx, &types.HistoryRespondActivityTaskFailedRequest{
×
1084
                        DomainUUID:    taskToken.DomainID,
×
1085
                        FailedRequest: failRequest,
×
1086
                })
×
1087
                if err != nil {
×
1088
                        return wh.normalizeVersionedErrors(ctx, err)
×
1089
                }
×
1090
        } else {
76✔
1091
                req := &types.RespondActivityTaskCompletedRequest{
76✔
1092
                        TaskToken: token,
76✔
1093
                        Result:    completeRequest.Result,
76✔
1094
                        Identity:  completeRequest.Identity,
76✔
1095
                }
76✔
1096

76✔
1097
                err = wh.GetHistoryClient().RespondActivityTaskCompleted(ctx, &types.HistoryRespondActivityTaskCompletedRequest{
76✔
1098
                        DomainUUID:      taskToken.DomainID,
76✔
1099
                        CompleteRequest: req,
76✔
1100
                })
76✔
1101
                if err != nil {
76✔
1102
                        return wh.normalizeVersionedErrors(ctx, err)
×
1103
                }
×
1104
        }
1105

1106
        return nil
76✔
1107
}
1108

1109
// RespondActivityTaskFailed - response to an activity task failure
1110
func (wh *WorkflowHandler) RespondActivityTaskFailed(
1111
        ctx context.Context,
1112
        failedRequest *types.RespondActivityTaskFailedRequest,
1113
) (retError error) {
12✔
1114
        if wh.isShuttingDown() {
12✔
1115
                return validate.ErrShuttingDown
×
1116
        }
×
1117

1118
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
12✔
1119
                return err
×
1120
        }
×
1121

1122
        if failedRequest == nil {
12✔
1123
                return validate.ErrRequestNotSet
×
1124
        }
×
1125

1126
        if failedRequest.TaskToken == nil {
12✔
1127
                return validate.ErrTaskTokenNotSet
×
1128
        }
×
1129
        taskToken, err := wh.tokenSerializer.Deserialize(failedRequest.TaskToken)
12✔
1130
        if err != nil {
12✔
1131
                return err
×
1132
        }
×
1133
        if taskToken.DomainID == "" {
12✔
1134
                return validate.ErrDomainNotSet
×
1135
        }
×
1136

1137
        domainName, err := wh.GetDomainCache().GetDomainName(taskToken.DomainID)
12✔
1138
        if err != nil {
12✔
1139
                return err
×
1140
        }
×
1141

1142
        dw := domainWrapper{
12✔
1143
                domain: domainName,
12✔
1144
        }
12✔
1145
        scope := getMetricsScopeWithDomain(metrics.FrontendRespondActivityTaskFailedScope, dw, wh.GetMetricsClient()).Tagged(metrics.GetContextTags(ctx)...)
12✔
1146
        if !common.IsValidIDLength(
12✔
1147
                failedRequest.GetIdentity(),
12✔
1148
                scope,
12✔
1149
                wh.config.MaxIDLengthWarnLimit(),
12✔
1150
                wh.config.IdentityMaxLength(domainName),
12✔
1151
                metrics.CadenceErrIdentityExceededWarnLimit,
12✔
1152
                domainName,
12✔
1153
                wh.GetLogger(),
12✔
1154
                tag.IDTypeIdentity) {
12✔
1155
                return validate.ErrIdentityTooLong
×
1156
        }
×
1157

1158
        sizeLimitError := wh.config.BlobSizeLimitError(domainName)
12✔
1159
        sizeLimitWarn := wh.config.BlobSizeLimitWarn(domainName)
12✔
1160

12✔
1161
        if err := common.CheckEventBlobSizeLimit(
12✔
1162
                len(failedRequest.Details),
12✔
1163
                sizeLimitWarn,
12✔
1164
                sizeLimitError,
12✔
1165
                taskToken.DomainID,
12✔
1166
                taskToken.WorkflowID,
12✔
1167
                taskToken.RunID,
12✔
1168
                scope,
12✔
1169
                wh.GetThrottledLogger(),
12✔
1170
                tag.BlobSizeViolationOperation("RespondActivityTaskFailed"),
12✔
1171
        ); err != nil {
12✔
1172
                // details exceeds blob size limit, we would truncate the details and put a specific error reason
×
1173
                failedRequest.Reason = common.StringPtr(common.FailureReasonFailureDetailsExceedsLimit)
×
1174
                failedRequest.Details = failedRequest.Details[0:sizeLimitError]
×
1175
        }
×
1176

1177
        err = wh.GetHistoryClient().RespondActivityTaskFailed(ctx, &types.HistoryRespondActivityTaskFailedRequest{
12✔
1178
                DomainUUID:    taskToken.DomainID,
12✔
1179
                FailedRequest: failedRequest,
12✔
1180
        })
12✔
1181
        if err != nil {
12✔
1182
                return wh.normalizeVersionedErrors(ctx, err)
×
1183
        }
×
1184
        return nil
12✔
1185
}
1186

1187
// RespondActivityTaskFailedByID - response to an activity task failure
1188
func (wh *WorkflowHandler) RespondActivityTaskFailedByID(
1189
        ctx context.Context,
1190
        failedRequest *types.RespondActivityTaskFailedByIDRequest,
1191
) (retError error) {
×
1192
        if wh.isShuttingDown() {
×
1193
                return validate.ErrShuttingDown
×
1194
        }
×
1195

1196
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
×
1197
                return err
×
1198
        }
×
1199

1200
        if failedRequest == nil {
×
1201
                return validate.ErrRequestNotSet
×
1202
        }
×
1203

1204
        domainName := failedRequest.GetDomain()
×
1205

×
1206
        if domainName == "" {
×
1207
                return validate.ErrDomainNotSet
×
1208
        }
×
1209
        domainID, err := wh.GetDomainCache().GetDomainID(domainName)
×
1210
        if err != nil {
×
1211
                return err
×
1212
        }
×
1213
        workflowID := failedRequest.GetWorkflowID()
×
1214
        runID := failedRequest.GetRunID() // runID is optional so can be empty
×
1215
        activityID := failedRequest.GetActivityID()
×
1216

×
1217
        if domainID == "" {
×
1218
                return validate.ErrDomainNotSet
×
1219
        }
×
1220
        if workflowID == "" {
×
1221
                return validate.ErrWorkflowIDNotSet
×
1222
        }
×
1223
        if activityID == "" {
×
1224
                return validate.ErrActivityIDNotSet
×
1225
        }
×
1226

1227
        scope := getMetricsScopeWithDomain(metrics.FrontendRespondActivityTaskFailedByIDScope, failedRequest, wh.GetMetricsClient()).Tagged(metrics.GetContextTags(ctx)...)
×
1228
        if !common.IsValidIDLength(
×
1229
                failedRequest.GetIdentity(),
×
1230
                scope,
×
1231
                wh.config.MaxIDLengthWarnLimit(),
×
1232
                wh.config.IdentityMaxLength(failedRequest.GetDomain()),
×
1233
                metrics.CadenceErrIdentityExceededWarnLimit,
×
1234
                domainName,
×
1235
                wh.GetLogger(),
×
1236
                tag.IDTypeIdentity) {
×
1237
                return validate.ErrIdentityTooLong
×
1238
        }
×
1239

1240
        taskToken := &common.TaskToken{
×
1241
                DomainID:   domainID,
×
1242
                RunID:      runID,
×
1243
                WorkflowID: workflowID,
×
1244
                ScheduleID: common.EmptyEventID,
×
1245
                ActivityID: activityID,
×
1246
        }
×
1247
        token, err := wh.tokenSerializer.Serialize(taskToken)
×
1248
        if err != nil {
×
1249
                return err
×
1250
        }
×
1251

1252
        sizeLimitError := wh.config.BlobSizeLimitError(domainName)
×
1253
        sizeLimitWarn := wh.config.BlobSizeLimitWarn(domainName)
×
1254

×
1255
        if err := common.CheckEventBlobSizeLimit(
×
1256
                len(failedRequest.Details),
×
1257
                sizeLimitWarn,
×
1258
                sizeLimitError,
×
1259
                taskToken.DomainID,
×
1260
                taskToken.WorkflowID,
×
1261
                taskToken.RunID,
×
1262
                scope,
×
1263
                wh.GetThrottledLogger(),
×
1264
                tag.BlobSizeViolationOperation("RespondActivityTaskFailedByID"),
×
1265
        ); err != nil {
×
1266
                // details exceeds blob size limit, we would truncate the details and put a specific error reason
×
1267
                failedRequest.Reason = common.StringPtr(common.FailureReasonFailureDetailsExceedsLimit)
×
1268
                failedRequest.Details = failedRequest.Details[0:sizeLimitError]
×
1269
        }
×
1270

1271
        req := &types.RespondActivityTaskFailedRequest{
×
1272
                TaskToken: token,
×
1273
                Reason:    failedRequest.Reason,
×
1274
                Details:   failedRequest.Details,
×
1275
                Identity:  failedRequest.Identity,
×
1276
        }
×
1277

×
1278
        err = wh.GetHistoryClient().RespondActivityTaskFailed(ctx, &types.HistoryRespondActivityTaskFailedRequest{
×
1279
                DomainUUID:    taskToken.DomainID,
×
1280
                FailedRequest: req,
×
1281
        })
×
1282
        if err != nil {
×
1283
                return wh.normalizeVersionedErrors(ctx, err)
×
1284
        }
×
1285
        return nil
×
1286
}
1287

1288
// RespondActivityTaskCanceled - called to cancel an activity task
1289
func (wh *WorkflowHandler) RespondActivityTaskCanceled(
1290
        ctx context.Context,
1291
        cancelRequest *types.RespondActivityTaskCanceledRequest,
1292
) (retError error) {
×
1293
        if wh.isShuttingDown() {
×
1294
                return validate.ErrShuttingDown
×
1295
        }
×
1296

1297
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
×
1298
                return err
×
1299
        }
×
1300

1301
        if cancelRequest == nil {
×
1302
                return validate.ErrRequestNotSet
×
1303
        }
×
1304

1305
        if cancelRequest.TaskToken == nil {
×
1306
                return validate.ErrTaskTokenNotSet
×
1307
        }
×
1308

1309
        taskToken, err := wh.tokenSerializer.Deserialize(cancelRequest.TaskToken)
×
1310
        if err != nil {
×
1311
                return err
×
1312
        }
×
1313

1314
        if taskToken.DomainID == "" {
×
1315
                return validate.ErrDomainNotSet
×
1316
        }
×
1317

1318
        domainName, err := wh.GetDomainCache().GetDomainName(taskToken.DomainID)
×
1319
        if err != nil {
×
1320
                return err
×
1321
        }
×
1322

1323
        dw := domainWrapper{
×
1324
                domain: domainName,
×
1325
        }
×
1326
        scope := getMetricsScopeWithDomain(metrics.FrontendRespondActivityTaskCanceledScope, dw, wh.GetMetricsClient()).Tagged(metrics.GetContextTags(ctx)...)
×
1327
        if !common.IsValidIDLength(
×
1328
                cancelRequest.GetIdentity(),
×
1329
                scope,
×
1330
                wh.config.MaxIDLengthWarnLimit(),
×
1331
                wh.config.IdentityMaxLength(domainName),
×
1332
                metrics.CadenceErrIdentityExceededWarnLimit,
×
1333
                domainName,
×
1334
                wh.GetLogger(),
×
1335
                tag.IDTypeIdentity) {
×
1336
                return validate.ErrIdentityTooLong
×
1337
        }
×
1338

1339
        sizeLimitError := wh.config.BlobSizeLimitError(domainName)
×
1340
        sizeLimitWarn := wh.config.BlobSizeLimitWarn(domainName)
×
1341

×
1342
        if err := common.CheckEventBlobSizeLimit(
×
1343
                len(cancelRequest.Details),
×
1344
                sizeLimitWarn,
×
1345
                sizeLimitError,
×
1346
                taskToken.DomainID,
×
1347
                taskToken.WorkflowID,
×
1348
                taskToken.RunID,
×
1349
                scope,
×
1350
                wh.GetThrottledLogger(),
×
1351
                tag.BlobSizeViolationOperation("RespondActivityTaskCanceled"),
×
1352
        ); err != nil {
×
1353
                // details exceeds blob size limit, we would record it as failure
×
1354
                failRequest := &types.RespondActivityTaskFailedRequest{
×
1355
                        TaskToken: cancelRequest.TaskToken,
×
1356
                        Reason:    common.StringPtr(common.FailureReasonCancelDetailsExceedsLimit),
×
1357
                        Details:   cancelRequest.Details[0:sizeLimitError],
×
1358
                        Identity:  cancelRequest.Identity,
×
1359
                }
×
1360
                err = wh.GetHistoryClient().RespondActivityTaskFailed(ctx, &types.HistoryRespondActivityTaskFailedRequest{
×
1361
                        DomainUUID:    taskToken.DomainID,
×
1362
                        FailedRequest: failRequest,
×
1363
                })
×
1364
                if err != nil {
×
1365
                        return wh.normalizeVersionedErrors(ctx, err)
×
1366
                }
×
1367
        } else {
×
1368
                err = wh.GetHistoryClient().RespondActivityTaskCanceled(ctx, &types.HistoryRespondActivityTaskCanceledRequest{
×
1369
                        DomainUUID:    taskToken.DomainID,
×
1370
                        CancelRequest: cancelRequest,
×
1371
                })
×
1372
                if err != nil {
×
1373
                        return wh.normalizeVersionedErrors(ctx, err)
×
1374
                }
×
1375
        }
1376

1377
        return nil
×
1378
}
1379

1380
// RespondActivityTaskCanceledByID - called to cancel an activity task
1381
func (wh *WorkflowHandler) RespondActivityTaskCanceledByID(
1382
        ctx context.Context,
1383
        cancelRequest *types.RespondActivityTaskCanceledByIDRequest,
1384
) (retError error) {
×
1385
        if wh.isShuttingDown() {
×
1386
                return validate.ErrShuttingDown
×
1387
        }
×
1388

1389
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
×
1390
                return err
×
1391
        }
×
1392

1393
        if cancelRequest == nil {
×
1394
                return validate.ErrRequestNotSet
×
1395
        }
×
1396

1397
        domainName := cancelRequest.GetDomain()
×
1398
        if domainName == "" {
×
1399
                return validate.ErrDomainNotSet
×
1400
        }
×
1401
        domainID, err := wh.GetDomainCache().GetDomainID(domainName)
×
1402
        if err != nil {
×
1403
                return err
×
1404
        }
×
1405
        workflowID := cancelRequest.GetWorkflowID()
×
1406
        runID := cancelRequest.GetRunID() // runID is optional so can be empty
×
1407
        activityID := cancelRequest.GetActivityID()
×
1408

×
1409
        if domainID == "" {
×
1410
                return validate.ErrDomainNotSet
×
1411
        }
×
1412
        if workflowID == "" {
×
1413
                return validate.ErrWorkflowIDNotSet
×
1414
        }
×
1415
        if activityID == "" {
×
1416
                return validate.ErrActivityIDNotSet
×
1417
        }
×
1418

1419
        scope := getMetricsScopeWithDomain(metrics.FrontendRespondActivityTaskCanceledByIDScope, cancelRequest, wh.GetMetricsClient()).Tagged(metrics.GetContextTags(ctx)...)
×
1420
        if !common.IsValidIDLength(
×
1421
                cancelRequest.GetIdentity(),
×
1422
                scope,
×
1423
                wh.config.MaxIDLengthWarnLimit(),
×
1424
                wh.config.IdentityMaxLength(cancelRequest.GetDomain()),
×
1425
                metrics.CadenceErrIdentityExceededWarnLimit,
×
1426
                domainName,
×
1427
                wh.GetLogger(),
×
1428
                tag.IDTypeIdentity) {
×
1429
                return validate.ErrIdentityTooLong
×
1430
        }
×
1431

1432
        taskToken := &common.TaskToken{
×
1433
                DomainID:   domainID,
×
1434
                RunID:      runID,
×
1435
                WorkflowID: workflowID,
×
1436
                ScheduleID: common.EmptyEventID,
×
1437
                ActivityID: activityID,
×
1438
        }
×
1439
        token, err := wh.tokenSerializer.Serialize(taskToken)
×
1440
        if err != nil {
×
1441
                return err
×
1442
        }
×
1443

1444
        sizeLimitError := wh.config.BlobSizeLimitError(domainName)
×
1445
        sizeLimitWarn := wh.config.BlobSizeLimitWarn(domainName)
×
1446

×
1447
        if err := common.CheckEventBlobSizeLimit(
×
1448
                len(cancelRequest.Details),
×
1449
                sizeLimitWarn,
×
1450
                sizeLimitError,
×
1451
                taskToken.DomainID,
×
1452
                taskToken.WorkflowID,
×
1453
                taskToken.RunID,
×
1454
                scope,
×
1455
                wh.GetThrottledLogger(),
×
1456
                tag.BlobSizeViolationOperation("RespondActivityTaskCanceledByID"),
×
1457
        ); err != nil {
×
1458
                // details exceeds blob size limit, we would record it as failure
×
1459
                failRequest := &types.RespondActivityTaskFailedRequest{
×
1460
                        TaskToken: token,
×
1461
                        Reason:    common.StringPtr(common.FailureReasonCancelDetailsExceedsLimit),
×
1462
                        Details:   cancelRequest.Details[0:sizeLimitError],
×
1463
                        Identity:  cancelRequest.Identity,
×
1464
                }
×
1465
                err = wh.GetHistoryClient().RespondActivityTaskFailed(ctx, &types.HistoryRespondActivityTaskFailedRequest{
×
1466
                        DomainUUID:    taskToken.DomainID,
×
1467
                        FailedRequest: failRequest,
×
1468
                })
×
1469
                if err != nil {
×
1470
                        return wh.normalizeVersionedErrors(ctx, err)
×
1471
                }
×
1472
        } else {
×
1473
                req := &types.RespondActivityTaskCanceledRequest{
×
1474
                        TaskToken: token,
×
1475
                        Details:   cancelRequest.Details,
×
1476
                        Identity:  cancelRequest.Identity,
×
1477
                }
×
1478

×
1479
                err = wh.GetHistoryClient().RespondActivityTaskCanceled(ctx, &types.HistoryRespondActivityTaskCanceledRequest{
×
1480
                        DomainUUID:    taskToken.DomainID,
×
1481
                        CancelRequest: req,
×
1482
                })
×
1483
                if err != nil {
×
1484
                        return wh.normalizeVersionedErrors(ctx, err)
×
1485
                }
×
1486
        }
1487

1488
        return nil
×
1489
}
1490

1491
// RespondDecisionTaskCompleted - response to a decision task
1492
func (wh *WorkflowHandler) RespondDecisionTaskCompleted(
1493
        ctx context.Context,
1494
        completeRequest *types.RespondDecisionTaskCompletedRequest,
1495
) (resp *types.RespondDecisionTaskCompletedResponse, retError error) {
928✔
1496
        if wh.isShuttingDown() {
928✔
1497
                return nil, validate.ErrShuttingDown
×
1498
        }
×
1499

1500
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
928✔
1501
                return nil, err
×
1502
        }
×
1503

1504
        if completeRequest == nil {
928✔
1505
                return nil, validate.ErrRequestNotSet
×
1506
        }
×
1507

1508
        if completeRequest.TaskToken == nil {
928✔
1509
                return nil, validate.ErrTaskTokenNotSet
×
1510
        }
×
1511
        taskToken, err := wh.tokenSerializer.Deserialize(completeRequest.TaskToken)
928✔
1512
        if err != nil {
928✔
1513
                return nil, err
×
1514
        }
×
1515
        if taskToken.DomainID == "" {
928✔
1516
                return nil, validate.ErrDomainNotSet
×
1517
        }
×
1518

1519
        domainName, err := wh.GetDomainCache().GetDomainName(taskToken.DomainID)
928✔
1520
        if err != nil {
928✔
1521
                return nil, err
×
1522
        }
×
1523

1524
        dw := domainWrapper{
928✔
1525
                domain: domainName,
928✔
1526
        }
928✔
1527
        scope := getMetricsScopeWithDomain(metrics.FrontendRespondDecisionTaskCompletedScope, dw, wh.GetMetricsClient()).Tagged(metrics.GetContextTags(ctx)...)
928✔
1528
        if !common.IsValidIDLength(
928✔
1529
                completeRequest.GetIdentity(),
928✔
1530
                scope,
928✔
1531
                wh.config.MaxIDLengthWarnLimit(),
928✔
1532
                wh.config.IdentityMaxLength(domainName),
928✔
1533
                metrics.CadenceErrIdentityExceededWarnLimit,
928✔
1534
                domainName,
928✔
1535
                wh.GetLogger(),
928✔
1536
                tag.IDTypeIdentity) {
928✔
1537
                return nil, validate.ErrIdentityTooLong
×
1538
        }
×
1539

1540
        if err := common.CheckDecisionResultLimit(
928✔
1541
                len(completeRequest.Decisions),
928✔
1542
                wh.config.DecisionResultCountLimit(domainName),
928✔
1543
                scope); err != nil {
928✔
1544
                return nil, err
×
1545
        }
×
1546

1547
        histResp, err := wh.GetHistoryClient().RespondDecisionTaskCompleted(ctx, &types.HistoryRespondDecisionTaskCompletedRequest{
928✔
1548
                DomainUUID:      taskToken.DomainID,
928✔
1549
                CompleteRequest: completeRequest},
928✔
1550
        )
928✔
1551
        if err != nil {
937✔
1552
                return nil, wh.normalizeVersionedErrors(ctx, err)
9✔
1553
        }
9✔
1554

1555
        completedResp := &types.RespondDecisionTaskCompletedResponse{}
919✔
1556
        completedResp.ActivitiesToDispatchLocally = histResp.ActivitiesToDispatchLocally
919✔
1557
        if completeRequest.GetReturnNewDecisionTask() && histResp != nil && histResp.StartedResponse != nil {
979✔
1558
                taskToken := &common.TaskToken{
60✔
1559
                        DomainID:        taskToken.DomainID,
60✔
1560
                        WorkflowID:      taskToken.WorkflowID,
60✔
1561
                        RunID:           taskToken.RunID,
60✔
1562
                        ScheduleID:      histResp.StartedResponse.GetScheduledEventID(),
60✔
1563
                        ScheduleAttempt: histResp.StartedResponse.GetAttempt(),
60✔
1564
                }
60✔
1565
                token, _ := wh.tokenSerializer.Serialize(taskToken)
60✔
1566
                workflowExecution := &types.WorkflowExecution{
60✔
1567
                        WorkflowID: taskToken.WorkflowID,
60✔
1568
                        RunID:      taskToken.RunID,
60✔
1569
                }
60✔
1570
                matchingResp := common.CreateMatchingPollForDecisionTaskResponse(histResp.StartedResponse, workflowExecution, token)
60✔
1571

60✔
1572
                newDecisionTask, err := wh.createPollForDecisionTaskResponse(ctx, scope, taskToken.DomainID, matchingResp, matchingResp.GetBranchToken())
60✔
1573
                if err != nil {
60✔
1574
                        return nil, err
×
1575
                }
×
1576
                completedResp.DecisionTask = newDecisionTask
60✔
1577
        }
1578

1579
        return completedResp, nil
919✔
1580
}
1581

1582
// RespondDecisionTaskFailed - failed response to a decision task
1583
func (wh *WorkflowHandler) RespondDecisionTaskFailed(
1584
        ctx context.Context,
1585
        failedRequest *types.RespondDecisionTaskFailedRequest,
1586
) (retError error) {
159✔
1587
        if wh.isShuttingDown() {
159✔
1588
                return validate.ErrShuttingDown
×
1589
        }
×
1590

1591
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
159✔
1592
                return err
×
1593
        }
×
1594

1595
        if failedRequest == nil {
159✔
1596
                return validate.ErrRequestNotSet
×
1597
        }
×
1598

1599
        if failedRequest.TaskToken == nil {
159✔
1600
                return validate.ErrTaskTokenNotSet
×
1601
        }
×
1602
        taskToken, err := wh.tokenSerializer.Deserialize(failedRequest.TaskToken)
159✔
1603
        if err != nil {
159✔
1604
                return err
×
1605
        }
×
1606
        if taskToken.DomainID == "" {
159✔
1607
                return validate.ErrDomainNotSet
×
1608
        }
×
1609

1610
        domainName, err := wh.GetDomainCache().GetDomainName(taskToken.DomainID)
159✔
1611
        if err != nil {
159✔
1612
                return err
×
1613
        }
×
1614

1615
        dw := domainWrapper{
159✔
1616
                domain: domainName,
159✔
1617
        }
159✔
1618
        scope := getMetricsScopeWithDomain(metrics.FrontendRespondDecisionTaskFailedScope, dw, wh.GetMetricsClient()).Tagged(metrics.GetContextTags(ctx)...)
159✔
1619
        if !common.IsValidIDLength(
159✔
1620
                failedRequest.GetIdentity(),
159✔
1621
                scope,
159✔
1622
                wh.config.MaxIDLengthWarnLimit(),
159✔
1623
                wh.config.IdentityMaxLength(domainName),
159✔
1624
                metrics.CadenceErrIdentityExceededWarnLimit,
159✔
1625
                domainName,
159✔
1626
                wh.GetLogger(),
159✔
1627
                tag.IDTypeIdentity) {
159✔
1628
                return validate.ErrIdentityTooLong
×
1629
        }
×
1630

1631
        sizeLimitError := wh.config.BlobSizeLimitError(domainName)
159✔
1632
        sizeLimitWarn := wh.config.BlobSizeLimitWarn(domainName)
159✔
1633

159✔
1634
        if err := common.CheckEventBlobSizeLimit(
159✔
1635
                len(failedRequest.Details),
159✔
1636
                sizeLimitWarn,
159✔
1637
                sizeLimitError,
159✔
1638
                taskToken.DomainID,
159✔
1639
                taskToken.WorkflowID,
159✔
1640
                taskToken.RunID,
159✔
1641
                scope,
159✔
1642
                wh.GetThrottledLogger(),
159✔
1643
                tag.BlobSizeViolationOperation("RespondDecisionTaskFailed"),
159✔
1644
        ); err != nil {
159✔
1645
                // details exceed, we would just truncate the size for decision task failed as the details is not used anywhere by client code
×
1646
                failedRequest.Details = failedRequest.Details[0:sizeLimitError]
×
1647
        }
×
1648

1649
        err = wh.GetHistoryClient().RespondDecisionTaskFailed(ctx, &types.HistoryRespondDecisionTaskFailedRequest{
159✔
1650
                DomainUUID:    taskToken.DomainID,
159✔
1651
                FailedRequest: failedRequest,
159✔
1652
        })
159✔
1653
        if err != nil {
159✔
1654
                return wh.normalizeVersionedErrors(ctx, err)
×
1655
        }
×
1656
        return nil
159✔
1657
}
1658

1659
// RespondQueryTaskCompleted - response to a query task
1660
func (wh *WorkflowHandler) RespondQueryTaskCompleted(
1661
        ctx context.Context,
1662
        completeRequest *types.RespondQueryTaskCompletedRequest,
1663
) (retError error) {
30✔
1664
        if wh.isShuttingDown() {
30✔
1665
                return validate.ErrShuttingDown
×
1666
        }
×
1667

1668
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
30✔
1669
                return err
×
1670
        }
×
1671

1672
        if completeRequest == nil {
30✔
1673
                return validate.ErrRequestNotSet
×
1674
        }
×
1675

1676
        if completeRequest.TaskToken == nil {
30✔
1677
                return validate.ErrTaskTokenNotSet
×
1678
        }
×
1679
        queryTaskToken, err := wh.tokenSerializer.DeserializeQueryTaskToken(completeRequest.TaskToken)
30✔
1680
        if err != nil {
30✔
1681
                return err
×
1682
        }
×
1683
        if queryTaskToken.DomainID == "" || queryTaskToken.TaskList == "" || queryTaskToken.TaskID == "" {
30✔
1684
                return validate.ErrInvalidTaskToken
×
1685
        }
×
1686

1687
        domainName, err := wh.GetDomainCache().GetDomainName(queryTaskToken.DomainID)
30✔
1688
        if err != nil {
30✔
1689
                return err
×
1690
        }
×
1691

1692
        dw := domainWrapper{
30✔
1693
                domain: domainName,
30✔
1694
        }
30✔
1695
        sizeLimitError := wh.config.BlobSizeLimitError(domainName)
30✔
1696
        sizeLimitWarn := wh.config.BlobSizeLimitWarn(domainName)
30✔
1697

30✔
1698
        scope := getMetricsScopeWithDomain(metrics.FrontendRespondQueryTaskCompletedScope, dw, wh.GetMetricsClient()).Tagged(metrics.GetContextTags(ctx)...)
30✔
1699
        if err := common.CheckEventBlobSizeLimit(
30✔
1700
                len(completeRequest.GetQueryResult()),
30✔
1701
                sizeLimitWarn,
30✔
1702
                sizeLimitError,
30✔
1703
                queryTaskToken.DomainID,
30✔
1704
                "",
30✔
1705
                "",
30✔
1706
                scope,
30✔
1707
                wh.GetThrottledLogger(),
30✔
1708
                tag.BlobSizeViolationOperation("RespondQueryTaskCompleted"),
30✔
1709
        ); err != nil {
30✔
1710
                completeRequest = &types.RespondQueryTaskCompletedRequest{
×
1711
                        TaskToken:     completeRequest.TaskToken,
×
1712
                        CompletedType: types.QueryTaskCompletedTypeFailed.Ptr(),
×
1713
                        QueryResult:   nil,
×
1714
                        ErrorMessage:  err.Error(),
×
1715
                }
×
1716
        }
×
1717

1718
        call := yarpc.CallFromContext(ctx)
30✔
1719

30✔
1720
        completeRequest.WorkerVersionInfo = &types.WorkerVersionInfo{
30✔
1721
                Impl:           call.Header(common.ClientImplHeaderName),
30✔
1722
                FeatureVersion: call.Header(common.FeatureVersionHeaderName),
30✔
1723
        }
30✔
1724
        matchingRequest := &types.MatchingRespondQueryTaskCompletedRequest{
30✔
1725
                DomainUUID:       queryTaskToken.DomainID,
30✔
1726
                TaskList:         &types.TaskList{Name: queryTaskToken.TaskList},
30✔
1727
                TaskID:           queryTaskToken.TaskID,
30✔
1728
                CompletedRequest: completeRequest,
30✔
1729
        }
30✔
1730

30✔
1731
        err = wh.GetMatchingClient().RespondQueryTaskCompleted(ctx, matchingRequest)
30✔
1732
        if err != nil {
30✔
1733
                return err
×
1734
        }
×
1735
        return nil
30✔
1736
}
1737

1738
func (wh *WorkflowHandler) StartWorkflowExecutionAsync(
1739
        ctx context.Context,
1740
        startRequest *types.StartWorkflowExecutionAsyncRequest,
1741
) (resp *types.StartWorkflowExecutionAsyncResponse, retError error) {
3✔
1742
        if wh.isShuttingDown() {
3✔
1743
                return nil, validate.ErrShuttingDown
×
1744
        }
×
1745
        scope := getMetricsScopeWithDomain(metrics.FrontendStartWorkflowExecutionAsyncScope, startRequest, wh.GetMetricsClient()).Tagged(metrics.GetContextTags(ctx)...)
3✔
1746
        // validate request before pushing to queue
3✔
1747
        err := wh.validateStartWorkflowExecutionRequest(ctx, startRequest.StartWorkflowExecutionRequest, scope)
3✔
1748
        if err != nil {
3✔
1749
                return nil, err
×
1750
        }
×
1751

1752
        producer, err := wh.GetAsyncWorkflowQueueProvider().GetAsyncQueueProducer(startRequest.GetDomain())
3✔
1753
        if err != nil {
4✔
1754
                return nil, err
1✔
1755
        }
1✔
1756
        // serialize the message to be sent to the queue
1757
        payload, err := json.Marshal(startRequest)
2✔
1758
        if err != nil {
2✔
1759
                return nil, err
×
1760
        }
×
1761
        // propagate the headers from the context to the message
1762
        clientHeaders := common.GetClientHeaders(ctx)
2✔
1763
        header := &shared.Header{
2✔
1764
                Fields: map[string][]byte{},
2✔
1765
        }
2✔
1766
        for k, v := range clientHeaders {
12✔
1767
                header.Fields[k] = []byte(v)
10✔
1768
        }
10✔
1769
        messageType := sqlblobs.AsyncRequestTypeStartWorkflowExecutionAsyncRequest
2✔
1770
        message := &sqlblobs.AsyncRequestMessage{
2✔
1771
                PartitionKey: common.StringPtr(startRequest.GetWorkflowID()),
2✔
1772
                Type:         &messageType,
2✔
1773
                Header:       header,
2✔
1774
                Encoding:     common.StringPtr(string(common.EncodingTypeJSON)),
2✔
1775
                Payload:      payload,
2✔
1776
        }
2✔
1777
        err = producer.Publish(ctx, message)
2✔
1778
        if err != nil {
3✔
1779
                return nil, err
1✔
1780
        }
1✔
1781
        return &types.StartWorkflowExecutionAsyncResponse{}, nil
1✔
1782
}
1783

1784
// StartWorkflowExecution - Creates a new workflow execution
1785
func (wh *WorkflowHandler) StartWorkflowExecution(
1786
        ctx context.Context,
1787
        startRequest *types.StartWorkflowExecutionRequest,
1788
) (resp *types.StartWorkflowExecutionResponse, retError error) {
455✔
1789
        if wh.isShuttingDown() {
455✔
1790
                return nil, validate.ErrShuttingDown
×
1791
        }
×
1792
        scope := getMetricsScopeWithDomain(metrics.FrontendStartWorkflowExecutionScope, startRequest, wh.GetMetricsClient()).Tagged(metrics.GetContextTags(ctx)...)
455✔
1793
        err := wh.validateStartWorkflowExecutionRequest(ctx, startRequest, scope)
455✔
1794
        if err != nil {
466✔
1795
                return nil, err
11✔
1796
        }
11✔
1797
        domainName := startRequest.GetDomain()
444✔
1798
        domainID, err := wh.GetDomainCache().GetDomainID(domainName)
444✔
1799
        if err != nil {
444✔
1800
                return nil, err
×
1801
        }
×
1802
        wh.GetLogger().Debug("Start workflow execution request domainID", tag.WorkflowDomainID(domainID))
444✔
1803
        historyRequest, err := common.CreateHistoryStartWorkflowRequest(
444✔
1804
                domainID, startRequest, time.Now(), wh.getPartitionConfig(ctx, domainName))
444✔
1805
        if err != nil {
444✔
1806
                return nil, err
×
1807
        }
×
1808

1809
        resp, err = wh.GetHistoryClient().StartWorkflowExecution(ctx, historyRequest)
444✔
1810
        if err != nil {
462✔
1811
                return nil, err
18✔
1812
        }
18✔
1813
        return resp, nil
426✔
1814
}
1815

1816
func (wh *WorkflowHandler) validateStartWorkflowExecutionRequest(ctx context.Context, startRequest *types.StartWorkflowExecutionRequest, scope metrics.Scope) error {
458✔
1817
        if startRequest == nil {
459✔
1818
                return validate.ErrRequestNotSet
1✔
1819
        }
1✔
1820
        domainName := startRequest.GetDomain()
457✔
1821
        if domainName == "" {
458✔
1822
                return validate.ErrDomainNotSet
1✔
1823
        }
1✔
1824
        if startRequest.GetWorkflowID() == "" {
457✔
1825
                return validate.ErrWorkflowIDNotSet
1✔
1826
        }
1✔
1827
        if _, err := uuid.Parse(startRequest.RequestID); err != nil {
457✔
1828
                return &types.BadRequestError{Message: fmt.Sprintf("requestId %q is not a valid UUID", startRequest.RequestID)}
2✔
1829
        }
2✔
1830
        if startRequest.WorkflowType == nil || startRequest.WorkflowType.GetName() == "" {
454✔
1831
                return validate.ErrWorkflowTypeNotSet
1✔
1832
        }
1✔
1833
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
452✔
1834
                return err
×
1835
        }
×
1836
        idLengthWarnLimit := wh.config.MaxIDLengthWarnLimit()
452✔
1837
        if !common.IsValidIDLength(
452✔
1838
                domainName,
452✔
1839
                scope,
452✔
1840
                idLengthWarnLimit,
452✔
1841
                wh.config.DomainNameMaxLength(domainName),
452✔
1842
                metrics.CadenceErrDomainNameExceededWarnLimit,
452✔
1843
                domainName,
452✔
1844
                wh.GetLogger(),
452✔
1845
                tag.IDTypeDomainName) {
452✔
1846
                return validate.ErrDomainTooLong
×
1847
        }
×
1848
        if !common.IsValidIDLength(
452✔
1849
                startRequest.GetWorkflowID(),
452✔
1850
                scope,
452✔
1851
                idLengthWarnLimit,
452✔
1852
                wh.config.WorkflowIDMaxLength(domainName),
452✔
1853
                metrics.CadenceErrWorkflowIDExceededWarnLimit,
452✔
1854
                domainName,
452✔
1855
                wh.GetLogger(),
452✔
1856
                tag.IDTypeWorkflowID) {
452✔
1857
                return validate.ErrWorkflowIDTooLong
×
1858
        }
×
1859
        if err := common.ValidateRetryPolicy(startRequest.RetryPolicy); err != nil {
452✔
1860
                return err
×
1861
        }
×
1862
        wh.GetLogger().Debug(
452✔
1863
                "Received StartWorkflowExecution. WorkflowID",
452✔
1864
                tag.WorkflowID(startRequest.GetWorkflowID()))
452✔
1865
        if !common.IsValidIDLength(
452✔
1866
                startRequest.WorkflowType.GetName(),
452✔
1867
                scope,
452✔
1868
                idLengthWarnLimit,
452✔
1869
                wh.config.WorkflowTypeMaxLength(domainName),
452✔
1870
                metrics.CadenceErrWorkflowTypeExceededWarnLimit,
452✔
1871
                domainName,
452✔
1872
                wh.GetLogger(),
452✔
1873
                tag.IDTypeWorkflowType) {
452✔
1874
                return validate.ErrWorkflowTypeTooLong
×
1875
        }
×
1876
        if err := wh.validateTaskList(startRequest.TaskList, scope, domainName); err != nil {
453✔
1877
                return err
1✔
1878
        }
1✔
1879
        if startRequest.GetExecutionStartToCloseTimeoutSeconds() <= 0 {
452✔
1880
                return validate.ErrInvalidExecutionStartToCloseTimeoutSeconds
1✔
1881
        }
1✔
1882
        if startRequest.GetTaskStartToCloseTimeoutSeconds() <= 0 {
451✔
1883
                return validate.ErrInvalidTaskStartToCloseTimeoutSeconds
1✔
1884
        }
1✔
1885
        if startRequest.GetDelayStartSeconds() < 0 {
450✔
1886
                return validate.ErrInvalidDelayStartSeconds
1✔
1887
        }
1✔
1888
        if startRequest.GetJitterStartSeconds() < 0 {
448✔
1889
                return validate.ErrInvalidJitterStartSeconds
×
1890
        }
×
1891
        jitter := startRequest.GetJitterStartSeconds()
448✔
1892
        cron := startRequest.GetCronSchedule()
448✔
1893
        if cron != "" {
466✔
1894
                if _, err := backoff.ValidateSchedule(startRequest.GetCronSchedule()); err != nil {
18✔
1895
                        return err
×
1896
                }
×
1897
        }
1898
        if jitter > 0 && cron != "" {
448✔
1899
                // Calculate the cron duration and ensure that jitter is not greater than the cron duration,
×
1900
                // because that would be confusing to users.
×
1901

×
1902
                // Request using start/end time zero value, which will get us an exact answer (i.e. its not in the
×
1903
                // middle of a minute)
×
1904
                backoffSeconds, err := backoff.GetBackoffForNextScheduleInSeconds(cron, time.Time{}, time.Time{}, jitter)
×
1905
                if err != nil {
×
1906
                        return err
×
1907
                }
×
1908
                if jitter > backoffSeconds {
×
1909
                        return validate.ErrInvalidJitterStartSeconds2
×
1910
                }
×
1911
        }
1912
        if !common.IsValidIDLength(
448✔
1913
                startRequest.GetRequestID(),
448✔
1914
                scope,
448✔
1915
                idLengthWarnLimit,
448✔
1916
                wh.config.RequestIDMaxLength(domainName),
448✔
1917
                metrics.CadenceErrRequestIDExceededWarnLimit,
448✔
1918
                domainName,
448✔
1919
                wh.GetLogger(),
448✔
1920
                tag.IDTypeRequestID) {
448✔
1921
                return validate.ErrRequestIDTooLong
×
1922
        }
×
1923
        if err := wh.searchAttributesValidator.ValidateSearchAttributes(startRequest.SearchAttributes, domainName); err != nil {
448✔
1924
                return err
×
1925
        }
×
1926
        wh.GetLogger().Debug("Start workflow execution request domain", tag.WorkflowDomainName(domainName))
448✔
1927
        domainID, err := wh.GetDomainCache().GetDomainID(domainName)
448✔
1928
        if err != nil {
448✔
1929
                return err
×
1930
        }
×
1931
        sizeLimitError := wh.config.BlobSizeLimitError(domainName)
448✔
1932
        sizeLimitWarn := wh.config.BlobSizeLimitWarn(domainName)
448✔
1933
        actualSize := len(startRequest.Input)
448✔
1934
        if startRequest.Memo != nil {
460✔
1935
                actualSize += common.GetSizeOfMapStringToByteArray(startRequest.Memo.GetFields())
12✔
1936
        }
12✔
1937
        if err := common.CheckEventBlobSizeLimit(
448✔
1938
                actualSize,
448✔
1939
                sizeLimitWarn,
448✔
1940
                sizeLimitError,
448✔
1941
                domainID,
448✔
1942
                startRequest.GetWorkflowID(),
448✔
1943
                "",
448✔
1944
                scope,
448✔
1945
                wh.GetThrottledLogger(),
448✔
1946
                tag.BlobSizeViolationOperation("StartWorkflowExecution"),
448✔
1947
        ); err != nil {
448✔
1948
                return err
×
1949
        }
×
1950
        isolationGroup := wh.getIsolationGroup(ctx, domainName)
448✔
1951
        if !wh.isIsolationGroupHealthy(ctx, domainName, isolationGroup) {
449✔
1952
                return &types.BadRequestError{fmt.Sprintf("Domain %s is drained from isolation group %s.", domainName, isolationGroup)}
1✔
1953
        }
1✔
1954
        return nil
447✔
1955
}
1956

1957
// GetWorkflowExecutionHistory - retrieves the history of workflow execution
1958
func (wh *WorkflowHandler) GetWorkflowExecutionHistory(
1959
        ctx context.Context,
1960
        getRequest *types.GetWorkflowExecutionHistoryRequest,
1961
) (resp *types.GetWorkflowExecutionHistoryResponse, retError error) {
458✔
1962
        if wh.isShuttingDown() {
458✔
1963
                return nil, validate.ErrShuttingDown
×
1964
        }
×
1965

1966
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
458✔
1967
                return nil, err
×
1968
        }
×
1969

1970
        if getRequest == nil {
458✔
1971
                return nil, validate.ErrRequestNotSet
×
1972
        }
×
1973

1974
        domainName := getRequest.GetDomain()
458✔
1975
        wfExecution := getRequest.GetExecution()
458✔
1976

458✔
1977
        if domainName == "" {
458✔
1978
                return nil, validate.ErrDomainNotSet
×
1979
        }
×
1980

1981
        if err := validate.CheckExecution(wfExecution); err != nil {
458✔
1982
                return nil, err
×
1983
        }
×
1984

1985
        domainID, err := wh.GetDomainCache().GetDomainID(domainName)
458✔
1986
        if err != nil {
458✔
1987
                return nil, err
×
1988
        }
×
1989

1990
        if getRequest.GetMaximumPageSize() <= 0 {
784✔
1991
                getRequest.MaximumPageSize = int32(wh.config.HistoryMaxPageSize(getRequest.GetDomain()))
326✔
1992
        }
326✔
1993
        // force limit page size if exceed
1994
        if getRequest.GetMaximumPageSize() > common.GetHistoryMaxPageSize {
458✔
1995
                wh.GetThrottledLogger().Warn("GetHistory page size is larger than threshold",
×
1996
                        tag.WorkflowID(getRequest.Execution.GetWorkflowID()),
×
1997
                        tag.WorkflowRunID(getRequest.Execution.GetRunID()),
×
1998
                        tag.WorkflowDomainID(domainID),
×
1999
                        tag.WorkflowSize(int64(getRequest.GetMaximumPageSize())))
×
2000
                getRequest.MaximumPageSize = common.GetHistoryMaxPageSize
×
2001
        }
×
2002

2003
        scope := getMetricsScopeWithDomain(metrics.FrontendGetWorkflowExecutionHistoryScope, getRequest, wh.GetMetricsClient()).Tagged(metrics.GetContextTags(ctx)...)
458✔
2004
        if !getRequest.GetSkipArchival() {
898✔
2005
                enableArchivalRead := wh.GetArchivalMetadata().GetHistoryConfig().ReadEnabled()
440✔
2006
                historyArchived := wh.historyArchived(ctx, getRequest, domainID)
440✔
2007
                if enableArchivalRead && historyArchived {
461✔
2008
                        return wh.getArchivedHistory(ctx, getRequest, domainID)
21✔
2009
                }
21✔
2010
        }
2011

2012
        // this function return the following 6 things,
2013
        // 1. branch token
2014
        // 2. the workflow run ID
2015
        // 3. the last first event ID (the event ID of the last batch of events in the history)
2016
        // 4. the next event ID
2017
        // 5. whether the workflow is closed
2018
        // 6. error if any
2019
        queryHistory := func(
437✔
2020
                domainUUID string,
437✔
2021
                execution *types.WorkflowExecution,
437✔
2022
                expectedNextEventID int64,
437✔
2023
                currentBranchToken []byte,
437✔
2024
        ) ([]byte, string, int64, int64, bool, error) {
830✔
2025
                response, err := wh.GetHistoryClient().PollMutableState(ctx, &types.PollMutableStateRequest{
393✔
2026
                        DomainUUID:          domainUUID,
393✔
2027
                        Execution:           execution,
393✔
2028
                        ExpectedNextEventID: expectedNextEventID,
393✔
2029
                        CurrentBranchToken:  currentBranchToken,
393✔
2030
                })
393✔
2031

393✔
2032
                if err != nil {
393✔
2033
                        return nil, "", 0, 0, false, err
×
2034
                }
×
2035
                isWorkflowRunning := response.GetWorkflowCloseState() == persistence.WorkflowCloseStatusNone
393✔
2036

393✔
2037
                return response.CurrentBranchToken,
393✔
2038
                        response.Execution.GetRunID(),
393✔
2039
                        response.GetLastFirstEventID(),
393✔
2040
                        response.GetNextEventID(),
393✔
2041
                        isWorkflowRunning,
393✔
2042
                        nil
393✔
2043
        }
2044

2045
        isLongPoll := getRequest.GetWaitForNewEvent()
437✔
2046
        isCloseEventOnly := getRequest.GetHistoryEventFilterType() == types.HistoryEventFilterTypeCloseEvent
437✔
2047
        execution := getRequest.Execution
437✔
2048
        token := &getHistoryContinuationToken{}
437✔
2049

437✔
2050
        var runID string
437✔
2051
        lastFirstEventID := common.FirstEventID
437✔
2052
        var nextEventID int64
437✔
2053
        var isWorkflowRunning bool
437✔
2054

437✔
2055
        // process the token for paging
437✔
2056
        queryNextEventID := common.EndEventID
437✔
2057
        if getRequest.NextPageToken != nil {
481✔
2058
                token, err = deserializeHistoryToken(getRequest.NextPageToken)
44✔
2059
                if err != nil {
44✔
2060
                        return nil, validate.ErrInvalidNextPageToken
×
2061
                }
×
2062
                if execution.RunID != "" && execution.GetRunID() != token.RunID {
44✔
2063
                        return nil, validate.ErrNextPageTokenRunIDMismatch
×
2064
                }
×
2065

2066
                execution.RunID = token.RunID
44✔
2067

44✔
2068
                // we need to update the current next event ID and whether workflow is running
44✔
2069
                if len(token.PersistenceToken) == 0 && isLongPoll && token.IsWorkflowRunning {
44✔
2070
                        logger := wh.GetLogger().WithTags(
×
2071
                                tag.WorkflowDomainName(getRequest.GetDomain()),
×
2072
                                tag.WorkflowID(getRequest.Execution.GetWorkflowID()),
×
2073
                                tag.WorkflowRunID(getRequest.Execution.GetRunID()),
×
2074
                        )
×
2075
                        // TODO: for now we only log the invalid timeout (this is done inside the helper function) in case
×
2076
                        // this change breaks existing customers. Once we are sure no one is calling this API with very short timeout
×
2077
                        // we can return the error.
×
2078
                        _ = common.ValidateLongPollContextTimeout(ctx, "GetWorkflowExecutionHistory", logger)
×
2079

×
2080
                        if !isCloseEventOnly {
×
2081
                                queryNextEventID = token.NextEventID
×
2082
                        }
×
2083
                        token.BranchToken, _, lastFirstEventID, nextEventID, isWorkflowRunning, err =
×
2084
                                queryHistory(domainID, execution, queryNextEventID, token.BranchToken)
×
2085
                        if err != nil {
×
2086
                                return nil, err
×
2087
                        }
×
2088
                        token.FirstEventID = token.NextEventID
×
2089
                        token.NextEventID = nextEventID
×
2090
                        token.IsWorkflowRunning = isWorkflowRunning
×
2091
                }
2092
        } else {
393✔
2093
                if !isCloseEventOnly {
768✔
2094
                        queryNextEventID = common.FirstEventID
375✔
2095
                }
375✔
2096
                token.BranchToken, runID, lastFirstEventID, nextEventID, isWorkflowRunning, err =
393✔
2097
                        queryHistory(domainID, execution, queryNextEventID, nil)
393✔
2098
                if err != nil {
393✔
2099
                        return nil, err
×
2100
                }
×
2101

2102
                execution.RunID = runID
393✔
2103

393✔
2104
                token.RunID = runID
393✔
2105
                token.FirstEventID = common.FirstEventID
393✔
2106
                token.NextEventID = nextEventID
393✔
2107
                token.IsWorkflowRunning = isWorkflowRunning
393✔
2108
                token.PersistenceToken = nil
393✔
2109
        }
2110

2111
        call := yarpc.CallFromContext(ctx)
437✔
2112
        clientFeatureVersion := call.Header(common.FeatureVersionHeaderName)
437✔
2113
        clientImpl := call.Header(common.ClientImplHeaderName)
437✔
2114
        supportsRawHistoryQuery := wh.versionChecker.SupportsRawHistoryQuery(clientImpl, clientFeatureVersion) == nil
437✔
2115
        isRawHistoryEnabled := wh.config.SendRawWorkflowHistory(domainName) && supportsRawHistoryQuery
437✔
2116

437✔
2117
        history := &types.History{}
437✔
2118
        history.Events = []*types.HistoryEvent{}
437✔
2119
        var historyBlob []*types.DataBlob
437✔
2120

437✔
2121
        // helper function to just getHistory
437✔
2122
        getHistory := func(firstEventID, nextEventID int64, nextPageToken []byte) error {
874✔
2123
                if isRawHistoryEnabled {
439✔
2124
                        historyBlob, token.PersistenceToken, err = wh.getRawHistory(
2✔
2125
                                ctx,
2✔
2126
                                scope,
2✔
2127
                                domainID,
2✔
2128
                                domainName,
2✔
2129
                                *execution,
2✔
2130
                                firstEventID,
2✔
2131
                                nextEventID,
2✔
2132
                                getRequest.GetMaximumPageSize(),
2✔
2133
                                nextPageToken,
2✔
2134
                                token.TransientDecision,
2✔
2135
                                token.BranchToken,
2✔
2136
                        )
2✔
2137
                } else {
437✔
2138
                        history, token.PersistenceToken, err = wh.getHistory(
435✔
2139
                                ctx,
435✔
2140
                                scope,
435✔
2141
                                domainID,
435✔
2142
                                domainName,
435✔
2143
                                *execution,
435✔
2144
                                firstEventID,
435✔
2145
                                nextEventID,
435✔
2146
                                getRequest.GetMaximumPageSize(),
435✔
2147
                                nextPageToken,
435✔
2148
                                token.TransientDecision,
435✔
2149
                                token.BranchToken,
435✔
2150
                        )
435✔
2151
                }
435✔
2152
                if err != nil {
437✔
2153
                        return err
×
2154
                }
×
2155
                return nil
437✔
2156
        }
2157

2158
        if isCloseEventOnly {
455✔
2159
                if !isWorkflowRunning {
36✔
2160
                        if err := getHistory(lastFirstEventID, nextEventID, nil); err != nil {
18✔
2161
                                return nil, err
×
2162
                        }
×
2163
                        if isRawHistoryEnabled {
18✔
2164
                                // since getHistory func will not return empty history, so the below is safe
×
2165
                                historyBlob = historyBlob[len(historyBlob)-1:]
×
2166
                        } else {
18✔
2167
                                // since getHistory func will not return empty history, so the below is safe
18✔
2168
                                history.Events = history.Events[len(history.Events)-1:]
18✔
2169
                        }
18✔
2170
                        token = nil
18✔
2171
                } else if isLongPoll {
×
2172
                        // set the persistence token to be nil so next time we will query history for updates
×
2173
                        token.PersistenceToken = nil
×
2174
                } else {
×
2175
                        token = nil
×
2176
                }
×
2177
        } else {
419✔
2178
                // return all events
419✔
2179
                if token.FirstEventID >= token.NextEventID {
419✔
2180
                        // currently there is no new event
×
2181
                        history.Events = []*types.HistoryEvent{}
×
2182
                        if !isWorkflowRunning {
×
2183
                                token = nil
×
2184
                        }
×
2185
                } else {
419✔
2186
                        if err := getHistory(token.FirstEventID, token.NextEventID, token.PersistenceToken); err != nil {
419✔
2187
                                return nil, err
×
2188
                        }
×
2189
                        // here, for long pull on history events, we need to intercept the paging token from cassandra
2190
                        // and do something clever
2191
                        if len(token.PersistenceToken) == 0 && (!token.IsWorkflowRunning || !isLongPoll) {
796✔
2192
                                // meaning, there is no more history to be returned
377✔
2193
                                token = nil
377✔
2194
                        }
377✔
2195
                }
2196
        }
2197

2198
        nextToken, err := serializeHistoryToken(token)
437✔
2199
        if err != nil {
437✔
2200
                return nil, err
×
2201
        }
×
2202
        return &types.GetWorkflowExecutionHistoryResponse{
437✔
2203
                History:       history,
437✔
2204
                RawHistory:    historyBlob,
437✔
2205
                NextPageToken: nextToken,
437✔
2206
                Archived:      false,
437✔
2207
        }, nil
437✔
2208
}
2209

2210
// SignalWorkflowExecution is used to send a signal event to running workflow execution.  This results in
2211
// WorkflowExecutionSignaled event recorded in the history and a decision task being created for the execution.
2212
func (wh *WorkflowHandler) SignalWorkflowExecution(
2213
        ctx context.Context,
2214
        signalRequest *types.SignalWorkflowExecutionRequest,
2215
) (retError error) {
723✔
2216
        if wh.isShuttingDown() {
723✔
2217
                return validate.ErrShuttingDown
×
2218
        }
×
2219

2220
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
723✔
2221
                return err
×
2222
        }
×
2223

2224
        if signalRequest == nil {
723✔
2225
                return validate.ErrRequestNotSet
×
2226
        }
×
2227

2228
        domainName := signalRequest.GetDomain()
723✔
2229
        wfExecution := signalRequest.GetWorkflowExecution()
723✔
2230

723✔
2231
        if domainName == "" {
723✔
2232
                return validate.ErrDomainNotSet
×
2233
        }
×
2234
        if err := validate.CheckExecution(wfExecution); err != nil {
723✔
2235
                return err
×
2236
        }
×
2237

2238
        scope := getMetricsScopeWithDomain(metrics.FrontendSignalWorkflowExecutionScope, signalRequest, wh.GetMetricsClient()).Tagged(metrics.GetContextTags(ctx)...)
723✔
2239
        idLengthWarnLimit := wh.config.MaxIDLengthWarnLimit()
723✔
2240
        if !common.IsValidIDLength(
723✔
2241
                domainName,
723✔
2242
                scope,
723✔
2243
                idLengthWarnLimit,
723✔
2244
                wh.config.DomainNameMaxLength(domainName),
723✔
2245
                metrics.CadenceErrDomainNameExceededWarnLimit,
723✔
2246
                domainName,
723✔
2247
                wh.GetLogger(),
723✔
2248
                tag.IDTypeDomainName) {
723✔
2249
                return validate.ErrDomainTooLong
×
2250
        }
×
2251

2252
        if signalRequest.GetSignalName() == "" {
723✔
2253
                return validate.ErrSignalNameNotSet
×
2254
        }
×
2255

2256
        if !common.IsValidIDLength(
723✔
2257
                signalRequest.GetSignalName(),
723✔
2258
                scope,
723✔
2259
                idLengthWarnLimit,
723✔
2260
                wh.config.SignalNameMaxLength(domainName),
723✔
2261
                metrics.CadenceErrSignalNameExceededWarnLimit,
723✔
2262
                domainName,
723✔
2263
                wh.GetLogger(),
723✔
2264
                tag.IDTypeSignalName) {
723✔
2265
                return validate.ErrSignalNameTooLong
×
2266
        }
×
2267

2268
        if !common.IsValidIDLength(
723✔
2269
                signalRequest.GetRequestID(),
723✔
2270
                scope,
723✔
2271
                idLengthWarnLimit,
723✔
2272
                wh.config.RequestIDMaxLength(domainName),
723✔
2273
                metrics.CadenceErrRequestIDExceededWarnLimit,
723✔
2274
                domainName,
723✔
2275
                wh.GetLogger(),
723✔
2276
                tag.IDTypeRequestID) {
723✔
2277
                return validate.ErrRequestIDTooLong
×
2278
        }
×
2279

2280
        domainID, err := wh.GetDomainCache().GetDomainID(domainName)
723✔
2281
        if err != nil {
723✔
2282
                return err
×
2283
        }
×
2284

2285
        sizeLimitError := wh.config.BlobSizeLimitError(domainName)
723✔
2286
        sizeLimitWarn := wh.config.BlobSizeLimitWarn(domainName)
723✔
2287
        if err := common.CheckEventBlobSizeLimit(
723✔
2288
                len(signalRequest.Input),
723✔
2289
                sizeLimitWarn,
723✔
2290
                sizeLimitError,
723✔
2291
                domainID,
723✔
2292
                signalRequest.GetWorkflowExecution().GetWorkflowID(),
723✔
2293
                signalRequest.GetWorkflowExecution().GetRunID(),
723✔
2294
                scope,
723✔
2295
                wh.GetThrottledLogger(),
723✔
2296
                tag.BlobSizeViolationOperation("SignalWorkflowExecution"),
723✔
2297
        ); err != nil {
723✔
2298
                return err
×
2299
        }
×
2300

2301
        isolationGroup := wh.getIsolationGroup(ctx, domainName)
723✔
2302
        if !wh.isIsolationGroupHealthy(ctx, domainName, isolationGroup) {
723✔
2303
                return &types.BadRequestError{fmt.Sprintf("Domain %s is drained from isolation group %s.", domainName, isolationGroup)}
×
2304
        }
×
2305

2306
        err = wh.GetHistoryClient().SignalWorkflowExecution(ctx, &types.HistorySignalWorkflowExecutionRequest{
723✔
2307
                DomainUUID:    domainID,
723✔
2308
                SignalRequest: signalRequest,
723✔
2309
        })
723✔
2310
        if err != nil {
732✔
2311
                return wh.normalizeVersionedErrors(ctx, err)
9✔
2312
        }
9✔
2313

2314
        return nil
714✔
2315
}
2316

2317
// SignalWithStartWorkflowExecution is used to ensure sending a signal event to a workflow execution.
2318
// If workflow is running, this results in WorkflowExecutionSignaled event recorded in the history
2319
// and a decision task being created for the execution.
2320
// If workflow is not running or not found, this results in WorkflowExecutionStarted and WorkflowExecutionSignaled
2321
// event recorded in history, and a decision task being created for the execution
2322
func (wh *WorkflowHandler) SignalWithStartWorkflowExecution(
2323
        ctx context.Context,
2324
        signalWithStartRequest *types.SignalWithStartWorkflowExecutionRequest,
2325
) (resp *types.StartWorkflowExecutionResponse, retError error) {
33✔
2326
        if wh.isShuttingDown() {
33✔
2327
                return nil, validate.ErrShuttingDown
×
2328
        }
×
2329

2330
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
33✔
2331
                return nil, err
×
2332
        }
×
2333

2334
        if signalWithStartRequest == nil {
33✔
2335
                return nil, validate.ErrRequestNotSet
×
2336
        }
×
2337

2338
        domainName := signalWithStartRequest.GetDomain()
33✔
2339
        if domainName == "" {
33✔
2340
                return nil, validate.ErrDomainNotSet
×
2341
        }
×
2342
        if signalWithStartRequest.GetWorkflowID() == "" {
33✔
2343
                return nil, validate.ErrWorkflowIDNotSet
×
2344
        }
×
2345

2346
        scope := getMetricsScopeWithDomain(metrics.FrontendSignalWithStartWorkflowExecutionScope, signalWithStartRequest, wh.GetMetricsClient()).Tagged(metrics.GetContextTags(ctx)...)
33✔
2347
        idLengthWarnLimit := wh.config.MaxIDLengthWarnLimit()
33✔
2348
        if !common.IsValidIDLength(
33✔
2349
                domainName,
33✔
2350
                scope,
33✔
2351
                idLengthWarnLimit,
33✔
2352
                wh.config.DomainNameMaxLength(domainName),
33✔
2353
                metrics.CadenceErrDomainNameExceededWarnLimit,
33✔
2354
                domainName,
33✔
2355
                wh.GetLogger(),
33✔
2356
                tag.IDTypeDomainName) {
33✔
2357
                return nil, validate.ErrDomainTooLong
×
2358
        }
×
2359

2360
        if !common.IsValidIDLength(
33✔
2361
                signalWithStartRequest.GetWorkflowID(),
33✔
2362
                scope,
33✔
2363
                idLengthWarnLimit,
33✔
2364
                wh.config.WorkflowIDMaxLength(domainName),
33✔
2365
                metrics.CadenceErrWorkflowIDExceededWarnLimit,
33✔
2366
                domainName,
33✔
2367
                wh.GetLogger(),
33✔
2368
                tag.IDTypeWorkflowID) {
33✔
2369
                return nil, validate.ErrWorkflowIDTooLong
×
2370
        }
×
2371

2372
        if signalWithStartRequest.GetSignalName() == "" {
33✔
2373
                return nil, validate.ErrSignalNameNotSet
×
2374
        }
×
2375

2376
        if !common.IsValidIDLength(
33✔
2377
                signalWithStartRequest.GetSignalName(),
33✔
2378
                scope,
33✔
2379
                idLengthWarnLimit,
33✔
2380
                wh.config.SignalNameMaxLength(domainName),
33✔
2381
                metrics.CadenceErrSignalNameExceededWarnLimit,
33✔
2382
                domainName,
33✔
2383
                wh.GetLogger(),
33✔
2384
                tag.IDTypeSignalName) {
33✔
2385
                return nil, validate.ErrSignalNameTooLong
×
2386
        }
×
2387

2388
        if signalWithStartRequest.WorkflowType == nil || signalWithStartRequest.WorkflowType.GetName() == "" {
33✔
2389
                return nil, validate.ErrWorkflowTypeNotSet
×
2390
        }
×
2391

2392
        if !common.IsValidIDLength(
33✔
2393
                signalWithStartRequest.WorkflowType.GetName(),
33✔
2394
                scope,
33✔
2395
                idLengthWarnLimit,
33✔
2396
                wh.config.WorkflowTypeMaxLength(domainName),
33✔
2397
                metrics.CadenceErrWorkflowTypeExceededWarnLimit,
33✔
2398
                domainName,
33✔
2399
                wh.GetLogger(),
33✔
2400
                tag.IDTypeWorkflowType) {
33✔
2401
                return nil, validate.ErrWorkflowTypeTooLong
×
2402
        }
×
2403

2404
        if err := wh.validateTaskList(signalWithStartRequest.TaskList, scope, domainName); err != nil {
33✔
2405
                return nil, err
×
2406
        }
×
2407

2408
        if !common.IsValidIDLength(
33✔
2409
                signalWithStartRequest.GetRequestID(),
33✔
2410
                scope,
33✔
2411
                idLengthWarnLimit,
33✔
2412
                wh.config.RequestIDMaxLength(domainName),
33✔
2413
                metrics.CadenceErrRequestIDExceededWarnLimit,
33✔
2414
                domainName,
33✔
2415
                wh.GetLogger(),
33✔
2416
                tag.IDTypeRequestID) {
33✔
2417
                return nil, validate.ErrRequestIDTooLong
×
2418
        }
×
2419

2420
        if signalWithStartRequest.GetExecutionStartToCloseTimeoutSeconds() <= 0 {
33✔
2421
                return nil, validate.ErrInvalidExecutionStartToCloseTimeoutSeconds
×
2422
        }
×
2423

2424
        if signalWithStartRequest.GetTaskStartToCloseTimeoutSeconds() <= 0 {
33✔
2425
                return nil, validate.ErrInvalidTaskStartToCloseTimeoutSeconds
×
2426
        }
×
2427

2428
        if err := common.ValidateRetryPolicy(signalWithStartRequest.RetryPolicy); err != nil {
33✔
2429
                return nil, err
×
2430
        }
×
2431

2432
        if signalWithStartRequest.GetCronSchedule() != "" {
33✔
2433
                if _, err := backoff.ValidateSchedule(signalWithStartRequest.GetCronSchedule()); err != nil {
×
2434
                        return nil, err
×
2435
                }
×
2436
        }
2437

2438
        if err := wh.searchAttributesValidator.ValidateSearchAttributes(signalWithStartRequest.SearchAttributes, domainName); err != nil {
33✔
2439
                return nil, err
×
2440
        }
×
2441

2442
        domainID, err := wh.GetDomainCache().GetDomainID(domainName)
33✔
2443
        if err != nil {
33✔
2444
                return nil, err
×
2445
        }
×
2446

2447
        sizeLimitError := wh.config.BlobSizeLimitError(domainName)
33✔
2448
        sizeLimitWarn := wh.config.BlobSizeLimitWarn(domainName)
33✔
2449
        if err := common.CheckEventBlobSizeLimit(
33✔
2450
                len(signalWithStartRequest.SignalInput),
33✔
2451
                sizeLimitWarn,
33✔
2452
                sizeLimitError,
33✔
2453
                domainID,
33✔
2454
                signalWithStartRequest.GetWorkflowID(),
33✔
2455
                "",
33✔
2456
                scope,
33✔
2457
                wh.GetThrottledLogger(),
33✔
2458
                tag.BlobSizeViolationOperation("SignalWithStartWorkflowExecution"),
33✔
2459
        ); err != nil {
33✔
2460
                return nil, err
×
2461
        }
×
2462
        actualSize := len(signalWithStartRequest.Input) + common.GetSizeOfMapStringToByteArray(signalWithStartRequest.Memo.GetFields())
33✔
2463
        if err := common.CheckEventBlobSizeLimit(
33✔
2464
                actualSize,
33✔
2465
                sizeLimitWarn,
33✔
2466
                sizeLimitError,
33✔
2467
                domainID,
33✔
2468
                signalWithStartRequest.GetWorkflowID(),
33✔
2469
                "",
33✔
2470
                scope,
33✔
2471
                wh.GetThrottledLogger(),
33✔
2472
                tag.BlobSizeViolationOperation("SignalWithStartWorkflowExecution"),
33✔
2473
        ); err != nil {
33✔
2474
                return nil, err
×
2475
        }
×
2476

2477
        isolationGroup := wh.getIsolationGroup(ctx, domainName)
33✔
2478
        if !wh.isIsolationGroupHealthy(ctx, domainName, isolationGroup) {
33✔
2479
                return nil, &types.BadRequestError{fmt.Sprintf("Domain %s is drained from isolation group %s.", domainName, isolationGroup)}
×
2480
        }
×
2481

2482
        resp, err = wh.GetHistoryClient().SignalWithStartWorkflowExecution(ctx, &types.HistorySignalWithStartWorkflowExecutionRequest{
33✔
2483
                DomainUUID:             domainID,
33✔
2484
                SignalWithStartRequest: signalWithStartRequest,
33✔
2485
                PartitionConfig:        wh.getPartitionConfig(ctx, domainName),
33✔
2486
        })
33✔
2487
        if err != nil {
39✔
2488
                return nil, err
6✔
2489
        }
6✔
2490

2491
        return resp, nil
27✔
2492
}
2493

2494
// TerminateWorkflowExecution terminates an existing workflow execution by recording WorkflowExecutionTerminated event
2495
// in the history and immediately terminating the execution instance.
2496
func (wh *WorkflowHandler) TerminateWorkflowExecution(
2497
        ctx context.Context,
2498
        terminateRequest *types.TerminateWorkflowExecutionRequest,
2499
) (retError error) {
48✔
2500
        if wh.isShuttingDown() {
48✔
2501
                return validate.ErrShuttingDown
×
2502
        }
×
2503

2504
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
48✔
2505
                return err
×
2506
        }
×
2507

2508
        if terminateRequest == nil {
48✔
2509
                return validate.ErrRequestNotSet
×
2510
        }
×
2511

2512
        domainName := terminateRequest.GetDomain()
48✔
2513
        wfExecution := terminateRequest.GetWorkflowExecution()
48✔
2514
        if terminateRequest.GetDomain() == "" {
48✔
2515
                return validate.ErrDomainNotSet
×
2516
        }
×
2517
        if err := validate.CheckExecution(wfExecution); err != nil {
48✔
2518
                return err
×
2519
        }
×
2520

2521
        domainID, err := wh.GetDomainCache().GetDomainID(domainName)
48✔
2522
        if err != nil {
48✔
2523
                return err
×
2524
        }
×
2525

2526
        err = wh.GetHistoryClient().TerminateWorkflowExecution(ctx, &types.HistoryTerminateWorkflowExecutionRequest{
48✔
2527
                DomainUUID:       domainID,
48✔
2528
                TerminateRequest: terminateRequest,
48✔
2529
        })
48✔
2530
        if err != nil {
48✔
2531
                return wh.normalizeVersionedErrors(ctx, err)
×
2532
        }
×
2533

2534
        return nil
48✔
2535
}
2536

2537
// ResetWorkflowExecution reset an existing workflow execution to the nextFirstEventID
2538
// in the history and immediately terminating the current execution instance.
2539
func (wh *WorkflowHandler) ResetWorkflowExecution(
2540
        ctx context.Context,
2541
        resetRequest *types.ResetWorkflowExecutionRequest,
2542
) (resp *types.ResetWorkflowExecutionResponse, retError error) {
15✔
2543
        if wh.isShuttingDown() {
15✔
2544
                return nil, validate.ErrShuttingDown
×
2545
        }
×
2546

2547
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
15✔
2548
                return nil, err
×
2549
        }
×
2550

2551
        if resetRequest == nil {
15✔
2552
                return nil, validate.ErrRequestNotSet
×
2553
        }
×
2554

2555
        domainName := resetRequest.GetDomain()
15✔
2556
        wfExecution := resetRequest.GetWorkflowExecution()
15✔
2557
        if domainName == "" {
15✔
2558
                return nil, validate.ErrDomainNotSet
×
2559
        }
×
2560
        if err := validate.CheckExecution(wfExecution); err != nil {
15✔
2561
                return nil, err
×
2562
        }
×
2563

2564
        domainID, err := wh.GetDomainCache().GetDomainID(resetRequest.GetDomain())
15✔
2565
        if err != nil {
15✔
2566
                return nil, err
×
2567
        }
×
2568

2569
        resp, err = wh.GetHistoryClient().ResetWorkflowExecution(ctx, &types.HistoryResetWorkflowExecutionRequest{
15✔
2570
                DomainUUID:   domainID,
15✔
2571
                ResetRequest: resetRequest,
15✔
2572
        })
15✔
2573
        if err != nil {
15✔
2574
                return nil, err
×
2575
        }
×
2576

2577
        return resp, nil
15✔
2578
}
2579

2580
// RequestCancelWorkflowExecution - requests to cancel a workflow execution
2581
func (wh *WorkflowHandler) RequestCancelWorkflowExecution(
2582
        ctx context.Context,
2583
        cancelRequest *types.RequestCancelWorkflowExecutionRequest,
2584
) (retError error) {
6✔
2585
        if wh.isShuttingDown() {
6✔
2586
                return validate.ErrShuttingDown
×
2587
        }
×
2588

2589
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
6✔
2590
                return err
×
2591
        }
×
2592

2593
        if cancelRequest == nil {
6✔
2594
                return validate.ErrRequestNotSet
×
2595
        }
×
2596

2597
        domainName := cancelRequest.GetDomain()
6✔
2598
        wfExecution := cancelRequest.GetWorkflowExecution()
6✔
2599
        if domainName == "" {
6✔
2600
                return validate.ErrDomainNotSet
×
2601
        }
×
2602
        if err := validate.CheckExecution(wfExecution); err != nil {
6✔
2603
                return err
×
2604
        }
×
2605

2606
        domainID, err := wh.GetDomainCache().GetDomainID(cancelRequest.GetDomain())
6✔
2607
        if err != nil {
6✔
2608
                return err
×
2609
        }
×
2610

2611
        err = wh.GetHistoryClient().RequestCancelWorkflowExecution(ctx, &types.HistoryRequestCancelWorkflowExecutionRequest{
6✔
2612
                DomainUUID:    domainID,
6✔
2613
                CancelRequest: cancelRequest,
6✔
2614
        })
6✔
2615
        if err != nil {
9✔
2616
                return wh.normalizeVersionedErrors(ctx, err)
3✔
2617
        }
3✔
2618

2619
        return nil
3✔
2620
}
2621

2622
// ListOpenWorkflowExecutions - retrieves info for open workflow executions in a domain
2623
func (wh *WorkflowHandler) ListOpenWorkflowExecutions(
2624
        ctx context.Context,
2625
        listRequest *types.ListOpenWorkflowExecutionsRequest,
2626
) (resp *types.ListOpenWorkflowExecutionsResponse, retError error) {
107✔
2627
        if wh.isShuttingDown() {
107✔
2628
                return nil, validate.ErrShuttingDown
×
2629
        }
×
2630

2631
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
107✔
2632
                return nil, err
×
2633
        }
×
2634

2635
        if listRequest == nil {
107✔
2636
                return nil, validate.ErrRequestNotSet
×
2637
        }
×
2638

2639
        if listRequest.GetDomain() == "" {
107✔
2640
                return nil, validate.ErrDomainNotSet
×
2641
        }
×
2642

2643
        if listRequest.StartTimeFilter == nil {
107✔
2644
                return nil, &types.BadRequestError{Message: "StartTimeFilter is required"}
×
2645
        }
×
2646

2647
        if listRequest.StartTimeFilter.EarliestTime == nil {
107✔
2648
                return nil, &types.BadRequestError{Message: "EarliestTime in StartTimeFilter is required"}
×
2649
        }
×
2650

2651
        if listRequest.StartTimeFilter.LatestTime == nil {
107✔
2652
                return nil, &types.BadRequestError{Message: "LatestTime in StartTimeFilter is required"}
×
2653
        }
×
2654

2655
        if listRequest.StartTimeFilter.GetEarliestTime() > listRequest.StartTimeFilter.GetLatestTime() {
107✔
2656
                return nil, &types.BadRequestError{Message: "EarliestTime in StartTimeFilter should not be larger than LatestTime"}
×
2657
        }
×
2658

2659
        if listRequest.ExecutionFilter != nil && listRequest.TypeFilter != nil {
107✔
2660
                return nil, &types.BadRequestError{
×
2661
                        Message: "Only one of ExecutionFilter or TypeFilter is allowed"}
×
2662
        }
×
2663

2664
        if listRequest.GetMaximumPageSize() <= 0 {
168✔
2665
                listRequest.MaximumPageSize = int32(wh.config.VisibilityMaxPageSize(listRequest.GetDomain()))
61✔
2666
        }
61✔
2667

2668
        if wh.isListRequestPageSizeTooLarge(listRequest.GetMaximumPageSize(), listRequest.GetDomain()) {
107✔
2669
                return nil, &types.BadRequestError{
×
2670
                        Message: fmt.Sprintf("Pagesize is larger than allow %d", wh.config.ESIndexMaxResultWindow())}
×
2671
        }
×
2672

2673
        domain := listRequest.GetDomain()
107✔
2674
        domainID, err := wh.GetDomainCache().GetDomainID(domain)
107✔
2675
        if err != nil {
107✔
2676
                return nil, err
×
2677
        }
×
2678

2679
        baseReq := persistence.ListWorkflowExecutionsRequest{
107✔
2680
                DomainUUID:    domainID,
107✔
2681
                Domain:        domain,
107✔
2682
                PageSize:      int(listRequest.GetMaximumPageSize()),
107✔
2683
                NextPageToken: listRequest.NextPageToken,
107✔
2684
                EarliestTime:  listRequest.StartTimeFilter.GetEarliestTime(),
107✔
2685
                LatestTime:    listRequest.StartTimeFilter.GetLatestTime(),
107✔
2686
        }
107✔
2687

107✔
2688
        var persistenceResp *persistence.ListWorkflowExecutionsResponse
107✔
2689
        if listRequest.ExecutionFilter != nil {
208✔
2690
                if wh.config.DisableListVisibilityByFilter(domain) {
102✔
2691
                        err = validate.ErrNoPermission
1✔
2692
                } else {
101✔
2693
                        persistenceResp, err = wh.GetVisibilityManager().ListOpenWorkflowExecutionsByWorkflowID(
100✔
2694
                                ctx,
100✔
2695
                                &persistence.ListWorkflowExecutionsByWorkflowIDRequest{
100✔
2696
                                        ListWorkflowExecutionsRequest: baseReq,
100✔
2697
                                        WorkflowID:                    listRequest.ExecutionFilter.GetWorkflowID(),
100✔
2698
                                })
100✔
2699
                }
100✔
2700
                wh.GetLogger().Debug("List open workflow with filter",
101✔
2701
                        tag.WorkflowDomainName(listRequest.GetDomain()), tag.WorkflowListWorkflowFilterByID)
101✔
2702
        } else if listRequest.TypeFilter != nil {
7✔
2703
                if wh.config.DisableListVisibilityByFilter(domain) {
2✔
2704
                        err = validate.ErrNoPermission
1✔
2705
                } else {
1✔
2706
                        persistenceResp, err = wh.GetVisibilityManager().ListOpenWorkflowExecutionsByType(
×
2707
                                ctx,
×
2708
                                &persistence.ListWorkflowExecutionsByTypeRequest{
×
2709
                                        ListWorkflowExecutionsRequest: baseReq,
×
2710
                                        WorkflowTypeName:              listRequest.TypeFilter.GetName(),
×
2711
                                },
×
2712
                        )
×
2713
                }
×
2714
                wh.GetLogger().Debug("List open workflow with filter",
1✔
2715
                        tag.WorkflowDomainName(listRequest.GetDomain()), tag.WorkflowListWorkflowFilterByType)
1✔
2716
        } else {
5✔
2717
                persistenceResp, err = wh.GetVisibilityManager().ListOpenWorkflowExecutions(ctx, &baseReq)
5✔
2718
        }
5✔
2719

2720
        if err != nil {
109✔
2721
                return nil, err
2✔
2722
        }
2✔
2723

2724
        resp = &types.ListOpenWorkflowExecutionsResponse{}
105✔
2725
        resp.Executions = persistenceResp.Executions
105✔
2726
        resp.NextPageToken = persistenceResp.NextPageToken
105✔
2727
        return resp, nil
105✔
2728
}
2729

2730
// ListArchivedWorkflowExecutions - retrieves archived info for closed workflow executions in a domain
2731
func (wh *WorkflowHandler) ListArchivedWorkflowExecutions(
2732
        ctx context.Context,
2733
        listRequest *types.ListArchivedWorkflowExecutionsRequest,
2734
) (resp *types.ListArchivedWorkflowExecutionsResponse, retError error) {
15✔
2735
        if wh.isShuttingDown() {
15✔
2736
                return nil, validate.ErrShuttingDown
×
2737
        }
×
2738

2739
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
15✔
2740
                return nil, err
×
2741
        }
×
2742

2743
        if listRequest == nil {
15✔
2744
                return nil, validate.ErrRequestNotSet
×
2745
        }
×
2746

2747
        if listRequest.GetDomain() == "" {
16✔
2748
                return nil, validate.ErrDomainNotSet
1✔
2749
        }
1✔
2750

2751
        if listRequest.GetPageSize() <= 0 {
14✔
2752
                listRequest.PageSize = int32(wh.config.VisibilityMaxPageSize(listRequest.GetDomain()))
×
2753
        }
×
2754

2755
        maxPageSize := wh.config.VisibilityArchivalQueryMaxPageSize()
14✔
2756
        if int(listRequest.GetPageSize()) > maxPageSize {
14✔
2757
                return nil, &types.BadRequestError{
×
2758
                        Message: fmt.Sprintf("Pagesize is larger than allowed %d", maxPageSize)}
×
2759
        }
×
2760

2761
        if !wh.GetArchivalMetadata().GetVisibilityConfig().ClusterConfiguredForArchival() {
15✔
2762
                return nil, &types.BadRequestError{Message: "Cluster is not configured for visibility archival"}
1✔
2763
        }
1✔
2764

2765
        if !wh.GetArchivalMetadata().GetVisibilityConfig().ReadEnabled() {
13✔
2766
                return nil, &types.BadRequestError{Message: "Cluster is not configured for reading archived visibility records"}
×
2767
        }
×
2768

2769
        entry, err := wh.GetDomainCache().GetDomain(listRequest.GetDomain())
13✔
2770
        if err != nil {
14✔
2771
                return nil, err
1✔
2772
        }
1✔
2773

2774
        if entry.GetConfig().VisibilityArchivalStatus != types.ArchivalStatusEnabled {
14✔
2775
                return nil, &types.BadRequestError{Message: "Domain is not configured for visibility archival"}
2✔
2776
        }
2✔
2777

2778
        URI, err := archiver.NewURI(entry.GetConfig().VisibilityArchivalURI)
10✔
2779
        if err != nil {
10✔
2780
                return nil, err
×
2781
        }
×
2782

2783
        visibilityArchiver, err := wh.GetArchiverProvider().GetVisibilityArchiver(URI.Scheme(), service.Frontend)
10✔
2784
        if err != nil {
10✔
2785
                return nil, err
×
2786
        }
×
2787

2788
        archiverRequest := &archiver.QueryVisibilityRequest{
10✔
2789
                DomainID:      entry.GetInfo().ID,
10✔
2790
                PageSize:      int(listRequest.GetPageSize()),
10✔
2791
                NextPageToken: listRequest.NextPageToken,
10✔
2792
                Query:         listRequest.GetQuery(),
10✔
2793
        }
10✔
2794

10✔
2795
        archiverResponse, err := visibilityArchiver.Query(ctx, URI, archiverRequest)
10✔
2796
        if err != nil {
10✔
2797
                return nil, err
×
2798
        }
×
2799

2800
        // special handling of ExecutionTime for cron or retry
2801
        for _, execution := range archiverResponse.Executions {
25✔
2802
                if execution.GetExecutionTime() == 0 {
30✔
2803
                        execution.ExecutionTime = common.Int64Ptr(execution.GetStartTime())
15✔
2804
                }
15✔
2805
        }
2806

2807
        return &types.ListArchivedWorkflowExecutionsResponse{
10✔
2808
                Executions:    archiverResponse.Executions,
10✔
2809
                NextPageToken: archiverResponse.NextPageToken,
10✔
2810
        }, nil
10✔
2811
}
2812

2813
// ListClosedWorkflowExecutions - retrieves info for closed workflow executions in a domain
2814
func (wh *WorkflowHandler) ListClosedWorkflowExecutions(
2815
        ctx context.Context,
2816
        listRequest *types.ListClosedWorkflowExecutionsRequest,
2817
) (resp *types.ListClosedWorkflowExecutionsResponse, retError error) {
29✔
2818
        if wh.isShuttingDown() {
29✔
2819
                return nil, validate.ErrShuttingDown
×
2820
        }
×
2821

2822
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
29✔
2823
                return nil, err
×
2824
        }
×
2825

2826
        if listRequest == nil {
29✔
2827
                return nil, validate.ErrRequestNotSet
×
2828
        }
×
2829

2830
        if listRequest.GetDomain() == "" {
29✔
2831
                return nil, validate.ErrDomainNotSet
×
2832
        }
×
2833

2834
        if listRequest.StartTimeFilter == nil {
29✔
2835
                return nil, &types.BadRequestError{Message: "StartTimeFilter is required"}
×
2836
        }
×
2837

2838
        if listRequest.StartTimeFilter.EarliestTime == nil {
29✔
2839
                return nil, &types.BadRequestError{Message: "EarliestTime in StartTimeFilter is required"}
×
2840
        }
×
2841

2842
        if listRequest.StartTimeFilter.LatestTime == nil {
29✔
2843
                return nil, &types.BadRequestError{Message: "LatestTime in StartTimeFilter is required"}
×
2844
        }
×
2845

2846
        if listRequest.StartTimeFilter.GetEarliestTime() > listRequest.StartTimeFilter.GetLatestTime() {
29✔
2847
                return nil, &types.BadRequestError{Message: "EarliestTime in StartTimeFilter should not be larger than LatestTime"}
×
2848
        }
×
2849

2850
        filterCount := 0
29✔
2851
        if listRequest.TypeFilter != nil {
30✔
2852
                filterCount++
1✔
2853
        }
1✔
2854
        if listRequest.StatusFilter != nil {
30✔
2855
                filterCount++
1✔
2856
        }
1✔
2857

2858
        if filterCount > 1 {
29✔
2859
                return nil, &types.BadRequestError{
×
2860
                        Message: "Only one of ExecutionFilter, TypeFilter or StatusFilter is allowed"}
×
2861
        } // If ExecutionFilter is provided with one of TypeFilter or StatusFilter, use ExecutionFilter and ignore other filter
×
2862

2863
        if listRequest.GetMaximumPageSize() <= 0 {
30✔
2864
                listRequest.MaximumPageSize = int32(wh.config.VisibilityMaxPageSize(listRequest.GetDomain()))
1✔
2865
        }
1✔
2866

2867
        if wh.isListRequestPageSizeTooLarge(listRequest.GetMaximumPageSize(), listRequest.GetDomain()) {
29✔
2868
                return nil, &types.BadRequestError{
×
2869
                        Message: fmt.Sprintf("Pagesize is larger than allow %d", wh.config.ESIndexMaxResultWindow())}
×
2870
        }
×
2871

2872
        domain := listRequest.GetDomain()
29✔
2873
        domainID, err := wh.GetDomainCache().GetDomainID(domain)
29✔
2874
        if err != nil {
29✔
2875
                return nil, err
×
2876
        }
×
2877

2878
        baseReq := persistence.ListWorkflowExecutionsRequest{
29✔
2879
                DomainUUID:    domainID,
29✔
2880
                Domain:        domain,
29✔
2881
                PageSize:      int(listRequest.GetMaximumPageSize()),
29✔
2882
                NextPageToken: listRequest.NextPageToken,
29✔
2883
                EarliestTime:  listRequest.StartTimeFilter.GetEarliestTime(),
29✔
2884
                LatestTime:    listRequest.StartTimeFilter.GetLatestTime(),
29✔
2885
        }
29✔
2886

29✔
2887
        var persistenceResp *persistence.ListWorkflowExecutionsResponse
29✔
2888
        if listRequest.ExecutionFilter != nil {
45✔
2889
                if wh.config.DisableListVisibilityByFilter(domain) {
17✔
2890
                        err = validate.ErrNoPermission
1✔
2891
                } else {
16✔
2892
                        persistenceResp, err = wh.GetVisibilityManager().ListClosedWorkflowExecutionsByWorkflowID(
15✔
2893
                                ctx,
15✔
2894
                                &persistence.ListWorkflowExecutionsByWorkflowIDRequest{
15✔
2895
                                        ListWorkflowExecutionsRequest: baseReq,
15✔
2896
                                        WorkflowID:                    listRequest.ExecutionFilter.GetWorkflowID(),
15✔
2897
                                },
15✔
2898
                        )
15✔
2899
                }
15✔
2900
                wh.GetLogger().Debug("List closed workflow with filter",
16✔
2901
                        tag.WorkflowDomainName(listRequest.GetDomain()), tag.WorkflowListWorkflowFilterByID)
16✔
2902
        } else if listRequest.TypeFilter != nil {
14✔
2903
                if wh.config.DisableListVisibilityByFilter(domain) {
2✔
2904
                        err = validate.ErrNoPermission
1✔
2905
                } else {
1✔
2906
                        persistenceResp, err = wh.GetVisibilityManager().ListClosedWorkflowExecutionsByType(
×
2907
                                ctx,
×
2908
                                &persistence.ListWorkflowExecutionsByTypeRequest{
×
2909
                                        ListWorkflowExecutionsRequest: baseReq,
×
2910
                                        WorkflowTypeName:              listRequest.TypeFilter.GetName(),
×
2911
                                },
×
2912
                        )
×
2913
                }
×
2914
                wh.GetLogger().Debug("List closed workflow with filter",
1✔
2915
                        tag.WorkflowDomainName(listRequest.GetDomain()), tag.WorkflowListWorkflowFilterByType)
1✔
2916
        } else if listRequest.StatusFilter != nil {
13✔
2917
                if wh.config.DisableListVisibilityByFilter(domain) {
2✔
2918
                        err = validate.ErrNoPermission
1✔
2919
                } else {
1✔
2920
                        persistenceResp, err = wh.GetVisibilityManager().ListClosedWorkflowExecutionsByStatus(
×
2921
                                ctx,
×
2922
                                &persistence.ListClosedWorkflowExecutionsByStatusRequest{
×
2923
                                        ListWorkflowExecutionsRequest: baseReq,
×
2924
                                        Status:                        listRequest.GetStatusFilter(),
×
2925
                                },
×
2926
                        )
×
2927
                }
×
2928
                wh.GetLogger().Debug("List closed workflow with filter",
1✔
2929
                        tag.WorkflowDomainName(listRequest.GetDomain()), tag.WorkflowListWorkflowFilterByStatus)
1✔
2930
        } else {
11✔
2931
                persistenceResp, err = wh.GetVisibilityManager().ListClosedWorkflowExecutions(ctx, &baseReq)
11✔
2932
        }
11✔
2933

2934
        if err != nil {
32✔
2935
                return nil, err
3✔
2936
        }
3✔
2937

2938
        resp = &types.ListClosedWorkflowExecutionsResponse{}
26✔
2939
        resp.Executions = persistenceResp.Executions
26✔
2940
        resp.NextPageToken = persistenceResp.NextPageToken
26✔
2941
        return resp, nil
26✔
2942
}
2943

2944
// ListWorkflowExecutions - retrieves info for workflow executions in a domain
2945
func (wh *WorkflowHandler) ListWorkflowExecutions(
2946
        ctx context.Context,
2947
        listRequest *types.ListWorkflowExecutionsRequest,
2948
) (resp *types.ListWorkflowExecutionsResponse, retError error) {
145✔
2949
        if wh.isShuttingDown() {
145✔
2950
                return nil, validate.ErrShuttingDown
×
2951
        }
×
2952

2953
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
145✔
2954
                return nil, err
×
2955
        }
×
2956

2957
        if listRequest == nil {
145✔
2958
                return nil, validate.ErrRequestNotSet
×
2959
        }
×
2960

2961
        if listRequest.GetDomain() == "" {
145✔
2962
                return nil, validate.ErrDomainNotSet
×
2963
        }
×
2964

2965
        if listRequest.GetPageSize() <= 0 {
145✔
2966
                listRequest.PageSize = int32(wh.config.VisibilityMaxPageSize(listRequest.GetDomain()))
×
2967
        }
×
2968

2969
        if wh.isListRequestPageSizeTooLarge(listRequest.GetPageSize(), listRequest.GetDomain()) {
145✔
2970
                return nil, &types.BadRequestError{
×
2971
                        Message: fmt.Sprintf("Pagesize is larger than allow %d", wh.config.ESIndexMaxResultWindow())}
×
2972
        }
×
2973

2974
        validatedQuery, err := wh.visibilityQueryValidator.ValidateQuery(listRequest.GetQuery())
145✔
2975
        if err != nil {
148✔
2976
                return nil, err
3✔
2977
        }
3✔
2978

2979
        domain := listRequest.GetDomain()
142✔
2980
        domainID, err := wh.GetDomainCache().GetDomainID(domain)
142✔
2981
        if err != nil {
142✔
2982
                return nil, err
×
2983
        }
×
2984

2985
        req := &persistence.ListWorkflowExecutionsByQueryRequest{
142✔
2986
                DomainUUID:    domainID,
142✔
2987
                Domain:        domain,
142✔
2988
                PageSize:      int(listRequest.GetPageSize()),
142✔
2989
                NextPageToken: listRequest.NextPageToken,
142✔
2990
                Query:         validatedQuery,
142✔
2991
        }
142✔
2992
        persistenceResp, err := wh.GetVisibilityManager().ListWorkflowExecutions(ctx, req)
142✔
2993
        if err != nil {
142✔
2994
                return nil, err
×
2995
        }
×
2996

2997
        resp = &types.ListWorkflowExecutionsResponse{}
142✔
2998
        resp.Executions = persistenceResp.Executions
142✔
2999
        resp.NextPageToken = persistenceResp.NextPageToken
142✔
3000
        return resp, nil
142✔
3001
}
3002

3003
// RestartWorkflowExecution - retrieves info for an existing workflow then restarts it
3004
func (wh *WorkflowHandler) RestartWorkflowExecution(ctx context.Context, request *types.RestartWorkflowExecutionRequest) (resp *types.RestartWorkflowExecutionResponse, retError error) {
2✔
3005
        if wh.isShuttingDown() {
2✔
3006
                return nil, validate.ErrShuttingDown
×
3007
        }
×
3008

3009
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
2✔
3010
                return nil, err
×
3011
        }
×
3012

3013
        if request == nil {
2✔
3014
                return nil, validate.ErrRequestNotSet
×
3015
        }
×
3016

3017
        domainName := request.GetDomain()
2✔
3018
        wfExecution := request.GetWorkflowExecution()
2✔
3019

2✔
3020
        if request.GetDomain() == "" {
2✔
3021
                return nil, validate.ErrDomainNotSet
×
3022
        }
×
3023

3024
        if err := validate.CheckExecution(wfExecution); err != nil {
2✔
3025
                return nil, err
×
3026
        }
×
3027

3028
        domainID, err := wh.GetDomainCache().GetDomainID(domainName)
2✔
3029
        if err != nil {
2✔
3030
                return nil, err
×
3031
        }
×
3032

3033
        isolationGroup := wh.getIsolationGroup(ctx, domainName)
2✔
3034
        if !wh.isIsolationGroupHealthy(ctx, domainName, isolationGroup) {
3✔
3035
                return nil, &types.BadRequestError{fmt.Sprintf("Domain %s is drained from isolation group %s.", domainName, isolationGroup)}
1✔
3036
        }
1✔
3037

3038
        history, err := wh.GetWorkflowExecutionHistory(ctx, &types.GetWorkflowExecutionHistoryRequest{
1✔
3039
                Domain: domainName,
1✔
3040
                Execution: &types.WorkflowExecution{
1✔
3041
                        WorkflowID: wfExecution.WorkflowID,
1✔
3042
                        RunID:      wfExecution.RunID,
1✔
3043
                },
1✔
3044
                SkipArchival: true,
1✔
3045
        })
1✔
3046
        if err != nil {
1✔
3047
                return nil, validate.ErrHistoryNotFound
×
3048
        }
×
3049
        startRequest := constructRestartWorkflowRequest(history.History.Events[0].WorkflowExecutionStartedEventAttributes,
1✔
3050
                domainName, request.Identity, wfExecution.WorkflowID)
1✔
3051
        req, err := common.CreateHistoryStartWorkflowRequest(domainID, startRequest, time.Now(), wh.getPartitionConfig(ctx, domainName))
1✔
3052
        if err != nil {
1✔
3053
                return nil, err
×
3054
        }
×
3055
        startResp, err := wh.GetHistoryClient().StartWorkflowExecution(ctx, req)
1✔
3056
        if err != nil {
1✔
3057
                return nil, wh.normalizeVersionedErrors(ctx, err)
×
3058
        }
×
3059
        resp = &types.RestartWorkflowExecutionResponse{
1✔
3060
                RunID: startResp.RunID,
1✔
3061
        }
1✔
3062

1✔
3063
        return resp, nil
1✔
3064
}
3065

3066
// ScanWorkflowExecutions - retrieves info for large amount of workflow executions in a domain without order
3067
func (wh *WorkflowHandler) ScanWorkflowExecutions(
3068
        ctx context.Context,
3069
        listRequest *types.ListWorkflowExecutionsRequest,
3070
) (resp *types.ListWorkflowExecutionsResponse, retError error) {
26✔
3071
        if wh.isShuttingDown() {
26✔
3072
                return nil, validate.ErrShuttingDown
×
3073
        }
×
3074

3075
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
26✔
3076
                return nil, err
×
3077
        }
×
3078

3079
        if listRequest == nil {
26✔
3080
                return nil, validate.ErrRequestNotSet
×
3081
        }
×
3082

3083
        if listRequest.GetDomain() == "" {
26✔
3084
                return nil, validate.ErrDomainNotSet
×
3085
        }
×
3086

3087
        if listRequest.GetPageSize() <= 0 {
26✔
3088
                listRequest.PageSize = int32(wh.config.VisibilityMaxPageSize(listRequest.GetDomain()))
×
3089
        }
×
3090

3091
        if wh.isListRequestPageSizeTooLarge(listRequest.GetPageSize(), listRequest.GetDomain()) {
26✔
3092
                return nil, &types.BadRequestError{
×
3093
                        Message: fmt.Sprintf("Pagesize is larger than allow %d", wh.config.ESIndexMaxResultWindow())}
×
3094
        }
×
3095

3096
        validatedQuery, err := wh.visibilityQueryValidator.ValidateQuery(listRequest.GetQuery())
26✔
3097
        if err != nil {
27✔
3098
                return nil, err
1✔
3099
        }
1✔
3100

3101
        domain := listRequest.GetDomain()
25✔
3102
        domainID, err := wh.GetDomainCache().GetDomainID(domain)
25✔
3103
        if err != nil {
25✔
3104
                return nil, err
×
3105
        }
×
3106

3107
        req := &persistence.ListWorkflowExecutionsByQueryRequest{
25✔
3108
                DomainUUID:    domainID,
25✔
3109
                Domain:        domain,
25✔
3110
                PageSize:      int(listRequest.GetPageSize()),
25✔
3111
                NextPageToken: listRequest.NextPageToken,
25✔
3112
                Query:         validatedQuery,
25✔
3113
        }
25✔
3114
        persistenceResp, err := wh.GetVisibilityManager().ScanWorkflowExecutions(ctx, req)
25✔
3115
        if err != nil {
25✔
3116
                return nil, err
×
3117
        }
×
3118

3119
        resp = &types.ListWorkflowExecutionsResponse{}
25✔
3120
        resp.Executions = persistenceResp.Executions
25✔
3121
        resp.NextPageToken = persistenceResp.NextPageToken
25✔
3122
        return resp, nil
25✔
3123
}
3124

3125
// CountWorkflowExecutions - count number of workflow executions in a domain
3126
func (wh *WorkflowHandler) CountWorkflowExecutions(
3127
        ctx context.Context,
3128
        countRequest *types.CountWorkflowExecutionsRequest,
3129
) (resp *types.CountWorkflowExecutionsResponse, retError error) {
14✔
3130
        if wh.isShuttingDown() {
14✔
3131
                return nil, validate.ErrShuttingDown
×
3132
        }
×
3133

3134
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
14✔
3135
                return nil, err
×
3136
        }
×
3137

3138
        if countRequest == nil {
14✔
3139
                return nil, validate.ErrRequestNotSet
×
3140
        }
×
3141

3142
        if countRequest.GetDomain() == "" {
14✔
3143
                return nil, validate.ErrDomainNotSet
×
3144
        }
×
3145

3146
        validatedQuery, err := wh.visibilityQueryValidator.ValidateQuery(countRequest.GetQuery())
14✔
3147
        if err != nil {
15✔
3148
                return nil, err
1✔
3149
        }
1✔
3150

3151
        domain := countRequest.GetDomain()
13✔
3152
        domainID, err := wh.GetDomainCache().GetDomainID(domain)
13✔
3153
        if err != nil {
13✔
3154
                return nil, err
×
3155
        }
×
3156

3157
        req := &persistence.CountWorkflowExecutionsRequest{
13✔
3158
                DomainUUID: domainID,
13✔
3159
                Domain:     domain,
13✔
3160
                Query:      validatedQuery,
13✔
3161
        }
13✔
3162
        persistenceResp, err := wh.GetVisibilityManager().CountWorkflowExecutions(ctx, req)
13✔
3163
        if err != nil {
13✔
3164
                return nil, err
×
3165
        }
×
3166

3167
        resp = &types.CountWorkflowExecutionsResponse{
13✔
3168
                Count: persistenceResp.Count,
13✔
3169
        }
13✔
3170
        return resp, nil
13✔
3171
}
3172

3173
// GetSearchAttributes return valid indexed keys
3174
func (wh *WorkflowHandler) GetSearchAttributes(ctx context.Context) (resp *types.GetSearchAttributesResponse, retError error) {
1✔
3175
        if wh.isShuttingDown() {
1✔
3176
                return nil, validate.ErrShuttingDown
×
3177
        }
×
3178

3179
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
1✔
3180
                return nil, err
×
3181
        }
×
3182

3183
        keys := wh.config.ValidSearchAttributes()
1✔
3184
        resp = &types.GetSearchAttributesResponse{
1✔
3185
                Keys: wh.convertIndexedKeyToThrift(keys),
1✔
3186
        }
1✔
3187
        return resp, nil
1✔
3188
}
3189

3190
// ResetStickyTaskList reset the volatile information in mutable state of a given workflow.
3191
func (wh *WorkflowHandler) ResetStickyTaskList(
3192
        ctx context.Context,
3193
        resetRequest *types.ResetStickyTaskListRequest,
3194
) (resp *types.ResetStickyTaskListResponse, retError error) {
3✔
3195
        if wh.isShuttingDown() {
3✔
3196
                return nil, validate.ErrShuttingDown
×
3197
        }
×
3198

3199
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
3✔
3200
                return nil, err
×
3201
        }
×
3202

3203
        if resetRequest == nil {
3✔
3204
                return nil, validate.ErrRequestNotSet
×
3205
        }
×
3206

3207
        domainName := resetRequest.GetDomain()
3✔
3208
        wfExecution := resetRequest.GetExecution()
3✔
3209

3✔
3210
        if domainName == "" {
3✔
3211
                return nil, validate.ErrDomainNotSet
×
3212
        }
×
3213

3214
        if err := validate.CheckExecution(wfExecution); err != nil {
3✔
3215
                return nil, err
×
3216
        }
×
3217

3218
        domainID, err := wh.GetDomainCache().GetDomainID(resetRequest.GetDomain())
3✔
3219
        if err != nil {
3✔
3220
                return nil, err
×
3221
        }
×
3222

3223
        _, err = wh.GetHistoryClient().ResetStickyTaskList(ctx, &types.HistoryResetStickyTaskListRequest{
3✔
3224
                DomainUUID: domainID,
3✔
3225
                Execution:  resetRequest.Execution,
3✔
3226
        })
3✔
3227
        if err != nil {
3✔
3228
                return nil, wh.normalizeVersionedErrors(ctx, err)
×
3229
        }
×
3230
        return &types.ResetStickyTaskListResponse{}, nil
3✔
3231
}
3232

3233
// QueryWorkflow returns query result for a specified workflow execution
3234
func (wh *WorkflowHandler) QueryWorkflow(
3235
        ctx context.Context,
3236
        queryRequest *types.QueryWorkflowRequest,
3237
) (resp *types.QueryWorkflowResponse, retError error) {
45✔
3238
        if wh.isShuttingDown() {
45✔
3239
                return nil, validate.ErrShuttingDown
×
3240
        }
×
3241

3242
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
45✔
3243
                return nil, err
×
3244
        }
×
3245

3246
        if queryRequest == nil {
45✔
3247
                return nil, validate.ErrRequestNotSet
×
3248
        }
×
3249

3250
        domainName := queryRequest.GetDomain()
45✔
3251
        wfExecution := queryRequest.GetExecution()
45✔
3252

45✔
3253
        if domainName == "" {
45✔
3254
                return nil, validate.ErrDomainNotSet
×
3255
        }
×
3256

3257
        if err := validate.CheckExecution(wfExecution); err != nil {
45✔
3258
                return nil, err
×
3259
        }
×
3260

3261
        if wh.config.DisallowQuery(domainName) {
45✔
3262
                return nil, validate.ErrQueryDisallowedForDomain
×
3263
        }
×
3264

3265
        if queryRequest.Query == nil {
45✔
3266
                return nil, validate.ErrQueryNotSet
×
3267
        }
×
3268

3269
        if queryRequest.Query.GetQueryType() == "" {
45✔
3270
                return nil, validate.ErrQueryTypeNotSet
×
3271
        }
×
3272

3273
        domainID, err := wh.GetDomainCache().GetDomainID(domainName)
45✔
3274
        if err != nil {
45✔
3275
                return nil, err
×
3276
        }
×
3277

3278
        sizeLimitError := wh.config.BlobSizeLimitError(domainName)
45✔
3279
        sizeLimitWarn := wh.config.BlobSizeLimitWarn(domainName)
45✔
3280

45✔
3281
        scope := getMetricsScopeWithDomain(metrics.FrontendQueryWorkflowScope, queryRequest, wh.GetMetricsClient()).Tagged(metrics.GetContextTags(ctx)...)
45✔
3282
        if err := common.CheckEventBlobSizeLimit(
45✔
3283
                len(queryRequest.GetQuery().GetQueryArgs()),
45✔
3284
                sizeLimitWarn,
45✔
3285
                sizeLimitError,
45✔
3286
                domainID,
45✔
3287
                queryRequest.GetExecution().GetWorkflowID(),
45✔
3288
                queryRequest.GetExecution().GetRunID(),
45✔
3289
                scope,
45✔
3290
                wh.GetThrottledLogger(),
45✔
3291
                tag.BlobSizeViolationOperation("QueryWorkflow")); err != nil {
45✔
3292
                return nil, err
×
3293
        }
×
3294

3295
        req := &types.HistoryQueryWorkflowRequest{
45✔
3296
                DomainUUID: domainID,
45✔
3297
                Request:    queryRequest,
45✔
3298
        }
45✔
3299
        hResponse, err := wh.GetHistoryClient().QueryWorkflow(ctx, req)
45✔
3300
        if err != nil {
57✔
3301
                return nil, err
12✔
3302
        }
12✔
3303
        return hResponse.GetResponse(), nil
33✔
3304
}
3305

3306
// DescribeWorkflowExecution returns information about the specified workflow execution.
3307
func (wh *WorkflowHandler) DescribeWorkflowExecution(
3308
        ctx context.Context,
3309
        request *types.DescribeWorkflowExecutionRequest,
3310
) (resp *types.DescribeWorkflowExecutionResponse, retError error) {
93✔
3311
        if wh.isShuttingDown() {
93✔
3312
                return nil, validate.ErrShuttingDown
×
3313
        }
×
3314

3315
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
93✔
3316
                return nil, err
×
3317
        }
×
3318

3319
        if request == nil {
93✔
3320
                return nil, validate.ErrRequestNotSet
×
3321
        }
×
3322

3323
        domainName := request.GetDomain()
93✔
3324
        wfExecution := request.GetExecution()
93✔
3325
        if domainName == "" {
93✔
3326
                return nil, validate.ErrDomainNotSet
×
3327
        }
×
3328

3329
        if err := validate.CheckExecution(wfExecution); err != nil {
93✔
3330
                return nil, err
×
3331
        }
×
3332

3333
        domainID, err := wh.GetDomainCache().GetDomainID(request.GetDomain())
93✔
3334
        if err != nil {
93✔
3335
                return nil, err
×
3336
        }
×
3337

3338
        response, err := wh.GetHistoryClient().DescribeWorkflowExecution(ctx, &types.HistoryDescribeWorkflowExecutionRequest{
93✔
3339
                DomainUUID: domainID,
93✔
3340
                Request:    request,
93✔
3341
        })
93✔
3342

93✔
3343
        if err != nil {
93✔
3344
                return nil, err
×
3345
        }
×
3346

3347
        return response, nil
93✔
3348
}
3349

3350
// DescribeTaskList returns information about the target tasklist, right now this API returns the
3351
// pollers which polled this tasklist in last few minutes. If includeTaskListStatus field is true,
3352
// it will also return status of tasklist's ackManager (readLevel, ackLevel, backlogCountHint and taskIDBlock).
3353
func (wh *WorkflowHandler) DescribeTaskList(
3354
        ctx context.Context,
3355
        request *types.DescribeTaskListRequest,
3356
) (resp *types.DescribeTaskListResponse, retError error) {
18✔
3357
        if wh.isShuttingDown() {
18✔
3358
                return nil, validate.ErrShuttingDown
×
3359
        }
×
3360

3361
        if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
18✔
3362
                return nil, err
×
3363
        }
×
3364

3365
        if request == nil {
18✔
3366
                return nil, validate.ErrRequestNotSet
×
3367
        }
×
3368

3369
        if request.GetDomain() == "" {
18✔
3370
                return nil, validate.ErrDomainNotSet
×
3371
        }
×
3372

3373
        domainID, err := wh.GetDomainCache().GetDomainID(request.GetDomain())
18✔
3374
        if err != nil {
18✔
3375
                return nil, err
×
3376
        }
×
3377

3378
        scope := getMetricsScopeWithDomain(metrics.FrontendDescribeTaskListScope, request, wh.GetMetricsClient()).Tagged(metrics.GetContextTags(ctx)...)
18✔
3379
        if err := wh.validateTaskList(request.TaskList, scope, request.GetDomain()); err != nil {
18✔
3380
                return nil, err
×
3381
        }
×
3382

3383
        if request.TaskListType == nil {
18✔
3384
                return nil, validate.ErrTaskListTypeNotSet
×
3385
        }
×
3386

3387
        response, err := wh.GetMatchingClient().DescribeTaskList(ctx, &types.MatchingDescribeTaskListRequest{
18✔
3388
                DomainUUID:  domainID,
18✔
3389
                DescRequest: request,
18✔
3390
        })
18✔
3391
        if err != nil {
18✔
3392
                return nil, err
×
3393
        }
×
3394

3395
        return response, nil
18✔
3396
}
3397

3398
// ListTaskListPartitions returns all the partition and host for a taskList
3399
func (wh *WorkflowHandler) ListTaskListPartitions(
3400
        ctx context.Context,
3401
        request *types.ListTaskListPartitionsRequest,
3402
) (resp *types.ListTaskListPartitionsResponse, retError error) {
×
3403
        if wh.isShuttingDown() {
×
3404
                return nil, validate.ErrShuttingDown
×
3405
        }
×
3406

3407
        if request == nil {
×
3408
                return nil, validate.ErrRequestNotSet
×
3409
        }
×
3410

3411
        if request.GetDomain() == "" {
×
3412
                return nil, validate.ErrDomainNotSet
×
3413
        }
×
3414

3415
        scope := getMetricsScopeWithDomain(metrics.FrontendListTaskListPartitionsScope, request, wh.GetMetricsClient()).Tagged(metrics.GetContextTags(ctx)...)
×
3416
        if err := wh.validateTaskList(request.TaskList, scope, request.GetDomain()); err != nil {
×
3417
                return nil, err
×
3418
        }
×
3419

3420
        resp, err := wh.GetMatchingClient().ListTaskListPartitions(ctx, &types.MatchingListTaskListPartitionsRequest{
×
3421
                Domain:   request.Domain,
×
3422
                TaskList: request.TaskList,
×
3423
        })
×
3424
        return resp, err
×
3425
}
3426

3427
// GetTaskListsByDomain returns all the partition and host for a taskList
3428
func (wh *WorkflowHandler) GetTaskListsByDomain(
3429
        ctx context.Context,
3430
        request *types.GetTaskListsByDomainRequest,
3431
) (resp *types.GetTaskListsByDomainResponse, retError error) {
×
3432
        if wh.isShuttingDown() {
×
3433
                return nil, validate.ErrShuttingDown
×
3434
        }
×
3435

3436
        if request == nil {
×
3437
                return nil, validate.ErrRequestNotSet
×
3438
        }
×
3439

3440
        if request.GetDomain() == "" {
×
3441
                return nil, validate.ErrDomainNotSet
×
3442
        }
×
3443

3444
        resp, err := wh.GetMatchingClient().GetTaskListsByDomain(ctx, &types.GetTaskListsByDomainRequest{
×
3445
                Domain: request.Domain,
×
3446
        })
×
3447
        return resp, err
×
3448
}
3449

3450
// RefreshWorkflowTasks re-generates the workflow tasks
3451
func (wh *WorkflowHandler) RefreshWorkflowTasks(
3452
        ctx context.Context,
3453
        request *types.RefreshWorkflowTasksRequest,
3454
) (err error) {
×
3455
        if request == nil {
×
3456
                return validate.ErrRequestNotSet
×
3457
        }
×
3458
        if err := validate.CheckExecution(request.Execution); err != nil {
×
3459
                return err
×
3460
        }
×
3461
        domainEntry, err := wh.GetDomainCache().GetDomain(request.GetDomain())
×
3462
        if err != nil {
×
3463
                return err
×
3464
        }
×
3465

3466
        err = wh.GetHistoryClient().RefreshWorkflowTasks(ctx, &types.HistoryRefreshWorkflowTasksRequest{
×
3467
                DomainUIID: domainEntry.GetInfo().ID,
×
3468
                Request:    request,
×
3469
        })
×
3470
        if err != nil {
×
3471
                return err
×
3472
        }
×
3473
        return nil
×
3474
}
3475

3476
func (wh *WorkflowHandler) getRawHistory(
3477
        ctx context.Context,
3478
        scope metrics.Scope,
3479
        domainID string,
3480
        domainName string,
3481
        execution types.WorkflowExecution,
3482
        firstEventID int64,
3483
        nextEventID int64,
3484
        pageSize int32,
3485
        nextPageToken []byte,
3486
        transientDecision *types.TransientDecisionInfo,
3487
        branchToken []byte,
3488
) ([]*types.DataBlob, []byte, error) {
2✔
3489
        rawHistory := []*types.DataBlob{}
2✔
3490
        shardID := common.WorkflowIDToHistoryShard(execution.WorkflowID, wh.config.NumHistoryShards)
2✔
3491

2✔
3492
        resp, err := wh.GetHistoryManager().ReadRawHistoryBranch(ctx, &persistence.ReadHistoryBranchRequest{
2✔
3493
                BranchToken:   branchToken,
2✔
3494
                MinEventID:    firstEventID,
2✔
3495
                MaxEventID:    nextEventID,
2✔
3496
                PageSize:      int(pageSize),
2✔
3497
                NextPageToken: nextPageToken,
2✔
3498
                ShardID:       common.IntPtr(shardID),
2✔
3499
                DomainName:    domainName,
2✔
3500
        })
2✔
3501
        if err != nil {
2✔
3502
                return nil, nil, err
×
3503
        }
×
3504

3505
        var encoding *types.EncodingType
2✔
3506
        for _, data := range resp.HistoryEventBlobs {
4✔
3507
                switch data.Encoding {
2✔
3508
                case common.EncodingTypeJSON:
×
3509
                        encoding = types.EncodingTypeJSON.Ptr()
×
3510
                case common.EncodingTypeThriftRW:
2✔
3511
                        encoding = types.EncodingTypeThriftRW.Ptr()
2✔
3512
                default:
×
3513
                        panic(fmt.Sprintf("Invalid encoding type for raw history, encoding type: %s", data.Encoding))
×
3514
                }
3515
                rawHistory = append(rawHistory, &types.DataBlob{
2✔
3516
                        EncodingType: encoding,
2✔
3517
                        Data:         data.Data,
2✔
3518
                })
2✔
3519
        }
3520

3521
        if len(resp.NextPageToken) == 0 && transientDecision != nil {
4✔
3522
                if err := wh.validateTransientDecisionEvents(nextEventID, transientDecision); err != nil {
2✔
3523
                        scope.IncCounter(metrics.CadenceErrIncompleteHistoryCounter)
×
3524
                        wh.GetLogger().Error("getHistory error",
×
3525
                                tag.WorkflowDomainID(domainID),
×
3526
                                tag.WorkflowID(execution.GetWorkflowID()),
×
3527
                                tag.WorkflowRunID(execution.GetRunID()),
×
3528
                                tag.Error(err))
×
3529
                }
×
3530
                blob, err := wh.GetPayloadSerializer().SerializeBatchEvents(
2✔
3531
                        []*types.HistoryEvent{transientDecision.ScheduledEvent, transientDecision.StartedEvent}, common.EncodingTypeThriftRW)
2✔
3532
                if err != nil {
2✔
3533
                        return nil, nil, err
×
3534
                }
×
3535
                rawHistory = append(rawHistory, &types.DataBlob{
2✔
3536
                        EncodingType: types.EncodingTypeThriftRW.Ptr(),
2✔
3537
                        Data:         blob.Data,
2✔
3538
                })
2✔
3539
        }
3540

3541
        return rawHistory, resp.NextPageToken, nil
2✔
3542
}
3543

3544
func (wh *WorkflowHandler) getHistory(
3545
        ctx context.Context,
3546
        scope metrics.Scope,
3547
        domainID string,
3548
        domainName string,
3549
        execution types.WorkflowExecution,
3550
        firstEventID, nextEventID int64,
3551
        pageSize int32,
3552
        nextPageToken []byte,
3553
        transientDecision *types.TransientDecisionInfo,
3554
        branchToken []byte,
3555
) (*types.History, []byte, error) {
1,592✔
3556

1,592✔
3557
        var size int
1,592✔
3558

1,592✔
3559
        isFirstPage := len(nextPageToken) == 0
1,592✔
3560
        shardID := common.WorkflowIDToHistoryShard(execution.WorkflowID, wh.config.NumHistoryShards)
1,592✔
3561
        var err error
1,592✔
3562
        historyEvents, size, nextPageToken, err := persistenceutils.ReadFullPageV2Events(ctx, wh.GetHistoryManager(), &persistence.ReadHistoryBranchRequest{
1,592✔
3563
                BranchToken:   branchToken,
1,592✔
3564
                MinEventID:    firstEventID,
1,592✔
3565
                MaxEventID:    nextEventID,
1,592✔
3566
                PageSize:      int(pageSize),
1,592✔
3567
                NextPageToken: nextPageToken,
1,592✔
3568
                ShardID:       common.IntPtr(shardID),
1,592✔
3569
                DomainName:    domainName,
1,592✔
3570
        })
1,592✔
3571

1,592✔
3572
        if err != nil {
1,592✔
3573
                return nil, nil, err
×
3574
        }
×
3575

3576
        scope.RecordTimer(metrics.HistorySize, time.Duration(size))
1,592✔
3577

1,592✔
3578
        isLastPage := len(nextPageToken) == 0
1,592✔
3579
        if err := verifyHistoryIsComplete(
1,592✔
3580
                historyEvents,
1,592✔
3581
                firstEventID,
1,592✔
3582
                nextEventID-1,
1,592✔
3583
                isFirstPage,
1,592✔
3584
                isLastPage,
1,592✔
3585
                int(pageSize)); err != nil {
1,592✔
3586
                scope.IncCounter(metrics.CadenceErrIncompleteHistoryCounter)
×
3587
                wh.GetLogger().Error("getHistory: incomplete history",
×
3588
                        tag.WorkflowDomainID(domainID),
×
3589
                        tag.WorkflowID(execution.GetWorkflowID()),
×
3590
                        tag.WorkflowRunID(execution.GetRunID()),
×
3591
                        tag.Error(err))
×
3592
                return nil, nil, err
×
3593
        }
×
3594

3595
        if len(nextPageToken) == 0 && transientDecision != nil {
1,763✔
3596
                if err := wh.validateTransientDecisionEvents(nextEventID, transientDecision); err != nil {
171✔
3597
                        scope.IncCounter(metrics.CadenceErrIncompleteHistoryCounter)
×
3598
                        wh.GetLogger().Error("getHistory error",
×
3599
                                tag.WorkflowDomainID(domainID),
×
3600
                                tag.WorkflowID(execution.GetWorkflowID()),
×
3601
                                tag.WorkflowRunID(execution.GetRunID()),
×
3602
                                tag.Error(err))
×
3603
                }
×
3604
                // Append the transient decision events once we are done enumerating everything from the events table
3605
                historyEvents = append(historyEvents, transientDecision.ScheduledEvent, transientDecision.StartedEvent)
171✔
3606
        }
3607

3608
        executionHistory := &types.History{}
1,592✔
3609
        executionHistory.Events = historyEvents
1,592✔
3610
        return executionHistory, nextPageToken, nil
1,592✔
3611
}
3612

3613
func (wh *WorkflowHandler) validateTransientDecisionEvents(
3614
        expectedNextEventID int64,
3615
        decision *types.TransientDecisionInfo,
3616
) error {
173✔
3617

173✔
3618
        if decision.ScheduledEvent.ID == expectedNextEventID &&
173✔
3619
                decision.StartedEvent.ID == expectedNextEventID+1 {
346✔
3620
                return nil
173✔
3621
        }
173✔
3622

3623
        return fmt.Errorf(
×
3624
                "invalid transient decision: "+
×
3625
                        "expectedScheduledEventID=%v expectedStartedEventID=%v but have scheduledEventID=%v startedEventID=%v",
×
3626
                expectedNextEventID,
×
3627
                expectedNextEventID+1,
×
3628
                decision.ScheduledEvent.ID,
×
3629
                decision.StartedEvent.ID)
×
3630
}
3631

3632
func (wh *WorkflowHandler) validateTaskList(t *types.TaskList, scope metrics.Scope, domain string) error {
2,696✔
3633
        if t == nil || t.GetName() == "" {
2,697✔
3634
                return validate.ErrTaskListNotSet
1✔
3635
        }
1✔
3636

3637
        if !common.IsValidIDLength(
2,695✔
3638
                t.GetName(),
2,695✔
3639
                scope,
2,695✔
3640
                wh.config.MaxIDLengthWarnLimit(),
2,695✔
3641
                wh.config.TaskListNameMaxLength(domain),
2,695✔
3642
                metrics.CadenceErrTaskListNameExceededWarnLimit,
2,695✔
3643
                domain,
2,695✔
3644
                wh.GetLogger(),
2,695✔
3645
                tag.IDTypeTaskListName) {
2,695✔
3646
                return validate.ErrTaskListTooLong
×
3647
        }
×
3648
        return nil
2,695✔
3649
}
3650

3651
func (wh *WorkflowHandler) createPollForDecisionTaskResponse(
3652
        ctx context.Context,
3653
        scope metrics.Scope,
3654
        domainID string,
3655
        matchingResp *types.MatchingPollForDecisionTaskResponse,
3656
        branchToken []byte,
3657
) (*types.PollForDecisionTaskResponse, error) {
1,210✔
3658

1,210✔
3659
        if matchingResp.WorkflowExecution == nil {
1,258✔
3660
                // this will happen if there is no decision task to be send to worker / caller
48✔
3661
                return &types.PollForDecisionTaskResponse{}, nil
48✔
3662
        }
48✔
3663

3664
        var history *types.History
1,162✔
3665
        var continuation []byte
1,162✔
3666
        var err error
1,162✔
3667

1,162✔
3668
        if matchingResp.GetStickyExecutionEnabled() && matchingResp.Query != nil {
1,168✔
3669
                // meaning sticky query, we should not return any events to worker
6✔
3670
                // since query task only check the current status
6✔
3671
                history = &types.History{
6✔
3672
                        Events: []*types.HistoryEvent{},
6✔
3673
                }
6✔
3674
        } else {
1,162✔
3675
                // here we have 3 cases:
1,156✔
3676
                // 1. sticky && non query task
1,156✔
3677
                // 2. non sticky &&  non query task
1,156✔
3678
                // 3. non sticky && query task
1,156✔
3679
                // for 1, partial history have to be send back
1,156✔
3680
                // for 2 and 3, full history have to be send back
1,156✔
3681

1,156✔
3682
                var persistenceToken []byte
1,156✔
3683

1,156✔
3684
                firstEventID := common.FirstEventID
1,156✔
3685
                nextEventID := matchingResp.GetNextEventID()
1,156✔
3686
                if matchingResp.GetStickyExecutionEnabled() {
1,252✔
3687
                        firstEventID = matchingResp.GetPreviousStartedEventID() + 1
96✔
3688
                }
96✔
3689
                domainName, dErr := wh.GetDomainCache().GetDomainName(domainID)
1,156✔
3690
                if dErr != nil {
1,156✔
3691
                        return nil, dErr
×
3692
                }
×
3693
                scope = scope.Tagged(metrics.DomainTag(domainName))
1,156✔
3694
                history, persistenceToken, err = wh.getHistory(
1,156✔
3695
                        ctx,
1,156✔
3696
                        scope,
1,156✔
3697
                        domainID,
1,156✔
3698
                        domainName,
1,156✔
3699
                        *matchingResp.WorkflowExecution,
1,156✔
3700
                        firstEventID,
1,156✔
3701
                        nextEventID,
1,156✔
3702
                        int32(wh.config.HistoryMaxPageSize(domainName)),
1,156✔
3703
                        nil,
1,156✔
3704
                        matchingResp.DecisionInfo,
1,156✔
3705
                        branchToken,
1,156✔
3706
                )
1,156✔
3707
                if err != nil {
1,156✔
3708
                        return nil, err
×
3709
                }
×
3710

3711
                if len(persistenceToken) != 0 {
1,156✔
3712
                        continuation, err = serializeHistoryToken(&getHistoryContinuationToken{
×
3713
                                RunID:             matchingResp.WorkflowExecution.GetRunID(),
×
3714
                                FirstEventID:      firstEventID,
×
3715
                                NextEventID:       nextEventID,
×
3716
                                PersistenceToken:  persistenceToken,
×
3717
                                TransientDecision: matchingResp.DecisionInfo,
×
3718
                                BranchToken:       branchToken,
×
3719
                        })
×
3720
                        if err != nil {
×
3721
                                return nil, err
×
3722
                        }
×
3723
                }
3724
        }
3725

3726
        resp := &types.PollForDecisionTaskResponse{
1,162✔
3727
                TaskToken:                 matchingResp.TaskToken,
1,162✔
3728
                WorkflowExecution:         matchingResp.WorkflowExecution,
1,162✔
3729
                WorkflowType:              matchingResp.WorkflowType,
1,162✔
3730
                PreviousStartedEventID:    matchingResp.PreviousStartedEventID,
1,162✔
3731
                StartedEventID:            matchingResp.StartedEventID, // this field is not set for query tasks as there's no decision task started event
1,162✔
3732
                Query:                     matchingResp.Query,
1,162✔
3733
                BacklogCountHint:          matchingResp.BacklogCountHint,
1,162✔
3734
                Attempt:                   matchingResp.Attempt,
1,162✔
3735
                History:                   history,
1,162✔
3736
                NextPageToken:             continuation,
1,162✔
3737
                WorkflowExecutionTaskList: matchingResp.WorkflowExecutionTaskList,
1,162✔
3738
                ScheduledTimestamp:        matchingResp.ScheduledTimestamp,
1,162✔
3739
                StartedTimestamp:          matchingResp.StartedTimestamp,
1,162✔
3740
                Queries:                   matchingResp.Queries,
1,162✔
3741
                NextEventID:               matchingResp.NextEventID,
1,162✔
3742
                TotalHistoryBytes:         matchingResp.TotalHistoryBytes,
1,162✔
3743
        }
1,162✔
3744

1,162✔
3745
        return resp, nil
1,162✔
3746
}
3747

3748
func verifyHistoryIsComplete(
3749
        events []*types.HistoryEvent,
3750
        expectedFirstEventID int64,
3751
        expectedLastEventID int64,
3752
        isFirstPage bool,
3753
        isLastPage bool,
3754
        pageSize int,
3755
) error {
1,611✔
3756

1,611✔
3757
        nEvents := len(events)
1,611✔
3758
        if nEvents == 0 {
1,623✔
3759
                if isLastPage {
24✔
3760
                        // we seem to be returning a non-nil pageToken on the lastPage which
12✔
3761
                        // in turn cases the client to call getHistory again - only to find
12✔
3762
                        // there are no more events to consume - bail out if this is the case here
12✔
3763
                        return nil
12✔
3764
                }
12✔
3765
                return fmt.Errorf("invalid history: contains zero events")
×
3766
        }
3767

3768
        firstEventID := events[0].ID
1,599✔
3769
        lastEventID := events[nEvents-1].ID
1,599✔
3770

1,599✔
3771
        if !isFirstPage { // atleast one page of history has been read previously
1,635✔
3772
                if firstEventID <= expectedFirstEventID {
36✔
3773
                        // not first page and no events have been read in the previous pages - not possible
×
3774
                        return &types.InternalServiceError{
×
3775
                                Message: fmt.Sprintf(
×
3776
                                        "invalid history: expected first eventID to be > %v but got %v", expectedFirstEventID, firstEventID),
×
3777
                        }
×
3778
                }
×
3779
                expectedFirstEventID = firstEventID
36✔
3780
        }
3781

3782
        if !isLastPage {
1,648✔
3783
                // estimate lastEventID based on pageSize. This is a lower bound
49✔
3784
                // since the persistence layer counts "batch of events" as a single page
49✔
3785
                expectedLastEventID = expectedFirstEventID + int64(pageSize) - 1
49✔
3786
        }
49✔
3787

3788
        nExpectedEvents := expectedLastEventID - expectedFirstEventID + 1
1,599✔
3789

1,599✔
3790
        if firstEventID == expectedFirstEventID &&
1,599✔
3791
                ((isLastPage && lastEventID == expectedLastEventID && int64(nEvents) == nExpectedEvents) ||
1,599✔
3792
                        (!isLastPage && lastEventID >= expectedLastEventID && int64(nEvents) >= nExpectedEvents)) {
3,186✔
3793
                return nil
1,587✔
3794
        }
1,587✔
3795

3796
        return &types.InternalServiceError{
12✔
3797
                Message: fmt.Sprintf(
12✔
3798
                        "incomplete history: "+
12✔
3799
                                "expected events [%v-%v] but got events [%v-%v] of length %v:"+
12✔
3800
                                "isFirstPage=%v,isLastPage=%v,pageSize=%v",
12✔
3801
                        expectedFirstEventID,
12✔
3802
                        expectedLastEventID,
12✔
3803
                        firstEventID,
12✔
3804
                        lastEventID,
12✔
3805
                        nEvents,
12✔
3806
                        isFirstPage,
12✔
3807
                        isLastPage,
12✔
3808
                        pageSize),
12✔
3809
        }
12✔
3810
}
3811

3812
func deserializeHistoryToken(bytes []byte) (*getHistoryContinuationToken, error) {
44✔
3813
        token := &getHistoryContinuationToken{}
44✔
3814
        err := json.Unmarshal(bytes, token)
44✔
3815
        return token, err
44✔
3816
}
44✔
3817

3818
func serializeHistoryToken(token *getHistoryContinuationToken) ([]byte, error) {
437✔
3819
        if token == nil {
832✔
3820
                return nil, nil
395✔
3821
        }
395✔
3822

3823
        bytes, err := json.Marshal(token)
42✔
3824
        return bytes, err
42✔
3825
}
3826

3827
func isFailoverRequest(updateRequest *types.UpdateDomainRequest) bool {
9✔
3828
        return updateRequest.ActiveClusterName != nil
9✔
3829
}
9✔
3830

3831
func isGraceFailoverRequest(updateRequest *types.UpdateDomainRequest) bool {
9✔
3832
        return updateRequest.FailoverTimeoutInSeconds != nil
9✔
3833
}
9✔
3834

3835
func (wh *WorkflowHandler) checkOngoingFailover(
3836
        ctx context.Context,
3837
        domainName *string,
3838
) error {
1✔
3839

1✔
3840
        enabledClusters := wh.GetClusterMetadata().GetEnabledClusterInfo()
1✔
3841
        respChan := make(chan *types.DescribeDomainResponse, len(enabledClusters))
1✔
3842

1✔
3843
        g := &errgroup.Group{}
1✔
3844
        for clusterName := range enabledClusters {
3✔
3845
                frontendClient := wh.GetRemoteFrontendClient(clusterName)
2✔
3846
                g.Go(func() (e error) {
4✔
3847
                        defer func() { log.CapturePanic(recover(), wh.GetLogger(), &e) }()
4✔
3848

3849
                        resp, _ := frontendClient.DescribeDomain(ctx, &types.DescribeDomainRequest{Name: domainName})
2✔
3850
                        respChan <- resp
2✔
3851
                        return nil
2✔
3852
                })
3853
        }
3854
        g.Wait()
1✔
3855
        close(respChan)
1✔
3856

1✔
3857
        var failoverVersion *int64
1✔
3858
        for resp := range respChan {
3✔
3859
                if resp == nil {
2✔
3860
                        return &types.InternalServiceError{
×
3861
                                Message: "Failed to verify failover version from all clusters",
×
3862
                        }
×
3863
                }
×
3864
                if failoverVersion == nil {
3✔
3865
                        failoverVersion = &resp.FailoverVersion
1✔
3866
                }
1✔
3867
                if *failoverVersion != resp.GetFailoverVersion() {
2✔
3868
                        return &types.BadRequestError{
×
3869
                                Message: "Concurrent failover is not allow.",
×
3870
                        }
×
3871
                }
×
3872
        }
3873
        return nil
1✔
3874
}
3875

3876
func (wh *WorkflowHandler) historyArchived(ctx context.Context, request *types.GetWorkflowExecutionHistoryRequest, domainID string) bool {
445✔
3877
        if request.GetExecution() == nil || request.GetExecution().GetRunID() == "" {
480✔
3878
                return false
35✔
3879
        }
35✔
3880
        getMutableStateRequest := &types.GetMutableStateRequest{
410✔
3881
                DomainUUID: domainID,
410✔
3882
                Execution:  request.Execution,
410✔
3883
        }
410✔
3884
        _, err := wh.GetHistoryClient().GetMutableState(ctx, getMutableStateRequest)
410✔
3885
        if err == nil {
797✔
3886
                return false
387✔
3887
        }
387✔
3888
        switch err.(type) {
23✔
3889
        case *types.EntityNotExistsError:
22✔
3890
                // the only case in which history is assumed to be archived is if getting mutable state returns entity not found error
22✔
3891
                return true
22✔
3892
        }
3893
        return false
1✔
3894
}
3895

3896
func (wh *WorkflowHandler) getArchivedHistory(
3897
        ctx context.Context,
3898
        request *types.GetWorkflowExecutionHistoryRequest,
3899
        domainID string,
3900
) (*types.GetWorkflowExecutionHistoryResponse, error) {
25✔
3901
        entry, err := wh.GetDomainCache().GetDomainByID(domainID)
25✔
3902
        if err != nil {
26✔
3903
                return nil, err
1✔
3904
        }
1✔
3905

3906
        URIString := entry.GetConfig().HistoryArchivalURI
24✔
3907
        if URIString == "" {
25✔
3908
                // if URI is empty, it means the domain has never enabled for archival.
1✔
3909
                // the error is not "workflow has passed retention period", because
1✔
3910
                // we have no way to tell if the requested workflow exists or not.
1✔
3911
                return nil, validate.ErrHistoryNotFound
1✔
3912
        }
1✔
3913

3914
        URI, err := archiver.NewURI(URIString)
23✔
3915
        if err != nil {
24✔
3916
                return nil, err
1✔
3917
        }
1✔
3918

3919
        historyArchiver, err := wh.GetArchiverProvider().GetHistoryArchiver(URI.Scheme(), service.Frontend)
22✔
3920
        if err != nil {
22✔
3921
                return nil, err
×
3922
        }
×
3923

3924
        resp, err := historyArchiver.Get(ctx, URI, &archiver.GetHistoryRequest{
22✔
3925
                DomainID:      domainID,
22✔
3926
                WorkflowID:    request.GetExecution().GetWorkflowID(),
22✔
3927
                RunID:         request.GetExecution().GetRunID(),
22✔
3928
                NextPageToken: request.GetNextPageToken(),
22✔
3929
                PageSize:      int(request.GetMaximumPageSize()),
22✔
3930
        })
22✔
3931
        if err != nil {
22✔
UNCOV
3932
                return nil, err
×
UNCOV
3933
        }
×
3934

3935
        history := &types.History{}
22✔
3936
        for _, batch := range resp.HistoryBatches {
279✔
3937
                history.Events = append(history.Events, batch.Events...)
257✔
3938
        }
257✔
3939
        return &types.GetWorkflowExecutionHistoryResponse{
22✔
3940
                History:       history,
22✔
3941
                NextPageToken: resp.NextPageToken,
22✔
3942
                Archived:      true,
22✔
3943
        }, nil
22✔
3944
}
3945

3946
func (wh *WorkflowHandler) convertIndexedKeyToThrift(keys map[string]interface{}) map[string]types.IndexedValueType {
3✔
3947
        converted := make(map[string]types.IndexedValueType)
3✔
3948
        for k, v := range keys {
51✔
3949
                converted[k] = common.ConvertIndexedValueTypeToInternalType(v, wh.GetLogger())
48✔
3950
        }
48✔
3951
        return converted
2✔
3952
}
3953

3954
func (wh *WorkflowHandler) isListRequestPageSizeTooLarge(pageSize int32, domain string) bool {
307✔
3955
        return common.IsAdvancedVisibilityReadingEnabled(wh.config.EnableReadVisibilityFromES(domain), wh.config.IsAdvancedVisConfigExist) &&
307✔
3956
                pageSize > int32(wh.config.ESIndexMaxResultWindow())
307✔
3957
}
307✔
3958

3959
// GetClusterInfo return information about cadence deployment
3960
func (wh *WorkflowHandler) GetClusterInfo(
3961
        ctx context.Context,
3962
) (resp *types.ClusterInfo, err error) {
×
3963
        return &types.ClusterInfo{
×
3964
                SupportedClientVersions: &types.SupportedClientVersions{
×
3965
                        GoSdk:   client.SupportedGoSDKVersion,
×
3966
                        JavaSdk: client.SupportedJavaSDKVersion,
×
3967
                },
×
3968
        }, nil
×
3969
}
×
3970

3971
func checkFailOverPermission(config *config.Config, domainName string) error {
2✔
3972
        if config.Lockdown(domainName) {
3✔
3973
                return validate.ErrDomainInLockdown
1✔
3974
        }
1✔
3975
        return nil
1✔
3976
}
3977

3978
type domainWrapper struct {
3979
        domain string
3980
}
3981

3982
func (d domainWrapper) GetDomain() string {
1,758✔
3983
        return d.domain
1,758✔
3984
}
1,758✔
3985

3986
func (hs HealthStatus) String() string {
2✔
3987
        switch hs {
2✔
3988
        case HealthStatusOK:
1✔
3989
                return "OK"
1✔
3990
        case HealthStatusWarmingUp:
1✔
3991
                return "WarmingUp"
1✔
3992
        case HealthStatusShuttingDown:
×
3993
                return "ShuttingDown"
×
3994
        default:
×
3995
                return "unknown"
×
3996
        }
3997
}
3998

3999
func getDomainWfIDRunIDTags(
4000
        domainName string,
4001
        wf *types.WorkflowExecution,
4002
) []tag.Tag {
1,487✔
4003
        tags := []tag.Tag{tag.WorkflowDomainName(domainName)}
1,487✔
4004
        if wf == nil {
2,974✔
4005
                return tags
1,487✔
4006
        }
1,487✔
4007
        return append(
×
4008
                tags,
×
4009
                tag.WorkflowID(wf.GetWorkflowID()),
×
4010
                tag.WorkflowRunID(wf.GetRunID()),
×
4011
        )
×
4012
}
4013

4014
func checkRequiredDomainDataKVs(requiredDomainDataKeys map[string]interface{}, domainData map[string]string) error {
45✔
4015
        // check requiredDomainDataKeys
45✔
4016
        for k := range requiredDomainDataKeys {
46✔
4017
                _, ok := domainData[k]
1✔
4018
                if !ok {
2✔
4019
                        return fmt.Errorf("domain data error, missing required key %v . All required keys: %v", k, requiredDomainDataKeys)
1✔
4020
                }
1✔
4021
        }
4022
        return nil
44✔
4023
}
4024

4025
// Some error types are introduced later that some clients might not support
4026
// To make them backward compatible, we continue returning the legacy error types
4027
// for older clients
4028
func (wh *WorkflowHandler) normalizeVersionedErrors(ctx context.Context, err error) error {
66✔
4029
        switch err.(type) {
66✔
4030
        case *types.WorkflowExecutionAlreadyCompletedError:
17✔
4031
                call := yarpc.CallFromContext(ctx)
17✔
4032
                clientFeatureVersion := call.Header(common.FeatureVersionHeaderName)
17✔
4033
                clientImpl := call.Header(common.ClientImplHeaderName)
17✔
4034
                featureFlags := client.GetFeatureFlagsFromHeader(call)
17✔
4035

17✔
4036
                vErr := wh.versionChecker.SupportsWorkflowAlreadyCompletedError(clientImpl, clientFeatureVersion, featureFlags)
17✔
4037
                if vErr == nil {
20✔
4038
                        return err
3✔
4039
                }
3✔
4040
                return &types.EntityNotExistsError{Message: "Workflow execution already completed."}
14✔
4041
        default:
49✔
4042
                return err
49✔
4043
        }
4044
}
4045
func constructRestartWorkflowRequest(w *types.WorkflowExecutionStartedEventAttributes, domain string, identity string, workflowID string) *types.StartWorkflowExecutionRequest {
1✔
4046

1✔
4047
        startRequest := &types.StartWorkflowExecutionRequest{
1✔
4048
                RequestID:  uuid.New().String(),
1✔
4049
                Domain:     domain,
1✔
4050
                WorkflowID: workflowID,
1✔
4051
                WorkflowType: &types.WorkflowType{
1✔
4052
                        Name: w.WorkflowType.Name,
1✔
4053
                },
1✔
4054
                TaskList: &types.TaskList{
1✔
4055
                        Name: w.TaskList.Name,
1✔
4056
                },
1✔
4057
                Input:                               w.Input,
1✔
4058
                ExecutionStartToCloseTimeoutSeconds: w.ExecutionStartToCloseTimeoutSeconds,
1✔
4059
                TaskStartToCloseTimeoutSeconds:      w.TaskStartToCloseTimeoutSeconds,
1✔
4060
                Identity:                            identity,
1✔
4061
                WorkflowIDReusePolicy:               types.WorkflowIDReusePolicyTerminateIfRunning.Ptr(),
1✔
4062
        }
1✔
4063
        startRequest.CronSchedule = w.CronSchedule
1✔
4064
        startRequest.RetryPolicy = w.RetryPolicy
1✔
4065
        startRequest.DelayStartSeconds = w.FirstDecisionTaskBackoffSeconds
1✔
4066
        startRequest.Header = w.Header
1✔
4067
        startRequest.Memo = w.Memo
1✔
4068
        startRequest.SearchAttributes = w.SearchAttributes
1✔
4069

1✔
4070
        return startRequest
1✔
4071
}
1✔
4072

4073
func getMetricsScopeWithDomain(
4074
        scope int,
4075
        d domainGetter,
4076
        metricsClient metrics.Client,
4077
) metrics.Scope {
5,767✔
4078
        var metricsScope metrics.Scope
5,767✔
4079
        if d != nil {
11,534✔
4080
                metricsScope = metricsClient.Scope(scope).Tagged(metrics.DomainTag(d.GetDomain()))
5,767✔
4081
        } else {
5,767✔
4082
                metricsScope = metricsClient.Scope(scope).Tagged(metrics.DomainUnknownTag())
×
4083
        }
×
4084
        return metricsScope
5,767✔
4085
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc