
uber / cadence, build 018daedd-c4d7-4e21-b456-76c002abd778

15 Feb 2024 10:23PM UTC coverage: 62.721% (-0.03%) from 62.749%

Pull Request #5666: Upgrade pinot client version (neil-xie, "Update in submodule", built with buildkite)

92590 of 147622 relevant lines covered (62.72%)
2306.34 hits per line

Source File

/service/history/handler.go (49.97% covered)
1
// Copyright (c) 2017-2020 Uber Technologies Inc.
2
//
3
// Permission is hereby granted, free of charge, to any person obtaining a copy
4
// of this software and associated documentation files (the "Software"), to deal
5
// in the Software without restriction, including without limitation the rights
6
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
7
// copies of the Software, and to permit persons to whom the Software is
8
// furnished to do so, subject to the following conditions:
9
//
10
// The above copyright notice and this permission notice shall be included in
11
// all copies or substantial portions of the Software.
12
//
13
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
14
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
18
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
19
// THE SOFTWARE.
20

21
package history
22

23
import (
24
        "context"
25
        "errors"
26
        "fmt"
27
        "strconv"
28
        "sync"
29
        "sync/atomic"
30
        "time"
31

32
        "github.com/pborman/uuid"
33
        "go.uber.org/yarpc/yarpcerrors"
34
        "golang.org/x/sync/errgroup"
35

36
        "github.com/uber/cadence/common"
37
        "github.com/uber/cadence/common/definition"
38
        "github.com/uber/cadence/common/future"
39
        "github.com/uber/cadence/common/log"
40
        "github.com/uber/cadence/common/log/tag"
41
        "github.com/uber/cadence/common/membership"
42
        "github.com/uber/cadence/common/metrics"
43
        "github.com/uber/cadence/common/persistence"
44
        "github.com/uber/cadence/common/quotas"
45
        "github.com/uber/cadence/common/service"
46
        "github.com/uber/cadence/common/types"
47
        "github.com/uber/cadence/common/types/mapper/proto"
48
        "github.com/uber/cadence/service/history/config"
49
        "github.com/uber/cadence/service/history/engine"
50
        "github.com/uber/cadence/service/history/events"
51
        "github.com/uber/cadence/service/history/failover"
52
        "github.com/uber/cadence/service/history/replication"
53
        "github.com/uber/cadence/service/history/resource"
54
        "github.com/uber/cadence/service/history/shard"
55
        "github.com/uber/cadence/service/history/task"
56
        "github.com/uber/cadence/service/history/workflowcache"
57
)
58

59
const (
60
        shardOwnershipTransferDelay = 5 * time.Second
61
        workflowIDCacheTTL          = 1 * time.Second
62
        workflowIDCacheMaxCount     = 10_000
63
)
64

65
type (
66
        // handlerImpl is an implementation for history service independent of wire protocol
67
        handlerImpl struct {
68
                resource.Resource
69

70
                shuttingDown             int32
71
                controller               shard.Controller
72
                tokenSerializer          common.TaskTokenSerializer
73
                startWG                  sync.WaitGroup
74
                config                   *config.Config
75
                historyEventNotifier     events.Notifier
76
                rateLimiter              quotas.Limiter
77
                crossClusterTaskFetchers task.Fetchers
78
                replicationTaskFetchers  replication.TaskFetchers
79
                queueTaskProcessor       task.Processor
80
                failoverCoordinator      failover.Coordinator
81
                workflowIDCache          workflowcache.WFCache
82
        }
83
)
84

85
var _ Handler = (*handlerImpl)(nil)
86
var _ shard.EngineFactory = (*handlerImpl)(nil)
87

88
var (
89
        errDomainNotSet            = &types.BadRequestError{Message: "Domain not set on request."}
90
        errWorkflowExecutionNotSet = &types.BadRequestError{Message: "WorkflowExecution not set on request."}
91
        errTaskListNotSet          = &types.BadRequestError{Message: "Tasklist not set."}
92
        errWorkflowIDNotSet        = &types.BadRequestError{Message: "WorkflowId is not set on request."}
93
        errRunIDNotValid           = &types.BadRequestError{Message: "RunID is not valid UUID."}
94
        errSourceClusterNotSet     = &types.BadRequestError{Message: "Source Cluster not set on request."}
95
        errTimestampNotSet         = &types.BadRequestError{Message: "Timestamp not set on request."}
96
        errInvalidTaskType         = &types.BadRequestError{Message: "Invalid task type"}
97
        errHistoryHostThrottle     = &types.ServiceBusyError{Message: "History host rps exceeded"}
98
        errShuttingDown            = &types.InternalServiceError{Message: "Shutting down"}
99
)
100

101
// NewHandler creates a thrift handler for the history service
102
func NewHandler(
103
        resource resource.Resource,
104
        config *config.Config,
105
) Handler {
19✔
106
        handler := &handlerImpl{
19✔
107
                Resource:        resource,
19✔
108
                config:          config,
19✔
109
                tokenSerializer: common.NewJSONTaskTokenSerializer(),
19✔
110
                rateLimiter:     quotas.NewDynamicRateLimiter(config.RPS.AsFloat64()),
19✔
111
                workflowIDCache: workflowcache.New(workflowcache.Params{
19✔
112
                        TTL:                            workflowIDCacheTTL,
19✔
113
                        ExternalLimiterFactory:         quotas.NewSimpleDynamicRateLimiterFactory(config.WorkflowIDExternalRPS),
19✔
114
                        InternalLimiterFactory:         quotas.NewSimpleDynamicRateLimiterFactory(config.WorkflowIDInternalRPS),
19✔
115
                        WorkflowIDCacheExternalEnabled: config.WorkflowIDCacheExternalEnabled,
19✔
116
                        WorkflowIDCacheInternalEnabled: config.WorkflowIDCacheInternalEnabled,
19✔
117
                        MaxCount:                       workflowIDCacheMaxCount,
19✔
118
                        DomainCache:                    resource.GetDomainCache(),
19✔
119
                        Logger:                         resource.GetLogger(),
19✔
120
                        MetricsClient:                  resource.GetMetricsClient(),
19✔
121
                }),
19✔
122
        }
19✔
123

19✔
124
        // prevent us from trying to serve requests before shard controller is started and ready
19✔
125
        handler.startWG.Add(1)
19✔
126
        return handler
19✔
127
}
19✔
128

129
// Start starts the handler
130
func (h *handlerImpl) Start() {
15✔
131
        h.crossClusterTaskFetchers = task.NewCrossClusterTaskFetchers(
15✔
132
                h.GetClusterMetadata(),
15✔
133
                h.GetClientBean(),
15✔
134
                &task.FetcherOptions{
15✔
135
                        Parallelism:                h.config.CrossClusterFetcherParallelism,
15✔
136
                        AggregationInterval:        h.config.CrossClusterFetcherAggregationInterval,
15✔
137
                        ServiceBusyBackoffInterval: h.config.CrossClusterFetcherServiceBusyBackoffInterval,
15✔
138
                        ErrorRetryInterval:         h.config.CrossClusterFetcherErrorBackoffInterval,
15✔
139
                        TimerJitterCoefficient:     h.config.CrossClusterFetcherJitterCoefficient,
15✔
140
                },
15✔
141
                h.GetMetricsClient(),
15✔
142
                h.GetLogger(),
15✔
143
        )
15✔
144
        h.crossClusterTaskFetchers.Start()
15✔
145

15✔
146
        h.replicationTaskFetchers = replication.NewTaskFetchers(
15✔
147
                h.GetLogger(),
15✔
148
                h.config,
15✔
149
                h.GetClusterMetadata(),
15✔
150
                h.GetClientBean(),
15✔
151
        )
15✔
152

15✔
153
        h.replicationTaskFetchers.Start()
15✔
154

15✔
155
        var err error
15✔
156
        taskPriorityAssigner := task.NewPriorityAssigner(
15✔
157
                h.GetClusterMetadata().GetCurrentClusterName(),
15✔
158
                h.GetDomainCache(),
15✔
159
                h.GetLogger(),
15✔
160
                h.GetMetricsClient(),
15✔
161
                h.config,
15✔
162
        )
15✔
163

15✔
164
        h.queueTaskProcessor, err = task.NewProcessor(
15✔
165
                taskPriorityAssigner,
15✔
166
                h.config,
15✔
167
                h.GetLogger(),
15✔
168
                h.GetMetricsClient(),
15✔
169
        )
15✔
170
        if err != nil {
15✔
171
                h.GetLogger().Fatal("Creating priority task processor failed", tag.Error(err))
×
172
        }
×
173
        h.queueTaskProcessor.Start()
15✔
174

15✔
175
        h.controller = shard.NewShardController(
15✔
176
                h.Resource,
15✔
177
                h,
15✔
178
                h.config,
15✔
179
        )
15✔
180
        h.historyEventNotifier = events.NewNotifier(h.GetTimeSource(), h.GetMetricsClient(), h.config.GetShardID)
15✔
181
        // events notifier must start before controller
15✔
182
        h.historyEventNotifier.Start()
15✔
183

15✔
184
        h.failoverCoordinator = failover.NewCoordinator(
15✔
185
                h.GetDomainManager(),
15✔
186
                h.GetHistoryClient(),
15✔
187
                h.GetTimeSource(),
15✔
188
                h.GetDomainCache(),
15✔
189
                h.config,
15✔
190
                h.GetMetricsClient(),
15✔
191
                h.GetLogger(),
15✔
192
        )
15✔
193
        if h.config.EnableGracefulFailover() {
30✔
194
                h.failoverCoordinator.Start()
15✔
195
        }
15✔
196

197
        h.controller.Start()
15✔
198

15✔
199
        h.startWG.Done()
15✔
200
}
201
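
The startWG.Add(1) in NewHandler, the Done() at the end of Start above, and the startWG.Wait() at the top of every RPC method below form a start gate: requests block until the shard controller and its dependencies are up. A minimal, self-contained sketch of that pattern (hypothetical names, not part of handler.go):

package main

import (
	"fmt"
	"sync"
	"time"
)

// gatedServer mimics the startWG gate: requests block until Start has run.
type gatedServer struct {
	startWG sync.WaitGroup
}

func newGatedServer() *gatedServer {
	s := &gatedServer{}
	s.startWG.Add(1) // arm the gate; Handle blocks until Start calls Done
	return s
}

func (s *gatedServer) Start() {
	// ... start shard controller, fetchers, notifiers ...
	s.startWG.Done() // open the gate for all waiting requests
}

func (s *gatedServer) Handle(req string) string {
	s.startWG.Wait() // blocks before Start, no-op afterwards
	return "handled " + req
}

func main() {
	s := newGatedServer()
	go func() {
		time.Sleep(10 * time.Millisecond)
		s.Start()
	}()
	fmt.Println(s.Handle("ping")) // waits for Start, then prints "handled ping"
}

Once Done has balanced the Add(1), Wait returns immediately, so the gate adds no cost on the hot path after startup.
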

202
// Stop stops the handler
203
func (h *handlerImpl) Stop() {
15✔
204
        h.prepareToShutDown()
15✔
205
        h.crossClusterTaskFetchers.Stop()
15✔
206
        h.replicationTaskFetchers.Stop()
15✔
207
        h.queueTaskProcessor.Stop()
15✔
208
        h.controller.Stop()
15✔
209
        h.historyEventNotifier.Stop()
15✔
210
        h.failoverCoordinator.Stop()
15✔
211
}
15✔
212

213
// PrepareToStop starts graceful traffic drain in preparation for shutdown
214
func (h *handlerImpl) PrepareToStop(remainingTime time.Duration) time.Duration {
15✔
215
        h.GetLogger().Info("ShutdownHandler: Initiating shardController shutdown")
15✔
216
        h.controller.PrepareToStop()
15✔
217
        h.GetLogger().Info("ShutdownHandler: Waiting for traffic to drain")
15✔
218
        remainingTime = common.SleepWithMinDuration(shardOwnershipTransferDelay, remainingTime)
15✔
219
        h.GetLogger().Info("ShutdownHandler: No longer taking rpc requests")
15✔
220
        h.prepareToShutDown()
15✔
221
        return remainingTime
15✔
222
}
15✔
223

224
func (h *handlerImpl) prepareToShutDown() {
27✔
225
        atomic.StoreInt32(&h.shuttingDown, 1)
27✔
226
}
27✔
227

228
func (h *handlerImpl) isShuttingDown() bool {
939✔
229
        return atomic.LoadInt32(&h.shuttingDown) != 0
939✔
230
}
939✔
231
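
prepareToShutDown and isShuttingDown above implement a lock-free boolean flag with atomic.StoreInt32/LoadInt32. On Go 1.19 and later the same flag can be expressed with sync/atomic's atomic.Bool; a small sketch with a hypothetical type, not the handler's actual code:

package main

import (
	"fmt"
	"sync/atomic"
)

// drainFlag is a hypothetical stand-in for the shuttingDown field above.
type drainFlag struct {
	shuttingDown atomic.Bool
}

func (d *drainFlag) prepareToShutDown() { d.shuttingDown.Store(true) }

func (d *drainFlag) isShuttingDown() bool { return d.shuttingDown.Load() }

func main() {
	var d drainFlag
	fmt.Println(d.isShuttingDown()) // false
	d.prepareToShutDown()
	fmt.Println(d.isShuttingDown()) // true
}
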

232
// CreateEngine is the implementation of shard.EngineFactory, used to create the engine instance for a shard
233
func (h *handlerImpl) CreateEngine(
234
        shardContext shard.Context,
235
) engine.Engine {
51✔
236
        return NewEngineWithShardContext(
51✔
237
                shardContext,
51✔
238
                h.GetVisibilityManager(),
51✔
239
                h.GetMatchingClient(),
51✔
240
                h.GetSDKClient(),
51✔
241
                h.historyEventNotifier,
51✔
242
                h.config,
51✔
243
                h.crossClusterTaskFetchers,
51✔
244
                h.replicationTaskFetchers,
51✔
245
                h.GetMatchingRawClient(),
51✔
246
                h.queueTaskProcessor,
51✔
247
                h.failoverCoordinator,
51✔
248
                h.workflowIDCache,
51✔
249
        )
51✔
250
}
51✔
251

252
// Health is for health check
253
func (h *handlerImpl) Health(ctx context.Context) (*types.HealthStatus, error) {
×
254
        h.startWG.Wait()
×
255
        h.GetLogger().Debug("History health check endpoint reached.")
×
256
        hs := &types.HealthStatus{Ok: true, Msg: "OK"}
×
257
        return hs, nil
×
258
}
×
259

260
// RecordActivityTaskHeartbeat - Record Activity Task heartbeat.
261
func (h *handlerImpl) RecordActivityTaskHeartbeat(
262
        ctx context.Context,
263
        wrappedRequest *types.HistoryRecordActivityTaskHeartbeatRequest,
264
) (resp *types.RecordActivityTaskHeartbeatResponse, retError error) {
381✔
265

381✔
266
        defer func() { log.CapturePanic(recover(), h.GetLogger(), &retError) }()
762✔
267
        h.startWG.Wait()
381✔
268

381✔
269
        scope, sw := h.startRequestProfile(ctx, metrics.HistoryRecordActivityTaskHeartbeatScope)
381✔
270
        defer sw.Stop()
381✔
271

381✔
272
        domainID := wrappedRequest.GetDomainUUID()
381✔
273
        if domainID == "" {
381✔
274
                return nil, h.error(errDomainNotSet, scope, domainID, "", "")
×
275
        }
×
276

277
        if ok := h.rateLimiter.Allow(); !ok {
381✔
278
                return nil, h.error(errHistoryHostThrottle, scope, domainID, "", "")
×
279
        }
×
280

281
        heartbeatRequest := wrappedRequest.HeartbeatRequest
381✔
282
        token, err0 := h.tokenSerializer.Deserialize(heartbeatRequest.TaskToken)
381✔
283
        if err0 != nil {
381✔
284
                err0 = &types.BadRequestError{Message: fmt.Sprintf("Error deserializing task token. Error: %v", err0)}
×
285
                return nil, h.error(err0, scope, domainID, "", "")
×
286
        }
×
287

288
        err0 = validateTaskToken(token)
381✔
289
        if err0 != nil {
381✔
290
                return nil, h.error(err0, scope, domainID, "", "")
×
291
        }
×
292
        workflowID := token.WorkflowID
381✔
293

381✔
294
        engine, err1 := h.controller.GetEngine(workflowID)
381✔
295
        if err1 != nil {
381✔
296
                return nil, h.error(err1, scope, domainID, workflowID, "")
×
297
        }
×
298

299
        response, err2 := engine.RecordActivityTaskHeartbeat(ctx, wrappedRequest)
381✔
300
        if err2 != nil {
381✔
301
                return nil, h.error(err2, scope, domainID, workflowID, "")
×
302
        }
×
303

304
        return response, nil
381✔
305
}
306
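
The h.rateLimiter.Allow() check above (and in the handlers that follow) is a host-level RPS guard built from the cadence quotas package; when it returns false the request is rejected with errHistoryHostThrottle. A rough stand-alone equivalent using golang.org/x/time/rate instead of the quotas package, for illustration only:

package main

import (
	"fmt"

	"golang.org/x/time/rate" // external module: golang.org/x/time
)

func main() {
	// A limiter with rate and burst of 3: the first three calls in a burst
	// pass, later ones are rejected until tokens refill.
	limiter := rate.NewLimiter(rate.Limit(3), 3)

	for i := 0; i < 5; i++ {
		if limiter.Allow() {
			fmt.Println("request", i, "accepted")
		} else {
			fmt.Println("request", i, "throttled") // analogous to errHistoryHostThrottle
		}
	}
}
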

307
// RecordActivityTaskStarted - Record Activity Task started.
308
func (h *handlerImpl) RecordActivityTaskStarted(
309
        ctx context.Context,
310
        recordRequest *types.RecordActivityTaskStartedRequest,
311
) (resp *types.RecordActivityTaskStartedResponse, retError error) {
330✔
312

330✔
313
        defer func() { log.CapturePanic(recover(), h.GetLogger(), &retError) }()
660✔
314
        h.startWG.Wait()
330✔
315

330✔
316
        scope, sw := h.startRequestProfile(ctx, metrics.HistoryRecordActivityTaskStartedScope)
330✔
317
        defer sw.Stop()
330✔
318

330✔
319
        domainID := recordRequest.GetDomainUUID()
330✔
320
        workflowExecution := recordRequest.WorkflowExecution
330✔
321
        workflowID := workflowExecution.GetWorkflowID()
330✔
322

330✔
323
        h.emitInfoOrDebugLog(
330✔
324
                domainID,
330✔
325
                "RecordActivityTaskStarted",
330✔
326
                tag.WorkflowDomainID(domainID),
330✔
327
                tag.WorkflowID(workflowExecution.GetWorkflowID()),
330✔
328
                tag.WorkflowRunID(recordRequest.WorkflowExecution.RunID),
330✔
329
                tag.WorkflowScheduleID(recordRequest.GetScheduleID()),
330✔
330
        )
330✔
331

330✔
332
        if recordRequest.GetDomainUUID() == "" {
330✔
333
                return nil, h.error(errDomainNotSet, scope, domainID, workflowID, "")
×
334
        }
×
335

336
        if ok := h.rateLimiter.Allow(); !ok {
330✔
337
                return nil, h.error(errHistoryHostThrottle, scope, domainID, workflowID, "")
×
338
        }
×
339

340
        engine, err1 := h.controller.GetEngine(workflowID)
330✔
341
        if err1 != nil {
330✔
342
                return nil, h.error(err1, scope, domainID, workflowID, "")
×
343
        }
×
344

345
        response, err2 := engine.RecordActivityTaskStarted(ctx, recordRequest)
330✔
346
        if err2 != nil {
336✔
347
                return nil, h.error(err2, scope, domainID, workflowID, "")
6✔
348
        }
6✔
349

350
        return response, nil
324✔
351
}
352

353
// RecordDecisionTaskStarted - Record Decision Task started.
354
func (h *handlerImpl) RecordDecisionTaskStarted(
355
        ctx context.Context,
356
        recordRequest *types.RecordDecisionTaskStartedRequest,
357
) (resp *types.RecordDecisionTaskStartedResponse, retError error) {
1,096✔
358

1,096✔
359
        defer func() { log.CapturePanic(recover(), h.GetLogger(), &retError) }()
2,192✔
360
        h.startWG.Wait()
1,096✔
361

1,096✔
362
        scope, sw := h.startRequestProfile(ctx, metrics.HistoryRecordDecisionTaskStartedScope)
1,096✔
363
        defer sw.Stop()
1,096✔
364

1,096✔
365
        domainID := recordRequest.GetDomainUUID()
1,096✔
366
        workflowExecution := recordRequest.WorkflowExecution
1,096✔
367
        workflowID := workflowExecution.GetWorkflowID()
1,096✔
368
        runID := workflowExecution.GetRunID()
1,096✔
369

1,096✔
370
        h.emitInfoOrDebugLog(
1,096✔
371
                domainID,
1,096✔
372
                "RecordDecisionTaskStarted",
1,096✔
373
                tag.WorkflowDomainID(domainID),
1,096✔
374
                tag.WorkflowID(workflowExecution.GetWorkflowID()),
1,096✔
375
                tag.WorkflowRunID(recordRequest.WorkflowExecution.RunID),
1,096✔
376
                tag.WorkflowScheduleID(recordRequest.GetScheduleID()),
1,096✔
377
        )
1,096✔
378

1,096✔
379
        if domainID == "" {
1,096✔
380
                return nil, h.error(errDomainNotSet, scope, domainID, workflowID, runID)
×
381
        }
×
382

383
        if ok := h.rateLimiter.Allow(); !ok {
1,096✔
384
                return nil, h.error(errHistoryHostThrottle, scope, domainID, workflowID, runID)
×
385
        }
×
386

387
        if recordRequest.PollRequest == nil || recordRequest.PollRequest.TaskList.GetName() == "" {
1,096✔
388
                return nil, h.error(errTaskListNotSet, scope, domainID, workflowID, runID)
×
389
        }
×
390

391
        engine, err1 := h.controller.GetEngine(workflowID)
1,096✔
392
        if err1 != nil {
1,096✔
393
                h.GetLogger().Error("RecordDecisionTaskStarted failed.",
×
394
                        tag.Error(err1),
×
395
                        tag.WorkflowID(recordRequest.WorkflowExecution.GetWorkflowID()),
×
396
                        tag.WorkflowRunID(runID),
×
397
                        tag.WorkflowRunID(recordRequest.WorkflowExecution.GetRunID()),
×
398
                        tag.WorkflowScheduleID(recordRequest.GetScheduleID()),
×
399
                )
×
400
                return nil, h.error(err1, scope, domainID, workflowID, runID)
×
401
        }
×
402

403
        response, err2 := engine.RecordDecisionTaskStarted(ctx, recordRequest)
1,096✔
404
        if err2 != nil {
1,117✔
405
                return nil, h.error(err2, scope, domainID, workflowID, runID)
21✔
406
        }
21✔
407

408
        return response, nil
1,075✔
409
}
410

411
// RespondActivityTaskCompleted - records completion of an activity task
412
func (h *handlerImpl) RespondActivityTaskCompleted(
413
        ctx context.Context,
414
        wrappedRequest *types.HistoryRespondActivityTaskCompletedRequest,
415
) (retError error) {
321✔
416

321✔
417
        defer func() { log.CapturePanic(recover(), h.GetLogger(), &retError) }()
642✔
418
        h.startWG.Wait()
321✔
419

321✔
420
        scope, sw := h.startRequestProfile(ctx, metrics.HistoryRespondActivityTaskCompletedScope)
321✔
421
        defer sw.Stop()
321✔
422

321✔
423
        domainID := wrappedRequest.GetDomainUUID()
321✔
424
        if domainID == "" {
321✔
425
                return h.error(errDomainNotSet, scope, domainID, "", "")
×
426
        }
×
427

428
        if ok := h.rateLimiter.Allow(); !ok {
321✔
429
                return h.error(errHistoryHostThrottle, scope, domainID, "", "")
×
430
        }
×
431

432
        completeRequest := wrappedRequest.CompleteRequest
321✔
433
        token, err0 := h.tokenSerializer.Deserialize(completeRequest.TaskToken)
321✔
434
        if err0 != nil {
321✔
435
                err0 = &types.BadRequestError{Message: fmt.Sprintf("Error deserializing task token. Error: %v", err0)}
×
436
                return h.error(err0, scope, domainID, "", "")
×
437
        }
×
438

439
        err0 = validateTaskToken(token)
321✔
440
        if err0 != nil {
321✔
441
                return h.error(err0, scope, domainID, "", "")
×
442
        }
×
443
        workflowID := token.WorkflowID
321✔
444
        runID := token.RunID
321✔
445

321✔
446
        engine, err1 := h.controller.GetEngine(workflowID)
321✔
447
        if err1 != nil {
321✔
448
                return h.error(err1, scope, domainID, workflowID, runID)
×
449
        }
×
450

451
        err2 := engine.RespondActivityTaskCompleted(ctx, wrappedRequest)
321✔
452
        if err2 != nil {
366✔
453
                return h.error(err2, scope, domainID, workflowID, runID)
45✔
454
        }
45✔
455

456
        return nil
276✔
457
}
458

459
// RespondActivityTaskFailed - records failure of an activity task
460
func (h *handlerImpl) RespondActivityTaskFailed(
461
        ctx context.Context,
462
        wrappedRequest *types.HistoryRespondActivityTaskFailedRequest,
463
) (retError error) {
12✔
464

12✔
465
        defer func() { log.CapturePanic(recover(), h.GetLogger(), &retError) }()
24✔
466
        h.startWG.Wait()
12✔
467

12✔
468
        scope, sw := h.startRequestProfile(ctx, metrics.HistoryRespondActivityTaskFailedScope)
12✔
469
        defer sw.Stop()
12✔
470

12✔
471
        domainID := wrappedRequest.GetDomainUUID()
12✔
472
        if domainID == "" {
12✔
473
                return h.error(errDomainNotSet, scope, domainID, "", "")
×
474
        }
×
475

476
        if ok := h.rateLimiter.Allow(); !ok {
12✔
477
                return h.error(errHistoryHostThrottle, scope, domainID, "", "")
×
478
        }
×
479

480
        failRequest := wrappedRequest.FailedRequest
12✔
481
        token, err0 := h.tokenSerializer.Deserialize(failRequest.TaskToken)
12✔
482
        if err0 != nil {
12✔
483
                err0 = &types.BadRequestError{Message: fmt.Sprintf("Error deserializing task token. Error: %v", err0)}
×
484
                return h.error(err0, scope, domainID, "", "")
×
485
        }
×
486

487
        err0 = validateTaskToken(token)
12✔
488
        if err0 != nil {
12✔
489
                return h.error(err0, scope, domainID, "", "")
×
490
        }
×
491
        workflowID := token.WorkflowID
12✔
492
        runID := token.RunID
12✔
493

12✔
494
        engine, err1 := h.controller.GetEngine(workflowID)
12✔
495
        if err1 != nil {
12✔
496
                return h.error(err1, scope, domainID, workflowID, runID)
×
497
        }
×
498

499
        err2 := engine.RespondActivityTaskFailed(ctx, wrappedRequest)
12✔
500
        if err2 != nil {
12✔
501
                return h.error(err2, scope, domainID, workflowID, runID)
×
502
        }
×
503

504
        return nil
12✔
505
}
506

507
// RespondActivityTaskCanceled - records cancellation of an activity task
508
func (h *handlerImpl) RespondActivityTaskCanceled(
509
        ctx context.Context,
510
        wrappedRequest *types.HistoryRespondActivityTaskCanceledRequest,
511
) (retError error) {
×
512

×
513
        defer func() { log.CapturePanic(recover(), h.GetLogger(), &retError) }()
×
514
        h.startWG.Wait()
×
515

×
516
        scope, sw := h.startRequestProfile(ctx, metrics.HistoryRespondActivityTaskCanceledScope)
×
517
        defer sw.Stop()
×
518

×
519
        domainID := wrappedRequest.GetDomainUUID()
×
520
        if domainID == "" {
×
521
                return h.error(errDomainNotSet, scope, domainID, "", "")
×
522
        }
×
523

524
        if ok := h.rateLimiter.Allow(); !ok {
×
525
                return h.error(errHistoryHostThrottle, scope, domainID, "", "")
×
526
        }
×
527

528
        cancelRequest := wrappedRequest.CancelRequest
×
529
        token, err0 := h.tokenSerializer.Deserialize(cancelRequest.TaskToken)
×
530
        if err0 != nil {
×
531
                err0 = &types.BadRequestError{Message: fmt.Sprintf("Error deserializing task token. Error: %v", err0)}
×
532
                return h.error(err0, scope, domainID, "", "")
×
533
        }
×
534

535
        err0 = validateTaskToken(token)
×
536
        if err0 != nil {
×
537
                return h.error(err0, scope, domainID, "", "")
×
538
        }
×
539
        workflowID := token.WorkflowID
×
540
        runID := token.RunID
×
541

×
542
        engine, err1 := h.controller.GetEngine(workflowID)
×
543
        if err1 != nil {
×
544
                return h.error(err1, scope, domainID, workflowID, runID)
×
545
        }
×
546

547
        err2 := engine.RespondActivityTaskCanceled(ctx, wrappedRequest)
×
548
        if err2 != nil {
×
549
                return h.error(err2, scope, domainID, workflowID, runID)
×
550
        }
×
551

552
        return nil
×
553
}
554

555
// RespondDecisionTaskCompleted - records completion of a decision task
556
func (h *handlerImpl) RespondDecisionTaskCompleted(
557
        ctx context.Context,
558
        wrappedRequest *types.HistoryRespondDecisionTaskCompletedRequest,
559
) (resp *types.HistoryRespondDecisionTaskCompletedResponse, retError error) {
931✔
560

931✔
561
        defer func() { log.CapturePanic(recover(), h.GetLogger(), &retError) }()
1,862✔
562
        h.startWG.Wait()
931✔
563

931✔
564
        scope, sw := h.startRequestProfile(ctx, metrics.HistoryRespondDecisionTaskCompletedScope)
931✔
565
        defer sw.Stop()
931✔
566

931✔
567
        domainID := wrappedRequest.GetDomainUUID()
931✔
568
        if domainID == "" {
931✔
569
                return nil, h.error(errDomainNotSet, scope, domainID, "", "")
×
570
        }
×
571

572
        if ok := h.rateLimiter.Allow(); !ok {
931✔
573
                return nil, h.error(errHistoryHostThrottle, scope, domainID, "", "")
×
574
        }
×
575

576
        completeRequest := wrappedRequest.CompleteRequest
931✔
577
        if len(completeRequest.Decisions) == 0 {
1,076✔
578
                scope.IncCounter(metrics.EmptyCompletionDecisionsCounter)
145✔
579
        }
145✔
580
        token, err0 := h.tokenSerializer.Deserialize(completeRequest.TaskToken)
931✔
581
        if err0 != nil {
931✔
582
                err0 = &types.BadRequestError{Message: fmt.Sprintf("Error deserializing task token. Error: %v", err0)}
×
583
                return nil, h.error(err0, scope, domainID, "", "")
×
584
        }
×
585

586
        h.GetLogger().Debug(fmt.Sprintf("RespondDecisionTaskCompleted. DomainID: %v, WorkflowID: %v, RunID: %v, ScheduleID: %v",
931✔
587
                token.DomainID,
931✔
588
                token.WorkflowID,
931✔
589
                token.RunID,
931✔
590
                token.ScheduleID))
931✔
591

931✔
592
        err0 = validateTaskToken(token)
931✔
593
        if err0 != nil {
931✔
594
                return nil, h.error(err0, scope, domainID, "", "")
×
595
        }
×
596
        workflowID := token.WorkflowID
931✔
597
        runID := token.RunID
931✔
598

931✔
599
        engine, err1 := h.controller.GetEngine(workflowID)
931✔
600
        if err1 != nil {
931✔
601
                return nil, h.error(err1, scope, domainID, workflowID, runID)
×
602
        }
×
603

604
        response, err2 := engine.RespondDecisionTaskCompleted(ctx, wrappedRequest)
931✔
605
        if err2 != nil {
940✔
606
                return nil, h.error(err2, scope, domainID, workflowID, runID)
9✔
607
        }
9✔
608

609
        return response, nil
922✔
610
}
611

612
// RespondDecisionTaskFailed - failed response to decision task
613
func (h *handlerImpl) RespondDecisionTaskFailed(
614
        ctx context.Context,
615
        wrappedRequest *types.HistoryRespondDecisionTaskFailedRequest,
616
) (retError error) {
159✔
617

159✔
618
        defer func() { log.CapturePanic(recover(), h.GetLogger(), &retError) }()
318✔
619
        h.startWG.Wait()
159✔
620

159✔
621
        scope, sw := h.startRequestProfile(ctx, metrics.HistoryRespondDecisionTaskFailedScope)
159✔
622
        defer sw.Stop()
159✔
623

159✔
624
        domainID := wrappedRequest.GetDomainUUID()
159✔
625
        if domainID == "" {
159✔
626
                return h.error(errDomainNotSet, scope, domainID, "", "")
×
627
        }
×
628

629
        if ok := h.rateLimiter.Allow(); !ok {
159✔
630
                return h.error(errHistoryHostThrottle, scope, domainID, "", "")
×
631
        }
×
632

633
        failedRequest := wrappedRequest.FailedRequest
159✔
634
        token, err0 := h.tokenSerializer.Deserialize(failedRequest.TaskToken)
159✔
635
        if err0 != nil {
159✔
636
                err0 = &types.BadRequestError{Message: fmt.Sprintf("Error deserializing task token. Error: %v", err0)}
×
637
                return h.error(err0, scope, domainID, "", "")
×
638
        }
×
639

640
        h.GetLogger().Debug(fmt.Sprintf("RespondDecisionTaskFailed. DomainID: %v, WorkflowID: %v, RunID: %v, ScheduleID: %v",
159✔
641
                token.DomainID,
159✔
642
                token.WorkflowID,
159✔
643
                token.RunID,
159✔
644
                token.ScheduleID))
159✔
645

159✔
646
        if failedRequest != nil && failedRequest.GetCause() == types.DecisionTaskFailedCauseUnhandledDecision {
159✔
647
                h.GetLogger().Info("Non-Deterministic Error", tag.WorkflowDomainID(token.DomainID), tag.WorkflowID(token.WorkflowID), tag.WorkflowRunID(token.RunID))
×
648
                domainName, err := h.GetDomainCache().GetDomainName(token.DomainID)
×
649
                var domainTag metrics.Tag
×
650

×
651
                if err == nil {
×
652
                        domainTag = metrics.DomainTag(domainName)
×
653
                } else {
×
654
                        domainTag = metrics.DomainUnknownTag()
×
655
                }
×
656

657
                scope.Tagged(domainTag).IncCounter(metrics.CadenceErrNonDeterministicCounter)
×
658
        }
659
        err0 = validateTaskToken(token)
159✔
660
        if err0 != nil {
159✔
661
                return h.error(err0, scope, domainID, "", "")
×
662
        }
×
663
        workflowID := token.WorkflowID
159✔
664
        runID := token.RunID
159✔
665

159✔
666
        engine, err1 := h.controller.GetEngine(workflowID)
159✔
667
        if err1 != nil {
159✔
668
                return h.error(err1, scope, domainID, workflowID, runID)
×
669
        }
×
670

671
        err2 := engine.RespondDecisionTaskFailed(ctx, wrappedRequest)
159✔
672
        if err2 != nil {
159✔
673
                return h.error(err2, scope, domainID, workflowID, runID)
×
674
        }
×
675

676
        return nil
159✔
677
}
678

679
// StartWorkflowExecution - creates a new workflow execution
680
func (h *handlerImpl) StartWorkflowExecution(
681
        ctx context.Context,
682
        wrappedRequest *types.HistoryStartWorkflowExecutionRequest,
683
) (resp *types.StartWorkflowExecutionResponse, retError error) {
464✔
684

464✔
685
        defer func() { log.CapturePanic(recover(), h.GetLogger(), &retError) }()
928✔
686
        h.startWG.Wait()
464✔
687

464✔
688
        scope, sw := h.startRequestProfile(ctx, metrics.HistoryStartWorkflowExecutionScope)
464✔
689
        defer sw.Stop()
464✔
690

464✔
691
        domainID := wrappedRequest.GetDomainUUID()
464✔
692
        if domainID == "" {
464✔
693
                return nil, h.error(errDomainNotSet, scope, domainID, "", "")
×
694
        }
×
695

696
        if ok := h.rateLimiter.Allow(); !ok {
464✔
697
                return nil, h.error(errHistoryHostThrottle, scope, domainID, "", "")
×
698
        }
×
699

700
        startRequest := wrappedRequest.StartRequest
464✔
701
        workflowID := startRequest.GetWorkflowID()
464✔
702

464✔
703
        h.workflowIDCache.AllowExternal(domainID, workflowID)
464✔
704

464✔
705
        engine, err1 := h.controller.GetEngine(workflowID)
464✔
706
        if err1 != nil {
464✔
707
                return nil, h.error(err1, scope, domainID, workflowID, "")
×
708
        }
×
709

710
        response, err2 := engine.StartWorkflowExecution(ctx, wrappedRequest)
464✔
711
        runID := response.GetRunID()
464✔
712
        if err2 != nil {
482✔
713
                return nil, h.error(err2, scope, domainID, workflowID, runID)
18✔
714
        }
18✔
715

716
        return response, nil
446✔
717
}
718

719
// DescribeHistoryHost returns information about the internal states of a history host
720
func (h *handlerImpl) DescribeHistoryHost(
721
        ctx context.Context,
722
        request *types.DescribeHistoryHostRequest,
723
) (resp *types.DescribeHistoryHostResponse, retError error) {
×
724

×
725
        defer func() { log.CapturePanic(recover(), h.GetLogger(), &retError) }()
×
726
        h.startWG.Wait()
×
727

×
728
        numOfItemsInCacheByID, numOfItemsInCacheByName := h.GetDomainCache().GetCacheSize()
×
729
        status := ""
×
730
        switch h.controller.Status() {
×
731
        case common.DaemonStatusInitialized:
×
732
                status = "initialized"
×
733
        case common.DaemonStatusStarted:
×
734
                status = "started"
×
735
        case common.DaemonStatusStopped:
×
736
                status = "stopped"
×
737
        }
738

739
        resp = &types.DescribeHistoryHostResponse{
×
740
                NumberOfShards: int32(h.controller.NumShards()),
×
741
                ShardIDs:       h.controller.ShardIDs(),
×
742
                DomainCache: &types.DomainCacheInfo{
×
743
                        NumOfItemsInCacheByID:   numOfItemsInCacheByID,
×
744
                        NumOfItemsInCacheByName: numOfItemsInCacheByName,
×
745
                },
×
746
                ShardControllerStatus: status,
×
747
                Address:               h.GetHostInfo().GetAddress(),
×
748
        }
×
749
        return resp, nil
×
750
}
751

752
// RemoveTask removes a single task from a shard's transfer, timer, replication, or cross-cluster queue
753
func (h *handlerImpl) RemoveTask(
754
        ctx context.Context,
755
        request *types.RemoveTaskRequest,
756
) (retError error) {
×
757
        executionMgr, err := h.GetExecutionManager(int(request.GetShardID()))
×
758
        if err != nil {
×
759
                return err
×
760
        }
×
761

762
        switch taskType := common.TaskType(request.GetType()); taskType {
×
763
        case common.TaskTypeTransfer:
×
764
                return executionMgr.CompleteTransferTask(ctx, &persistence.CompleteTransferTaskRequest{
×
765
                        TaskID: request.GetTaskID(),
×
766
                })
×
767
        case common.TaskTypeTimer:
×
768
                return executionMgr.CompleteTimerTask(ctx, &persistence.CompleteTimerTaskRequest{
×
769
                        VisibilityTimestamp: time.Unix(0, request.GetVisibilityTimestamp()),
×
770
                        TaskID:              request.GetTaskID(),
×
771
                })
×
772
        case common.TaskTypeReplication:
×
773
                return executionMgr.CompleteReplicationTask(ctx, &persistence.CompleteReplicationTaskRequest{
×
774
                        TaskID: request.GetTaskID(),
×
775
                })
×
776
        case common.TaskTypeCrossCluster:
×
777
                return executionMgr.CompleteCrossClusterTask(ctx, &persistence.CompleteCrossClusterTaskRequest{
×
778
                        TargetCluster: request.GetClusterName(),
×
779
                        TaskID:        request.GetTaskID(),
×
780
                })
×
781
        default:
×
782
                return errInvalidTaskType
×
783
        }
784
}
785

786
// CloseShard closes a shard hosted by this instance
787
func (h *handlerImpl) CloseShard(
788
        ctx context.Context,
789
        request *types.CloseShardRequest,
790
) (retError error) {
×
791
        h.controller.RemoveEngineForShard(int(request.GetShardID()))
×
792
        return nil
×
793
}
×
794

795
// ResetQueue resets processing queue states
796
func (h *handlerImpl) ResetQueue(
797
        ctx context.Context,
798
        request *types.ResetQueueRequest,
799
) (retError error) {
×
800

×
801
        defer func() { log.CapturePanic(recover(), h.GetLogger(), &retError) }()
×
802
        h.startWG.Wait()
×
803

×
804
        scope, sw := h.startRequestProfile(ctx, metrics.HistoryResetQueueScope)
×
805
        defer sw.Stop()
×
806

×
807
        engine, err := h.controller.GetEngineForShard(int(request.GetShardID()))
×
808
        if err != nil {
×
809
                return h.error(err, scope, "", "", "")
×
810
        }
×
811

812
        switch taskType := common.TaskType(request.GetType()); taskType {
×
813
        case common.TaskTypeTransfer:
×
814
                err = engine.ResetTransferQueue(ctx, request.GetClusterName())
×
815
        case common.TaskTypeTimer:
×
816
                err = engine.ResetTimerQueue(ctx, request.GetClusterName())
×
817
        case common.TaskTypeCrossCluster:
×
818
                err = engine.ResetCrossClusterQueue(ctx, request.GetClusterName())
×
819
        default:
×
820
                err = errInvalidTaskType
×
821
        }
822

823
        if err != nil {
×
824
                return h.error(err, scope, "", "", "")
×
825
        }
×
826
        return nil
×
827
}
828

829
// DescribeQueue describes processing queue states
830
func (h *handlerImpl) DescribeQueue(
831
        ctx context.Context,
832
        request *types.DescribeQueueRequest,
833
) (resp *types.DescribeQueueResponse, retError error) {
×
834

×
835
        defer func() { log.CapturePanic(recover(), h.GetLogger(), &retError) }()
×
836
        h.startWG.Wait()
×
837

×
838
        scope, sw := h.startRequestProfile(ctx, metrics.HistoryDescribeQueueScope)
×
839
        defer sw.Stop()
×
840

×
841
        engine, err := h.controller.GetEngineForShard(int(request.GetShardID()))
×
842
        if err != nil {
×
843
                return nil, h.error(err, scope, "", "", "")
×
844
        }
×
845

846
        switch taskType := common.TaskType(request.GetType()); taskType {
×
847
        case common.TaskTypeTransfer:
×
848
                resp, err = engine.DescribeTransferQueue(ctx, request.GetClusterName())
×
849
        case common.TaskTypeTimer:
×
850
                resp, err = engine.DescribeTimerQueue(ctx, request.GetClusterName())
×
851
        case common.TaskTypeCrossCluster:
×
852
                resp, err = engine.DescribeCrossClusterQueue(ctx, request.GetClusterName())
×
853
        default:
×
854
                err = errInvalidTaskType
×
855
        }
856

857
        if err != nil {
×
858
                return nil, h.error(err, scope, "", "", "")
×
859
        }
×
860
        return resp, nil
×
861
}
862

863
// DescribeMutableState - returns the internal analysis of workflow execution state
864
func (h *handlerImpl) DescribeMutableState(
865
        ctx context.Context,
866
        request *types.DescribeMutableStateRequest,
867
) (resp *types.DescribeMutableStateResponse, retError error) {
×
868

×
869
        defer func() { log.CapturePanic(recover(), h.GetLogger(), &retError) }()
×
870
        h.startWG.Wait()
×
871

×
872
        scope, sw := h.startRequestProfile(ctx, metrics.HistoryDescribeMutabelStateScope)
×
873
        defer sw.Stop()
×
874

×
875
        domainID := request.GetDomainUUID()
×
876
        if domainID == "" {
×
877
                return nil, h.error(errDomainNotSet, scope, domainID, "", "")
×
878
        }
×
879

880
        workflowExecution := request.Execution
×
881
        workflowID := workflowExecution.GetWorkflowID()
×
882
        runID := workflowExecution.GetRunID()
×
883
        engine, err1 := h.controller.GetEngine(workflowID)
×
884
        if err1 != nil {
×
885
                return nil, h.error(err1, scope, domainID, workflowID, runID)
×
886
        }
×
887

888
        resp, err2 := engine.DescribeMutableState(ctx, request)
×
889
        if err2 != nil {
×
890
                return nil, h.error(err2, scope, domainID, workflowID, runID)
×
891
        }
×
892
        return resp, nil
×
893
}
894

895
// GetMutableState - returns the id of the next event in the execution's history
896
func (h *handlerImpl) GetMutableState(
897
        ctx context.Context,
898
        getRequest *types.GetMutableStateRequest,
899
) (resp *types.GetMutableStateResponse, retError error) {
439✔
900

439✔
901
        defer func() { log.CapturePanic(recover(), h.GetLogger(), &retError) }()
878✔
902
        h.startWG.Wait()
439✔
903

439✔
904
        scope, sw := h.startRequestProfile(ctx, metrics.HistoryGetMutableStateScope)
439✔
905
        defer sw.Stop()
439✔
906

439✔
907
        domainID := getRequest.GetDomainUUID()
439✔
908
        if domainID == "" {
439✔
909
                return nil, h.error(errDomainNotSet, scope, domainID, "", "")
×
910
        }
×
911

912
        if ok := h.rateLimiter.Allow(); !ok {
439✔
913
                return nil, h.error(errHistoryHostThrottle, scope, domainID, "", "")
×
914
        }
×
915

916
        workflowExecution := getRequest.Execution
439✔
917
        workflowID := workflowExecution.GetWorkflowID()
439✔
918
        runID := workflowExecution.GetRunID()
439✔
919
        engine, err1 := h.controller.GetEngine(workflowID)
439✔
920
        if err1 != nil {
439✔
921
                return nil, h.error(err1, scope, domainID, workflowID, runID)
×
922
        }
×
923

924
        resp, err2 := engine.GetMutableState(ctx, getRequest)
439✔
925
        if err2 != nil {
462✔
926
                return nil, h.error(err2, scope, domainID, workflowID, runID)
23✔
927
        }
23✔
928
        return resp, nil
416✔
929
}
930

931
// PollMutableState - returns the id of the next event in the execution's history
932
func (h *handlerImpl) PollMutableState(
933
        ctx context.Context,
934
        getRequest *types.PollMutableStateRequest,
935
) (resp *types.PollMutableStateResponse, retError error) {
392✔
936

392✔
937
        defer func() { log.CapturePanic(recover(), h.GetLogger(), &retError) }()
784✔
938
        h.startWG.Wait()
392✔
939

392✔
940
        scope, sw := h.startRequestProfile(ctx, metrics.HistoryPollMutableStateScope)
392✔
941
        defer sw.Stop()
392✔
942

392✔
943
        domainID := getRequest.GetDomainUUID()
392✔
944
        if domainID == "" {
392✔
945
                return nil, h.error(errDomainNotSet, scope, domainID, "", "")
×
946
        }
×
947

948
        if ok := h.rateLimiter.Allow(); !ok {
392✔
949
                return nil, h.error(errHistoryHostThrottle, scope, domainID, "", "")
×
950
        }
×
951

952
        workflowExecution := getRequest.Execution
392✔
953
        workflowID := workflowExecution.GetWorkflowID()
392✔
954
        runID := workflowExecution.GetRunID()
392✔
955
        engine, err1 := h.controller.GetEngine(workflowID)
392✔
956
        if err1 != nil {
392✔
957
                return nil, h.error(err1, scope, domainID, workflowID, runID)
×
958
        }
×
959

960
        resp, err2 := engine.PollMutableState(ctx, getRequest)
392✔
961
        if err2 != nil {
392✔
962
                return nil, h.error(err2, scope, domainID, workflowID, runID)
×
963
        }
×
964
        return resp, nil
392✔
965
}
966

967
// DescribeWorkflowExecution returns information about the specified workflow execution.
968
func (h *handlerImpl) DescribeWorkflowExecution(
969
        ctx context.Context,
970
        request *types.HistoryDescribeWorkflowExecutionRequest,
971
) (resp *types.DescribeWorkflowExecutionResponse, retError error) {
93✔
972

93✔
973
        defer func() { log.CapturePanic(recover(), h.GetLogger(), &retError) }()
186✔
974
        h.startWG.Wait()
93✔
975

93✔
976
        scope, sw := h.startRequestProfile(ctx, metrics.HistoryDescribeWorkflowExecutionScope)
93✔
977
        defer sw.Stop()
93✔
978

93✔
979
        domainID := request.GetDomainUUID()
93✔
980
        if domainID == "" {
93✔
981
                return nil, h.error(errDomainNotSet, scope, domainID, "", "")
×
982
        }
×
983

984
        if ok := h.rateLimiter.Allow(); !ok {
93✔
985
                return nil, h.error(errHistoryHostThrottle, scope, domainID, "", "")
×
986
        }
×
987

988
        workflowExecution := request.Request.Execution
93✔
989
        workflowID := workflowExecution.GetWorkflowID()
93✔
990
        runID := workflowExecution.GetRunID()
93✔
991
        engine, err1 := h.controller.GetEngine(workflowID)
93✔
992
        if err1 != nil {
93✔
993
                return nil, h.error(err1, scope, domainID, workflowID, runID)
×
994
        }
×
995

996
        resp, err2 := engine.DescribeWorkflowExecution(ctx, request)
93✔
997
        if err2 != nil {
93✔
998
                return nil, h.error(err2, scope, domainID, workflowID, runID)
×
999
        }
×
1000
        return resp, nil
93✔
1001
}
1002

1003
// RequestCancelWorkflowExecution - requests cancellation of a workflow
1004
func (h *handlerImpl) RequestCancelWorkflowExecution(
1005
        ctx context.Context,
1006
        request *types.HistoryRequestCancelWorkflowExecutionRequest,
1007
) (retError error) {
12✔
1008

12✔
1009
        defer func() { log.CapturePanic(recover(), h.GetLogger(), &retError) }()
24✔
1010
        h.startWG.Wait()
12✔
1011

12✔
1012
        scope, sw := h.startRequestProfile(ctx, metrics.HistoryRequestCancelWorkflowExecutionScope)
12✔
1013
        defer sw.Stop()
12✔
1014

12✔
1015
        if h.isShuttingDown() {
12✔
1016
                return errShuttingDown
×
1017
        }
×
1018

1019
        domainID := request.GetDomainUUID()
12✔
1020
        if domainID == "" || request.CancelRequest.GetDomain() == "" {
12✔
1021
                return h.error(errDomainNotSet, scope, domainID, "", "")
×
1022
        }
×
1023

1024
        if ok := h.rateLimiter.Allow(); !ok {
12✔
1025
                return h.error(errHistoryHostThrottle, scope, domainID, "", "")
×
1026
        }
×
1027

1028
        cancelRequest := request.CancelRequest
12✔
1029
        h.GetLogger().Debug(fmt.Sprintf("RequestCancelWorkflowExecution. DomainID: %v/%v, WorkflowID: %v, RunID: %v.",
12✔
1030
                cancelRequest.GetDomain(),
12✔
1031
                request.GetDomainUUID(),
12✔
1032
                cancelRequest.WorkflowExecution.GetWorkflowID(),
12✔
1033
                cancelRequest.WorkflowExecution.GetRunID()))
12✔
1034

12✔
1035
        workflowID := cancelRequest.WorkflowExecution.GetWorkflowID()
12✔
1036
        runID := cancelRequest.WorkflowExecution.GetRunID()
12✔
1037
        engine, err1 := h.controller.GetEngine(workflowID)
12✔
1038
        if err1 != nil {
12✔
1039
                return h.error(err1, scope, domainID, workflowID, runID)
×
1040
        }
×
1041

1042
        err2 := engine.RequestCancelWorkflowExecution(ctx, request)
12✔
1043
        if err2 != nil {
18✔
1044
                return h.error(err2, scope, domainID, workflowID, runID)
6✔
1045
        }
6✔
1046

1047
        return nil
6✔
1048
}
1049

1050
// SignalWorkflowExecution is used to send a signal event to running workflow execution.  This results in
1051
// WorkflowExecutionSignaled event recorded in the history and a decision task being created for the execution.
1052
func (h *handlerImpl) SignalWorkflowExecution(
1053
        ctx context.Context,
1054
        wrappedRequest *types.HistorySignalWorkflowExecutionRequest,
1055
) (retError error) {
732✔
1056

732✔
1057
        defer func() { log.CapturePanic(recover(), h.GetLogger(), &retError) }()
1,464✔
1058
        h.startWG.Wait()
732✔
1059

732✔
1060
        scope, sw := h.startRequestProfile(ctx, metrics.HistorySignalWorkflowExecutionScope)
732✔
1061
        defer sw.Stop()
732✔
1062

732✔
1063
        if h.isShuttingDown() {
732✔
1064
                return errShuttingDown
×
1065
        }
×
1066

1067
        domainID := wrappedRequest.GetDomainUUID()
732✔
1068
        if domainID == "" {
732✔
1069
                return h.error(errDomainNotSet, scope, domainID, "", "")
×
1070
        }
×
1071

1072
        if ok := h.rateLimiter.Allow(); !ok {
732✔
1073
                return h.error(errHistoryHostThrottle, scope, domainID, "", "")
×
1074
        }
×
1075

1076
        workflowExecution := wrappedRequest.SignalRequest.WorkflowExecution
732✔
1077
        workflowID := workflowExecution.GetWorkflowID()
732✔
1078
        runID := workflowExecution.GetRunID()
732✔
1079
        engine, err1 := h.controller.GetEngine(workflowID)
732✔
1080
        if err1 != nil {
732✔
1081
                return h.error(err1, scope, domainID, workflowID, runID)
×
1082
        }
×
1083

1084
        err2 := engine.SignalWorkflowExecution(ctx, wrappedRequest)
732✔
1085
        if err2 != nil {
744✔
1086
                return h.error(err2, scope, domainID, workflowID, runID)
12✔
1087
        }
12✔
1088

1089
        return nil
720✔
1090
}
1091

1092
// SignalWithStartWorkflowExecution is used to ensure sending a signal event to a workflow execution.
1093
// If workflow is running, this results in WorkflowExecutionSignaled event recorded in the history
1094
// and a decision task being created for the execution.
1095
// If workflow is not running or not found, this results in WorkflowExecutionStarted and WorkflowExecutionSignaled
1096
// event recorded in history, and a decision task being created for the execution
1097
func (h *handlerImpl) SignalWithStartWorkflowExecution(
1098
        ctx context.Context,
1099
        wrappedRequest *types.HistorySignalWithStartWorkflowExecutionRequest,
1100
) (resp *types.StartWorkflowExecutionResponse, retError error) {
33✔
1101

33✔
1102
        defer func() { log.CapturePanic(recover(), h.GetLogger(), &retError) }()
66✔
1103
        h.startWG.Wait()
33✔
1104

33✔
1105
        scope, sw := h.startRequestProfile(ctx, metrics.HistorySignalWithStartWorkflowExecutionScope)
33✔
1106
        defer sw.Stop()
33✔
1107

33✔
1108
        if h.isShuttingDown() {
33✔
1109
                return nil, errShuttingDown
×
1110
        }
×
1111

1112
        domainID := wrappedRequest.GetDomainUUID()
33✔
1113
        if domainID == "" {
33✔
1114
                return nil, h.error(errDomainNotSet, scope, domainID, "", "")
×
1115
        }
×
1116

1117
        if ok := h.rateLimiter.Allow(); !ok {
33✔
1118
                return nil, h.error(errHistoryHostThrottle, scope, domainID, "", "")
×
1119
        }
×
1120

1121
        signalWithStartRequest := wrappedRequest.SignalWithStartRequest
33✔
1122
        workflowID := signalWithStartRequest.GetWorkflowID()
33✔
1123
        engine, err1 := h.controller.GetEngine(workflowID)
33✔
1124
        if err1 != nil {
33✔
1125
                return nil, h.error(err1, scope, domainID, workflowID, "")
×
1126
        }
×
1127

1128
        resp, err2 := engine.SignalWithStartWorkflowExecution(ctx, wrappedRequest)
33✔
1129
        if err2 == nil {
60✔
1130
                return resp, nil
27✔
1131
        }
27✔
1132
        // Two simultaneous SignalWithStart requests might try to start a workflow at the same time.
1133
        // This can result in one of the requests failing with one of two possible errors:
1134
        //    1) If it is a brand new WF ID, one of the requests can fail with WorkflowExecutionAlreadyStartedError
1135
        //       (createMode is persistence.CreateWorkflowModeBrandNew)
1136
        //    2) If it an already existing WF ID, one of the requests can fail with a CurrentWorkflowConditionFailedError
1137
        //       (createMode is persisetence.CreateWorkflowModeWorkflowIDReuse)
1138
        // If either error occurs, just go ahead and retry. It should succeed on the subsequent attempt.
1139
        var e1 *persistence.WorkflowExecutionAlreadyStartedError
6✔
1140
        var e2 *persistence.CurrentWorkflowConditionFailedError
6✔
1141
        if !errors.As(err2, &e1) && !errors.As(err2, &e2) {
12✔
1142
                return nil, h.error(err2, scope, domainID, workflowID, resp.GetRunID())
6✔
1143
        }
6✔
1144

1145
        resp, err2 = engine.SignalWithStartWorkflowExecution(ctx, wrappedRequest)
×
1146
        if err2 != nil {
×
1147
                return nil, h.error(err2, scope, domainID, workflowID, resp.GetRunID())
×
1148
        }
×
1149
        return resp, nil
×
1150
}
1151
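
The recovery path above relies on errors.As to detect the two race-loss errors and then retries exactly once. A self-contained sketch of that retry shape, with hypothetical error types standing in for the persistence errors:

package main

import (
	"errors"
	"fmt"
)

// Hypothetical stand-ins for the two persistence errors the handler retries on.
type alreadyStartedError struct{}

func (e *alreadyStartedError) Error() string { return "workflow already started" }

type conditionFailedError struct{}

func (e *conditionFailedError) Error() string { return "current workflow condition failed" }

var calls int

// signalWithStart simulates a first attempt that loses the race and a second that wins.
func signalWithStart() (string, error) {
	calls++
	if calls == 1 {
		return "", &alreadyStartedError{}
	}
	return "run-id-123", nil
}

func main() {
	runID, err := signalWithStart()
	if err != nil {
		var e1 *alreadyStartedError
		var e2 *conditionFailedError
		if !errors.As(err, &e1) && !errors.As(err, &e2) {
			panic(err) // some other failure: surface it
		}
		// lost the start race: retry once, as the handler does
		if runID, err = signalWithStart(); err != nil {
			panic(err)
		}
	}
	fmt.Println("started:", runID) // prints "started: run-id-123"
}
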

1152
// RemoveSignalMutableState is used to remove a signal request ID that was previously recorded.  This is currently
1153
// used to clean up execution info when the signal decision has finished.
1154
func (h *handlerImpl) RemoveSignalMutableState(
1155
        ctx context.Context,
1156
        wrappedRequest *types.RemoveSignalMutableStateRequest,
1157
) (retError error) {
6✔
1158

6✔
1159
        defer func() { log.CapturePanic(recover(), h.GetLogger(), &retError) }()
12✔
1160
        h.startWG.Wait()
6✔
1161

6✔
1162
        scope, sw := h.startRequestProfile(ctx, metrics.HistoryRemoveSignalMutableStateScope)
6✔
1163
        defer sw.Stop()
6✔
1164

6✔
1165
        if h.isShuttingDown() {
6✔
1166
                return errShuttingDown
×
1167
        }
×
1168

1169
        domainID := wrappedRequest.GetDomainUUID()
6✔
1170
        if domainID == "" {
6✔
1171
                return h.error(errDomainNotSet, scope, domainID, "", "")
×
1172
        }
×
1173

1174
        if ok := h.rateLimiter.Allow(); !ok {
6✔
1175
                return h.error(errHistoryHostThrottle, scope, domainID, "", "")
×
1176
        }
×
1177

1178
        workflowExecution := wrappedRequest.WorkflowExecution
6✔
1179
        workflowID := workflowExecution.GetWorkflowID()
6✔
1180
        runID := workflowExecution.GetRunID()
6✔
1181
        engine, err1 := h.controller.GetEngine(workflowID)
6✔
1182
        if err1 != nil {
6✔
1183
                return h.error(err1, scope, domainID, workflowID, runID)
×
1184
        }
×
1185

1186
        err2 := engine.RemoveSignalMutableState(ctx, wrappedRequest)
6✔
1187
        if err2 != nil {
6✔
1188
                return h.error(err2, scope, domainID, workflowID, runID)
×
1189
        }
×
1190

1191
        return nil
6✔
1192
}
1193

1194
// TerminateWorkflowExecution terminates an existing workflow execution by recording WorkflowExecutionTerminated event
1195
// in the history and immediately terminating the execution instance.
1196
func (h *handlerImpl) TerminateWorkflowExecution(
1197
        ctx context.Context,
1198
        wrappedRequest *types.HistoryTerminateWorkflowExecutionRequest,
1199
) (retError error) {
51✔
1200

51✔
1201
        defer func() { log.CapturePanic(recover(), h.GetLogger(), &retError) }()
102✔
1202
        h.startWG.Wait()
51✔
1203

51✔
1204
        scope, sw := h.startRequestProfile(ctx, metrics.HistoryTerminateWorkflowExecutionScope)
51✔
1205
        defer sw.Stop()
51✔
1206

51✔
1207
        if h.isShuttingDown() {
51✔
1208
                return errShuttingDown
×
1209
        }
×
1210

1211
        domainID := wrappedRequest.GetDomainUUID()
51✔
1212
        if domainID == "" {
51✔
1213
                return h.error(errDomainNotSet, scope, domainID, "", "")
×
1214
        }
×
1215

1216
        if ok := h.rateLimiter.Allow(); !ok {
51✔
1217
                return h.error(errHistoryHostThrottle, scope, domainID, "", "")
×
1218
        }
×
1219

1220
        workflowExecution := wrappedRequest.TerminateRequest.WorkflowExecution
51✔
1221
        workflowID := workflowExecution.GetWorkflowID()
51✔
1222
        runID := workflowExecution.GetRunID()
51✔
1223
        engine, err1 := h.controller.GetEngine(workflowID)
51✔
1224
        if err1 != nil {
51✔
1225
                return h.error(err1, scope, domainID, workflowID, runID)
×
1226
        }
×
1227

1228
        err2 := engine.TerminateWorkflowExecution(ctx, wrappedRequest)
51✔
1229
        if err2 != nil {
51✔
1230
                return h.error(err2, scope, domainID, workflowID, runID)
×
1231
        }
×
1232

1233
        return nil
51✔
1234
}
1235

1236
// ResetWorkflowExecution reset an existing workflow execution
1237
// in the history and immediately terminating the execution instance.
1238
func (h *handlerImpl) ResetWorkflowExecution(
1239
        ctx context.Context,
1240
        wrappedRequest *types.HistoryResetWorkflowExecutionRequest,
1241
) (resp *types.ResetWorkflowExecutionResponse, retError error) {
15✔
1242

15✔
1243
        defer func() { log.CapturePanic(recover(), h.GetLogger(), &retError) }()
30✔
1244
        h.startWG.Wait()
15✔
1245

15✔
1246
        scope, sw := h.startRequestProfile(ctx, metrics.HistoryResetWorkflowExecutionScope)
15✔
1247
        defer sw.Stop()
15✔
1248

15✔
1249
        if h.isShuttingDown() {
15✔
1250
                return nil, errShuttingDown
×
1251
        }
×
1252

1253
        domainID := wrappedRequest.GetDomainUUID()
15✔
1254
        if domainID == "" {
15✔
1255
                return nil, h.error(errDomainNotSet, scope, domainID, "", "")
×
1256
        }
×
1257

1258
        if ok := h.rateLimiter.Allow(); !ok {
15✔
1259
                return nil, h.error(errHistoryHostThrottle, scope, domainID, "", "")
×
1260
        }
×
1261

1262
        workflowExecution := wrappedRequest.ResetRequest.WorkflowExecution
15✔
1263
        workflowID := workflowExecution.GetWorkflowID()
15✔
1264
        runID := workflowExecution.GetRunID()
15✔
1265
        engine, err1 := h.controller.GetEngine(workflowID)
15✔
1266
        if err1 != nil {
15✔
1267
                return nil, h.error(err1, scope, domainID, workflowID, runID)
×
1268
        }
×
1269

1270
        resp, err2 := engine.ResetWorkflowExecution(ctx, wrappedRequest)
15✔
1271
        if err2 != nil {
15✔
1272
                return nil, h.error(err2, scope, domainID, workflowID, runID)
×
1273
        }
×
1274

1275
        return resp, nil
15✔
1276
}
1277

1278
// QueryWorkflow queries a workflow execution.
1279
func (h *handlerImpl) QueryWorkflow(
1280
        ctx context.Context,
1281
        request *types.HistoryQueryWorkflowRequest,
1282
) (resp *types.HistoryQueryWorkflowResponse, retError error) {
45✔
1283
        defer func() { log.CapturePanic(recover(), h.GetLogger(), &retError) }()
90✔
1284
        h.startWG.Wait()
45✔
1285

45✔
1286
        scope, sw := h.startRequestProfile(ctx, metrics.HistoryQueryWorkflowScope)
45✔
1287
        defer sw.Stop()
45✔
1288

45✔
1289
        if h.isShuttingDown() {
45✔
1290
                return nil, errShuttingDown
×
1291
        }
×
1292

1293
        domainID := request.GetDomainUUID()
45✔
1294
        if domainID == "" {
45✔
1295
                return nil, h.error(errDomainNotSet, scope, domainID, "", "")
×
1296
        }
×
1297

1298
        if ok := h.rateLimiter.Allow(); !ok {
45✔
1299
                return nil, h.error(errHistoryHostThrottle, scope, domainID, "", "")
×
1300
        }
×
1301

1302
        workflowID := request.GetRequest().GetExecution().GetWorkflowID()
45✔
1303
        runID := request.GetRequest().GetExecution().GetRunID()
45✔
1304
        engine, err1 := h.controller.GetEngine(workflowID)
45✔
1305
        if err1 != nil {
45✔
1306
                return nil, h.error(err1, scope, domainID, workflowID, runID)
×
1307
        }
×
1308

1309
        resp, err2 := engine.QueryWorkflow(ctx, request)
45✔
1310
        if err2 != nil {
57✔
1311
                return nil, h.error(err2, scope, domainID, workflowID, runID)
12✔
1312
        }
12✔
1313

1314
        return resp, nil
33✔
1315
}
1316

1317
// ScheduleDecisionTask is used for creating a decision task for already started workflow execution.  This is mainly
1318
// used by transfer queue processor during the processing of StartChildWorkflowExecution task, where it first starts
1319
// child execution without creating the decision task and then calls this API after updating the mutable state of
1320
// parent execution.
1321
func (h *handlerImpl) ScheduleDecisionTask(
1322
        ctx context.Context,
1323
        request *types.ScheduleDecisionTaskRequest,
1324
) (retError error) {
18✔
1325

18✔
1326
        defer func() { log.CapturePanic(recover(), h.GetLogger(), &retError) }()
36✔
1327
        h.startWG.Wait()
18✔
1328

18✔
1329
        scope, sw := h.startRequestProfile(ctx, metrics.HistoryScheduleDecisionTaskScope)
18✔
1330
        defer sw.Stop()
18✔
1331

18✔
1332
        if h.isShuttingDown() {
18✔
1333
                return errShuttingDown
×
1334
        }
×
1335

1336
        domainID := request.GetDomainUUID()
18✔
1337
        if domainID == "" {
18✔
1338
                return h.error(errDomainNotSet, scope, domainID, "", "")
×
1339
        }
×
1340

1341
        if ok := h.rateLimiter.Allow(); !ok {
18✔
1342
                return h.error(errHistoryHostThrottle, scope, domainID, "", "")
×
1343
        }
×
1344

1345
        if request.WorkflowExecution == nil {
18✔
1346
                return h.error(errWorkflowExecutionNotSet, scope, domainID, "", "")
×
1347
        }
×
1348

1349
        workflowExecution := request.WorkflowExecution
18✔
1350
        workflowID := workflowExecution.GetWorkflowID()
18✔
1351
        runID := workflowExecution.GetRunID()
18✔
1352
        engine, err1 := h.controller.GetEngine(workflowID)
18✔
1353
        if err1 != nil {
18✔
1354
                return h.error(err1, scope, domainID, workflowID, runID)
×
1355
        }
×
1356

1357
        err2 := engine.ScheduleDecisionTask(ctx, request)
18✔
1358
        if err2 != nil {
18✔
1359
                return h.error(err2, scope, domainID, workflowID, runID)
×
1360
        }
×
1361

1362
        return nil
18✔
1363
}
1364

1365
// RecordChildExecutionCompleted is used for reporting the completion of child workflow execution to parent.
1366
// This is mainly called by transfer queue processor during the processing of DeleteExecution task.
1367
func (h *handlerImpl) RecordChildExecutionCompleted(
1368
        ctx context.Context,
1369
        request *types.RecordChildExecutionCompletedRequest,
1370
) (retError error) {
18✔
1371

18✔
1372
        defer func() { log.CapturePanic(recover(), h.GetLogger(), &retError) }()
36✔
1373
        h.startWG.Wait()
18✔
1374

18✔
1375
        scope, sw := h.startRequestProfile(ctx, metrics.HistoryRecordChildExecutionCompletedScope)
18✔
1376
        defer sw.Stop()
18✔
1377

18✔
1378
        if h.isShuttingDown() {
18✔
1379
                return errShuttingDown
×
1380
        }
×
1381

1382
        domainID := request.GetDomainUUID()
18✔
1383
        if domainID == "" {
18✔
1384
                return h.error(errDomainNotSet, scope, domainID, "", "")
×
1385
        }
×
1386

1387
        if ok := h.rateLimiter.Allow(); !ok {
18✔
1388
                return h.error(errHistoryHostThrottle, scope, domainID, "", "")
×
1389
        }
×
1390

1391
        if request.WorkflowExecution == nil {
18✔
1392
                return h.error(errWorkflowExecutionNotSet, scope, domainID, "", "")
×
1393
        }
×
1394

1395
        workflowExecution := request.WorkflowExecution
18✔
1396
        workflowID := workflowExecution.GetWorkflowID()
18✔
1397
        runID := workflowExecution.GetRunID()
18✔
1398
        engine, err1 := h.controller.GetEngine(workflowID)
18✔
1399
        if err1 != nil {
18✔
1400
                return h.error(err1, scope, domainID, workflowID, runID)
×
1401
        }
×
1402

1403
        err2 := engine.RecordChildExecutionCompleted(ctx, request)
18✔
1404
        if err2 != nil {
21✔
1405
                return h.error(err2, scope, domainID, workflowID, runID)
3✔
1406
        }
3✔
1407

1408
        return nil
15✔
1409
}
1410

1411
// ResetStickyTaskList resets the volatile information in the mutable state of a given workflow execution.
1412
// Volatile information is the information related to the client, such as:
1413
// 1. StickyTaskList
1414
// 2. StickyScheduleToStartTimeout
1415
// 3. ClientLibraryVersion
1416
// 4. ClientFeatureVersion
1417
// 5. ClientImpl
1418
func (h *handlerImpl) ResetStickyTaskList(
1419
        ctx context.Context,
1420
        resetRequest *types.HistoryResetStickyTaskListRequest,
1421
) (resp *types.HistoryResetStickyTaskListResponse, retError error) {
3✔
1422

3✔
1423
        defer func() { log.CapturePanic(recover(), h.GetLogger(), &retError) }()
6✔
1424
        h.startWG.Wait()
3✔
1425

3✔
1426
        scope, sw := h.startRequestProfile(ctx, metrics.HistoryResetStickyTaskListScope)
3✔
1427
        defer sw.Stop()
3✔
1428

3✔
1429
        if h.isShuttingDown() {
3✔
1430
                return nil, errShuttingDown
×
1431
        }
×
1432

1433
        domainID := resetRequest.GetDomainUUID()
3✔
1434
        if domainID == "" {
3✔
1435
                return nil, h.error(errDomainNotSet, scope, domainID, "", "")
×
1436
        }
×
1437

1438
        if ok := h.rateLimiter.Allow(); !ok {
3✔
1439
                return nil, h.error(errHistoryHostThrottle, scope, domainID, "", "")
×
1440
        }
×
1441

1442
        workflowID := resetRequest.Execution.GetWorkflowID()
3✔
1443
        runID := resetRequest.Execution.GetRunID()
3✔
1444
        engine, err := h.controller.GetEngine(workflowID)
3✔
1445
        if err != nil {
3✔
1446
                return nil, h.error(err, scope, domainID, workflowID, runID)
×
1447
        }
×
1448

1449
        resp, err = engine.ResetStickyTaskList(ctx, resetRequest)
3✔
1450
        if err != nil {
3✔
1451
                return nil, h.error(err, scope, domainID, workflowID, runID)
×
1452
        }
×
1453

1454
        return resp, nil
3✔
1455
}
1456

1457
// ReplicateEventsV2 is called by processor to replicate history events for passive domains
1458
func (h *handlerImpl) ReplicateEventsV2(
1459
        ctx context.Context,
1460
        replicateRequest *types.ReplicateEventsV2Request,
1461
) (retError error) {
3✔
1462

3✔
1463
        defer func() { log.CapturePanic(recover(), h.GetLogger(), &retError) }()
6✔
1464
        h.startWG.Wait()
3✔
1465

3✔
1466
        if h.isShuttingDown() {
3✔
1467
                return errShuttingDown
×
1468
        }
×
1469

1470
        scope, sw := h.startRequestProfile(ctx, metrics.HistoryReplicateEventsV2Scope)
3✔
1471
        defer sw.Stop()
3✔
1472

3✔
1473
        domainID := replicateRequest.GetDomainUUID()
3✔
1474
        if domainID == "" {
3✔
1475
                return h.error(errDomainNotSet, scope, domainID, "", "")
×
1476
        }
×
1477

1478
        if ok := h.rateLimiter.Allow(); !ok {
3✔
1479
                return h.error(errHistoryHostThrottle, scope, domainID, "", "")
×
1480
        }
×
1481

1482
        workflowExecution := replicateRequest.WorkflowExecution
3✔
1483
        workflowID := workflowExecution.GetWorkflowID()
3✔
1484
        runID := workflowExecution.GetRunID()
3✔
1485
        engine, err1 := h.controller.GetEngine(workflowID)
3✔
1486
        if err1 != nil {
3✔
1487
                return h.error(err1, scope, domainID, workflowID, runID)
×
1488
        }
×
1489

1490
        err2 := engine.ReplicateEventsV2(ctx, replicateRequest)
3✔
1491
        if err2 != nil {
3✔
1492
                return h.error(err2, scope, domainID, workflowID, runID)
×
1493
        }
×
1494

1495
        return nil
3✔
1496
}
1497

1498
// SyncShardStatus is called by processor to sync history shard information from another cluster
1499
func (h *handlerImpl) SyncShardStatus(
1500
        ctx context.Context,
1501
        syncShardStatusRequest *types.SyncShardStatusRequest,
1502
) (retError error) {
×
1503

×
1504
        defer func() { log.CapturePanic(recover(), h.GetLogger(), &retError) }()
×
1505
        h.startWG.Wait()
×
1506

×
1507
        scope, sw := h.startRequestProfile(ctx, metrics.HistorySyncShardStatusScope)
×
1508
        defer sw.Stop()
×
1509

×
1510
        if h.isShuttingDown() {
×
1511
                return errShuttingDown
×
1512
        }
×
1513

1514
        if ok := h.rateLimiter.Allow(); !ok {
×
1515
                return h.error(errHistoryHostThrottle, scope, "", "", "")
×
1516
        }
×
1517

1518
        if syncShardStatusRequest.SourceCluster == "" {
×
1519
                return h.error(errSourceClusterNotSet, scope, "", "", "")
×
1520
        }
×
1521

1522
        if syncShardStatusRequest.Timestamp == nil {
×
1523
                return h.error(errTimestampNotSet, scope, "", "", "")
×
1524
        }
×
1525

1526
        // shard ID is already provided in the request
1527
        engine, err := h.controller.GetEngineForShard(int(syncShardStatusRequest.GetShardID()))
×
1528
        if err != nil {
×
1529
                return h.error(err, scope, "", "", "")
×
1530
        }
×
1531

1532
        err = engine.SyncShardStatus(ctx, syncShardStatusRequest)
×
1533
        if err != nil {
×
1534
                return h.error(err, scope, "", "", "")
×
1535
        }
×
1536

1537
        return nil
×
1538
}
1539

1540
// SyncActivity is called by processor to sync activity
1541
func (h *handlerImpl) SyncActivity(
1542
        ctx context.Context,
1543
        syncActivityRequest *types.SyncActivityRequest,
1544
) (retError error) {
×
1545

×
1546
        defer func() { log.CapturePanic(recover(), h.GetLogger(), &retError) }()
×
1547
        h.startWG.Wait()
×
1548

×
1549
        scope, sw := h.startRequestProfile(ctx, metrics.HistorySyncActivityScope)
×
1550
        defer sw.Stop()
×
1551

×
1552
        if h.isShuttingDown() {
×
1553
                return errShuttingDown
×
1554
        }
×
1555

1556
        domainID := syncActivityRequest.GetDomainID()
×
1557
        if syncActivityRequest.DomainID == "" || uuid.Parse(syncActivityRequest.GetDomainID()) == nil {
×
1558
                return h.error(errDomainNotSet, scope, domainID, "", "")
×
1559
        }
×
1560

1561
        if ok := h.rateLimiter.Allow(); !ok {
×
1562
                return h.error(errHistoryHostThrottle, scope, domainID, "", "")
×
1563
        }
×
1564

1565
        if syncActivityRequest.WorkflowID == "" {
×
1566
                return h.error(errWorkflowIDNotSet, scope, domainID, "", "")
×
1567
        }
×
1568

1569
        if syncActivityRequest.RunID == "" || uuid.Parse(syncActivityRequest.GetRunID()) == nil {
×
1570
                return h.error(errRunIDNotValid, scope, domainID, "", "")
×
1571
        }
×
1572

1573
        workflowID := syncActivityRequest.GetWorkflowID()
×
1574
        runID := syncActivityRequest.GetRunID()
×
1575
        engine, err := h.controller.GetEngine(workflowID)
×
1576
        if err != nil {
×
1577
                return h.error(err, scope, domainID, workflowID, runID)
×
1578
        }
×
1579

1580
        err = engine.SyncActivity(ctx, syncActivityRequest)
×
1581
        if err != nil {
×
1582
                return h.error(err, scope, domainID, workflowID, runID)
×
1583
        }
×
1584

1585
        return nil
×
1586
}
1587

1588
// GetReplicationMessages is called by remote peers to get replicated messages for cross DC replication
1589
func (h *handlerImpl) GetReplicationMessages(
1590
        ctx context.Context,
1591
        request *types.GetReplicationMessagesRequest,
1592
) (resp *types.GetReplicationMessagesResponse, retError error) {
×
1593
        defer func() { log.CapturePanic(recover(), h.GetLogger(), &retError) }()
×
1594
        h.startWG.Wait()
×
1595

×
1596
        h.GetLogger().Debug("Received GetReplicationMessages call.")
×
1597

×
1598
        _, sw := h.startRequestProfile(ctx, metrics.HistoryGetReplicationMessagesScope)
×
1599
        defer sw.Stop()
×
1600

×
1601
        if h.isShuttingDown() {
×
1602
                return nil, errShuttingDown
×
1603
        }
×
1604

1605
        var wg sync.WaitGroup
×
1606
        wg.Add(len(request.Tokens))
×
1607
        result := new(sync.Map)
×
1608

×
1609
        for _, token := range request.Tokens {
×
1610
                go func(token *types.ReplicationToken) {
×
1611
                        defer wg.Done()
×
1612

×
1613
                        engine, err := h.controller.GetEngineForShard(int(token.GetShardID()))
×
1614
                        if err != nil {
×
1615
                                h.GetLogger().Warn("History engine not found for shard", tag.Error(err))
×
1616
                                return
×
1617
                        }
×
1618
                        tasks, err := engine.GetReplicationMessages(
×
1619
                                ctx,
×
1620
                                request.GetClusterName(),
×
1621
                                token.GetLastRetrievedMessageID(),
×
1622
                        )
×
1623
                        if err != nil {
×
1624
                                h.GetLogger().Warn("Failed to get replication tasks for shard", tag.Error(err))
×
1625
                                return
×
1626
                        }
×
1627

1628
                        result.Store(token.GetShardID(), tasks)
×
1629
                }(token)
1630
        }
1631

1632
        wg.Wait()
×
1633

×
1634
        responseSize := 0
×
1635
        maxResponseSize := h.config.MaxResponseSize
×
1636

×
1637
        messagesByShard := make(map[int32]*types.ReplicationMessages)
×
1638
        result.Range(func(key, value interface{}) bool {
×
1639
                shardID := key.(int32)
×
1640
                tasks := value.(*types.ReplicationMessages)
×
1641

×
1642
                size := proto.FromReplicationMessages(tasks).Size()
×
1643
                if (responseSize + size) >= maxResponseSize {
×
1644
                        // Log shards that did not fit for debugging purposes
×
1645
                        h.GetLogger().Warn("Replication messages did not fit in the response (history host)",
×
1646
                                tag.ShardID(int(shardID)),
×
1647
                                tag.ResponseSize(size),
×
1648
                                tag.ResponseTotalSize(responseSize),
×
1649
                                tag.ResponseMaxSize(maxResponseSize),
×
1650
                        )
×
1651
                } else {
×
1652
                        responseSize += size
×
1653
                        messagesByShard[shardID] = tasks
×
1654
                }
×
1655

1656
                return true
×
1657
        })
1658

1659
        h.GetLogger().Debug("GetReplicationMessages succeeded.")
×
1660

×
1661
        return &types.GetReplicationMessagesResponse{MessagesByShard: messagesByShard}, nil
×
1662
}
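// Editor's sketch (not part of the original handler): the per-shard fan-out in
// GetReplicationMessages above uses sync.WaitGroup plus a sync.Map, while CountDLQMessages
// below expresses the same shape with errgroup and a mutex. The hypothetical helper below
// only illustrates that equivalence under the editor's assumptions.
func (h *handlerImpl) forEachShard(shardIDs []int32, fn func(shardID int32) error) error {
        g := &errgroup.Group{}
        for _, shardID := range shardIDs {
                shardID := shardID // capture the loop variable for the goroutine
                g.Go(func() (e error) {
                        defer func() { log.CapturePanic(recover(), h.GetLogger(), &e) }()
                        return fn(shardID)
                })
        }
        // Wait blocks until every callback returns and yields the first non-nil error, if any.
        return g.Wait()
}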
1663

1664
// GetDLQReplicationMessages is called by remote peers to get replicated messages for DLQ merging
1665
func (h *handlerImpl) GetDLQReplicationMessages(
1666
        ctx context.Context,
1667
        request *types.GetDLQReplicationMessagesRequest,
1668
) (resp *types.GetDLQReplicationMessagesResponse, retError error) {
×
1669
        defer func() { log.CapturePanic(recover(), h.GetLogger(), &retError) }()
×
1670
        h.startWG.Wait()
×
1671

×
1672
        _, sw := h.startRequestProfile(ctx, metrics.HistoryGetDLQReplicationMessagesScope)
×
1673
        defer sw.Stop()
×
1674

×
1675
        if h.isShuttingDown() {
×
1676
                return nil, errShuttingDown
×
1677
        }
×
1678

1679
        taskInfoPerExecution := map[definition.WorkflowIdentifier][]*types.ReplicationTaskInfo{}
×
1680
        // batch the task infos by workflow identity (domain ID, workflow ID, run ID)
×
1681
        for _, taskInfo := range request.GetTaskInfos() {
×
1682
                identity := definition.NewWorkflowIdentifier(
×
1683
                        taskInfo.GetDomainID(),
×
1684
                        taskInfo.GetWorkflowID(),
×
1685
                        taskInfo.GetRunID(),
×
1686
                )
×
1687
                if _, ok := taskInfoPerExecution[identity]; !ok {
×
1688
                        taskInfoPerExecution[identity] = []*types.ReplicationTaskInfo{}
×
1689
                }
×
1690
                taskInfoPerExecution[identity] = append(taskInfoPerExecution[identity], taskInfo)
×
1691
        }
1692

1693
        var wg sync.WaitGroup
×
1694
        wg.Add(len(taskInfoPerExecution))
×
1695
        tasksChan := make(chan *types.ReplicationTask, len(request.GetTaskInfos()))
×
1696
        handleTaskInfoPerExecution := func(taskInfos []*types.ReplicationTaskInfo) {
×
1697
                defer wg.Done()
×
1698
                if len(taskInfos) == 0 {
×
1699
                        return
×
1700
                }
×
1701

1702
                engine, err := h.controller.GetEngine(
×
1703
                        taskInfos[0].GetWorkflowID(),
×
1704
                )
×
1705
                if err != nil {
×
1706
                        h.GetLogger().Warn("History engine not found for workflow ID.", tag.Error(err))
×
1707
                        return
×
1708
                }
×
1709

1710
                tasks, err := engine.GetDLQReplicationMessages(
×
1711
                        ctx,
×
1712
                        taskInfos,
×
1713
                )
×
1714
                if err != nil {
×
1715
                        h.GetLogger().Error("Failed to get dlq replication tasks.", tag.Error(err))
×
1716
                        return
×
1717
                }
×
1718

1719
                for _, task := range tasks {
×
1720
                        tasksChan <- task
×
1721
                }
×
1722
        }
1723

1724
        for _, replicationTaskInfos := range taskInfoPerExecution {
×
1725
                go handleTaskInfoPerExecution(replicationTaskInfos)
×
1726
        }
×
1727
        wg.Wait()
×
1728
        close(tasksChan)
×
1729

×
1730
        replicationTasks := make([]*types.ReplicationTask, 0, len(tasksChan))
×
1731
        for task := range tasksChan {
×
1732
                replicationTasks = append(replicationTasks, task)
×
1733
        }
×
1734
        return &types.GetDLQReplicationMessagesResponse{
×
1735
                ReplicationTasks: replicationTasks,
×
1736
        }, nil
×
1737
}
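// Editor's note on GetDLQReplicationMessages above: tasksChan is buffered with capacity
// len(request.GetTaskInfos()), so, assuming the engine returns at most one replication task
// per task info, the worker goroutines never block on send and the channel can be closed
// right after wg.Wait() and then drained into the response slice.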
1738

1739
// ReapplyEvents applies stale events to the current workflow and the current run
1740
func (h *handlerImpl) ReapplyEvents(
1741
        ctx context.Context,
1742
        request *types.HistoryReapplyEventsRequest,
1743
) (retError error) {
×
1744

×
1745
        defer func() { log.CapturePanic(recover(), h.GetLogger(), &retError) }()
×
1746
        h.startWG.Wait()
×
1747

×
1748
        scope, sw := h.startRequestProfile(ctx, metrics.HistoryReapplyEventsScope)
×
1749
        defer sw.Stop()
×
1750

×
1751
        if h.isShuttingDown() {
×
1752
                return errShuttingDown
×
1753
        }
×
1754

1755
        domainID := request.GetDomainUUID()
×
1756
        workflowID := request.GetRequest().GetWorkflowExecution().GetWorkflowID()
×
1757
        runID := request.GetRequest().GetWorkflowExecution().GetRunID()
×
1758
        engine, err := h.controller.GetEngine(workflowID)
×
1759
        if err != nil {
×
1760
                return h.error(err, scope, domainID, workflowID, runID)
×
1761
        }
×
1762
        // deserialize history event object
1763
        historyEvents, err := h.GetPayloadSerializer().DeserializeBatchEvents(&persistence.DataBlob{
×
1764
                Encoding: common.EncodingTypeThriftRW,
×
1765
                Data:     request.GetRequest().GetEvents().GetData(),
×
1766
        })
×
1767
        if err != nil {
×
1768
                return h.error(err, scope, domainID, workflowID, runID)
×
1769
        }
×
1770

1771
        execution := request.GetRequest().GetWorkflowExecution()
×
1772
        if err := engine.ReapplyEvents(
×
1773
                ctx,
×
1774
                request.GetDomainUUID(),
×
1775
                execution.GetWorkflowID(),
×
1776
                execution.GetRunID(),
×
1777
                historyEvents,
×
1778
        ); err != nil {
×
1779
                return h.error(err, scope, domainID, workflowID, runID)
×
1780
        }
×
1781
        return nil
×
1782
}
1783

1784
func (h *handlerImpl) CountDLQMessages(
1785
        ctx context.Context,
1786
        request *types.CountDLQMessagesRequest,
1787
) (resp *types.HistoryCountDLQMessagesResponse, retError error) {
×
1788
        defer func() { log.CapturePanic(recover(), h.GetLogger(), &retError) }()
×
1789
        h.startWG.Wait()
×
1790

×
1791
        scope, sw := h.startRequestProfile(ctx, metrics.HistoryCountDLQMessagesScope)
×
1792
        defer sw.Stop()
×
1793

×
1794
        if h.isShuttingDown() {
×
1795
                return nil, errShuttingDown
×
1796
        }
×
1797

1798
        g := &errgroup.Group{}
×
1799
        var mu sync.Mutex
×
1800
        entries := map[types.HistoryDLQCountKey]int64{}
×
1801
        for _, shardID := range h.controller.ShardIDs() {
×
1802
                shardID := shardID
×
1803
                g.Go(func() (e error) {
×
1804
                        defer func() { log.CapturePanic(recover(), h.GetLogger(), &e) }()
×
1805

1806
                        engine, err := h.controller.GetEngineForShard(int(shardID))
×
1807
                        if err != nil {
×
1808
                                return fmt.Errorf("dlq count for shard %d: %w", shardID, err)
×
1809
                        }
×
1810

1811
                        counts, err := engine.CountDLQMessages(ctx, request.ForceFetch)
×
1812
                        if err != nil {
×
1813
                                return fmt.Errorf("dlq count for shard %d: %w", shardID, err)
×
1814
                        }
×
1815

1816
                        mu.Lock()
×
1817
                        defer mu.Unlock()
×
1818
                        for sourceCluster, count := range counts {
×
1819
                                key := types.HistoryDLQCountKey{ShardID: shardID, SourceCluster: sourceCluster}
×
1820
                                entries[key] = count
×
1821
                        }
×
1822
                        return nil
×
1823
                })
1824
        }
1825

1826
        err := g.Wait()
×
1827
        return &types.HistoryCountDLQMessagesResponse{Entries: entries}, h.error(err, scope, "", "", "")
×
1828
}
1829

1830
// ReadDLQMessages reads replication DLQ messages
1831
func (h *handlerImpl) ReadDLQMessages(
1832
        ctx context.Context,
1833
        request *types.ReadDLQMessagesRequest,
1834
) (resp *types.ReadDLQMessagesResponse, retError error) {
×
1835

×
1836
        defer func() { log.CapturePanic(recover(), h.GetLogger(), &retError) }()
×
1837
        h.startWG.Wait()
×
1838

×
1839
        scope, sw := h.startRequestProfile(ctx, metrics.HistoryReadDLQMessagesScope)
×
1840
        defer sw.Stop()
×
1841

×
1842
        if h.isShuttingDown() {
×
1843
                return nil, errShuttingDown
×
1844
        }
×
1845

1846
        engine, err := h.controller.GetEngineForShard(int(request.GetShardID()))
×
1847
        if err != nil {
×
1848
                return nil, h.error(err, scope, "", "", "")
×
1849
        }
×
1850

1851
        return engine.ReadDLQMessages(ctx, request)
×
1852
}
1853

1854
// PurgeDLQMessages deletes replication DLQ messages
1855
func (h *handlerImpl) PurgeDLQMessages(
1856
        ctx context.Context,
1857
        request *types.PurgeDLQMessagesRequest,
1858
) (retError error) {
×
1859

×
1860
        defer func() { log.CapturePanic(recover(), h.GetLogger(), &retError) }()
×
1861
        h.startWG.Wait()
×
1862

×
1863
        scope, sw := h.startRequestProfile(ctx, metrics.HistoryPurgeDLQMessagesScope)
×
1864
        defer sw.Stop()
×
1865

×
1866
        if h.isShuttingDown() {
×
1867
                return errShuttingDown
×
1868
        }
×
1869

1870
        engine, err := h.controller.GetEngineForShard(int(request.GetShardID()))
×
1871
        if err != nil {
×
1872
                return h.error(err, scope, "", "", "")
×
1873
        }
×
1874

1875
        return engine.PurgeDLQMessages(ctx, request)
×
1876
}
1877

1878
// MergeDLQMessages reads and applies replication DLQ messages
1879
func (h *handlerImpl) MergeDLQMessages(
1880
        ctx context.Context,
1881
        request *types.MergeDLQMessagesRequest,
1882
) (resp *types.MergeDLQMessagesResponse, retError error) {
×
1883

×
1884
        defer func() { log.CapturePanic(recover(), h.GetLogger(), &retError) }()
×
1885
        h.startWG.Wait()
×
1886

×
1887
        if h.isShuttingDown() {
×
1888
                return nil, errShuttingDown
×
1889
        }
×
1890

1891
        scope, sw := h.startRequestProfile(ctx, metrics.HistoryMergeDLQMessagesScope)
×
1892
        defer sw.Stop()
×
1893

×
1894
        engine, err := h.controller.GetEngineForShard(int(request.GetShardID()))
×
1895
        if err != nil {
×
1896
                return nil, h.error(err, scope, "", "", "")
×
1897
        }
×
1898

1899
        return engine.MergeDLQMessages(ctx, request)
×
1900
}
1901

1902
// RefreshWorkflowTasks refreshes all the tasks of a workflow
1903
func (h *handlerImpl) RefreshWorkflowTasks(
1904
        ctx context.Context,
1905
        request *types.HistoryRefreshWorkflowTasksRequest) (retError error) {
×
1906

×
1907
        scope, sw := h.startRequestProfile(ctx, metrics.HistoryRefreshWorkflowTasksScope)
×
1908
        defer sw.Stop()
×
1909

×
1910
        if h.isShuttingDown() {
×
1911
                return errShuttingDown
×
1912
        }
×
1913

1914
        domainID := request.DomainUIID
×
1915
        execution := request.GetRequest().GetExecution()
×
1916
        workflowID := execution.GetWorkflowID()
×
1917
        runID := execution.GetRunID()
×
1918
        engine, err := h.controller.GetEngine(workflowID)
×
1919
        if err != nil {
×
1920
                return h.error(err, scope, domainID, workflowID, runID)
×
1921
        }
×
1922

1923
        err = engine.RefreshWorkflowTasks(
×
1924
                ctx,
×
1925
                domainID,
×
1926
                types.WorkflowExecution{
×
1927
                        WorkflowID: execution.WorkflowID,
×
1928
                        RunID:      execution.RunID,
×
1929
                },
×
1930
        )
×
1931

×
1932
        if err != nil {
×
1933
                return h.error(err, scope, domainID, workflowID, runID)
×
1934
        }
×
1935

1936
        return nil
×
1937
}
1938

1939
// NotifyFailoverMarkers sends the failover markers to failover coordinator.
1940
// The coordinator decides when the failover finishes based on the received failover markers.
1941
func (h *handlerImpl) NotifyFailoverMarkers(
1942
        ctx context.Context,
1943
        request *types.NotifyFailoverMarkersRequest,
1944
) (retError error) {
×
1945

×
1946
        _, sw := h.startRequestProfile(ctx, metrics.HistoryNotifyFailoverMarkersScope)
×
1947
        defer sw.Stop()
×
1948

×
1949
        for _, token := range request.GetFailoverMarkerTokens() {
×
1950
                marker := token.GetFailoverMarker()
×
1951
                h.GetLogger().Debug("Handling failover maker", tag.WorkflowDomainID(marker.GetDomainID()))
×
1952
                h.failoverCoordinator.ReceiveFailoverMarkers(token.GetShardIDs(), token.GetFailoverMarker())
×
1953
        }
×
1954
        return nil
×
1955
}
1956

1957
func (h *handlerImpl) GetCrossClusterTasks(
1958
        ctx context.Context,
1959
        request *types.GetCrossClusterTasksRequest,
1960
) (resp *types.GetCrossClusterTasksResponse, retError error) {
1✔
1961
        defer func() { log.CapturePanic(recover(), h.GetLogger(), &retError) }()
2✔
1962
        h.startWG.Wait()
1✔
1963

1✔
1964
        _, sw := h.startRequestProfile(ctx, metrics.HistoryGetCrossClusterTasksScope)
1✔
1965
        defer sw.Stop()
1✔
1966

1✔
1967
        if h.isShuttingDown() {
1✔
1968
                return nil, errShuttingDown
×
1969
        }
×
1970

1971
        ctx, cancel := common.CreateChildContext(ctx, 0.05)
1✔
1972
        defer cancel()
1✔
1973

1✔
1974
        futureByShardID := make(map[int32]future.Future, len(request.ShardIDs))
1✔
1975
        for _, shardID := range request.ShardIDs {
11✔
1976
                future, settable := future.NewFuture()
10✔
1977
                futureByShardID[shardID] = future
10✔
1978
                go func(shardID int32) {
20✔
1979
                        logger := h.GetLogger().WithTags(tag.ShardID(int(shardID)))
10✔
1980
                        engine, err := h.controller.GetEngineForShard(int(shardID))
10✔
1981
                        if err != nil {
10✔
1982
                                logger.Error("History engine not found for shard", tag.Error(err))
×
1983
                                var owner membership.HostInfo
×
1984
                                if info, err := h.GetMembershipResolver().Lookup(service.History, strconv.Itoa(int(shardID))); err == nil {
×
1985
                                        owner = info
×
1986
                                }
×
1987
                                settable.Set(nil, shard.CreateShardOwnershipLostError(h.GetHostInfo(), owner))
×
1988
                                return
×
1989
                        }
1990

1991
                        if tasks, err := engine.GetCrossClusterTasks(ctx, request.TargetCluster); err != nil {
17✔
1992
                                logger.Error("Failed to get cross cluster tasks", tag.Error(err))
7✔
1993
                                settable.Set(nil, h.convertError(err))
7✔
1994
                        } else {
10✔
1995
                                settable.Set(tasks, nil)
3✔
1996
                        }
3✔
1997
                }(shardID)
1998
        }
1999

2000
        response := &types.GetCrossClusterTasksResponse{
1✔
2001
                TasksByShard:       make(map[int32][]*types.CrossClusterTaskRequest),
1✔
2002
                FailedCauseByShard: make(map[int32]types.GetTaskFailedCause),
1✔
2003
        }
1✔
2004
        for shardID, future := range futureByShardID {
11✔
2005
                var taskRequests []*types.CrossClusterTaskRequest
10✔
2006
                if futureErr := future.Get(ctx, &taskRequests); futureErr != nil {
17✔
2007
                        response.FailedCauseByShard[shardID] = common.ConvertErrToGetTaskFailedCause(futureErr)
7✔
2008
                } else {
10✔
2009
                        response.TasksByShard[shardID] = taskRequests
3✔
2010
                }
3✔
2011
        }
2012
        // not using a WaitGroup for the goroutines created here,
2013
        // as once all futures are unblocked,
2014
        // those goroutines will eventually complete
2015

2016
        return response, nil
1✔
2017
}
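// Editor's note on GetCrossClusterTasks above: per-shard results are joined through
// future.Future values instead of a WaitGroup, so a shard that cannot produce tasks before
// ctx expires surfaces as an entry in FailedCauseByShard rather than blocking the whole
// response. The 0.05 passed to common.CreateChildContext is read by the editor as reserving
// a small fraction of the remaining deadline for assembling the response; that reading is
// an assumption, not documented upstream behavior.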
2018

2019
func (h *handlerImpl) RespondCrossClusterTasksCompleted(
2020
        ctx context.Context,
2021
        request *types.RespondCrossClusterTasksCompletedRequest,
2022
) (resp *types.RespondCrossClusterTasksCompletedResponse, retError error) {
2✔
2023
        defer func() { log.CapturePanic(recover(), h.GetLogger(), &retError) }()
4✔
2024
        h.startWG.Wait()
2✔
2025

2✔
2026
        scope, sw := h.startRequestProfile(ctx, metrics.HistoryRespondCrossClusterTasksCompletedScope)
2✔
2027
        defer sw.Stop()
2✔
2028

2✔
2029
        if h.isShuttingDown() {
2✔
2030
                return nil, errShuttingDown
×
2031
        }
×
2032

2033
        engine, err := h.controller.GetEngineForShard(int(request.GetShardID()))
2✔
2034
        if err != nil {
2✔
2035
                return nil, h.error(err, scope, "", "", "")
×
2036
        }
×
2037

2038
        err = engine.RespondCrossClusterTasksCompleted(ctx, request.TargetCluster, request.TaskResponses)
2✔
2039
        if err != nil {
2✔
2040
                return nil, h.error(err, scope, "", "", "")
×
2041
        }
×
2042

2043
        response := &types.RespondCrossClusterTasksCompletedResponse{}
2✔
2044
        if request.FetchNewTasks {
3✔
2045
                fetchTaskCtx, cancel := common.CreateChildContext(ctx, 0.05)
1✔
2046
                defer cancel()
1✔
2047

1✔
2048
                response.Tasks, err = engine.GetCrossClusterTasks(fetchTaskCtx, request.TargetCluster)
1✔
2049
                if err != nil {
1✔
2050
                        return nil, h.error(err, scope, "", "", "")
×
2051
                }
×
2052
        }
2053
        return response, nil
2✔
2054
}
2055

2056
func (h *handlerImpl) GetFailoverInfo(
2057
        ctx context.Context,
2058
        request *types.GetFailoverInfoRequest,
2059
) (resp *types.GetFailoverInfoResponse, retError error) {
×
2060
        defer func() { log.CapturePanic(recover(), h.GetLogger(), &retError) }()
×
2061
        h.startWG.Wait()
×
2062

×
2063
        scope, sw := h.startRequestProfile(ctx, metrics.HistoryGetFailoverInfoScope)
×
2064
        defer sw.Stop()
×
2065

×
2066
        if h.isShuttingDown() {
×
2067
                return nil, errShuttingDown
×
2068
        }
×
2069

2070
        resp, err := h.failoverCoordinator.GetFailoverInfo(request.GetDomainID())
×
2071
        if err != nil {
×
2072
                return nil, h.error(err, scope, request.GetDomainID(), "", "")
×
2073
        }
×
2074
        return resp, nil
×
2075
}
2076

2077
// convertError is a helper method to convert the ShardOwnershipLostError returned by various
2078
// HistoryEngine API calls at the persistence layer into the ShardOwnershipLost error returned by
2079
// HistoryService, so that the client can be redirected to the correct shard.
2080
func (h *handlerImpl) convertError(err error) error {
184✔
2081
        switch err := err.(type) {
184✔
2082
        case *persistence.ShardOwnershipLostError:
×
2083
                info, err2 := h.GetMembershipResolver().Lookup(service.History, strconv.Itoa(err.ShardID))
×
2084
                if err2 != nil {
×
2085
                        return shard.CreateShardOwnershipLostError(h.GetHostInfo(), membership.HostInfo{})
×
2086
                }
×
2087

2088
                return shard.CreateShardOwnershipLostError(h.GetHostInfo(), info)
×
2089
        case *persistence.WorkflowExecutionAlreadyStartedError:
×
2090
                return &types.InternalServiceError{Message: err.Msg}
×
2091
        case *persistence.CurrentWorkflowConditionFailedError:
×
2092
                return &types.InternalServiceError{Message: err.Msg}
×
2093
        case *persistence.TransactionSizeLimitError:
×
2094
                return &types.BadRequestError{Message: err.Msg}
×
2095
        }
2096

2097
        return err
184✔
2098
}
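// Editor's illustration (not part of the original file): convertError rewrites
// persistence-layer errors before updateErrorMetric classifies them, e.g.
//
//        converted := h.convertError(&persistence.TransactionSizeLimitError{Msg: "too large"})
//        // converted is a *types.BadRequestError carrying the same message
//
// while any error type not listed in the switch is passed through unchanged.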
2099

2100
func (h *handlerImpl) updateErrorMetric(
2101
        scope metrics.Scope,
2102
        domainID string,
2103
        workflowID string,
2104
        runID string,
2105
        err error,
2106
) {
177✔
2107

177✔
2108
        var yarpcE *yarpcerrors.Status
177✔
2109

177✔
2110
        var shardOwnershipLostError *types.ShardOwnershipLostError
177✔
2111
        var eventAlreadyStartedError *types.EventAlreadyStartedError
177✔
2112
        var badRequestError *types.BadRequestError
177✔
2113
        var domainNotActiveError *types.DomainNotActiveError
177✔
2114
        var workflowExecutionAlreadyStartedError *types.WorkflowExecutionAlreadyStartedError
177✔
2115
        var entityNotExistsError *types.EntityNotExistsError
177✔
2116
        var workflowExecutionAlreadyCompletedError *types.WorkflowExecutionAlreadyCompletedError
177✔
2117
        var cancellationAlreadyRequestedError *types.CancellationAlreadyRequestedError
177✔
2118
        var limitExceededError *types.LimitExceededError
177✔
2119
        var retryTaskV2Error *types.RetryTaskV2Error
177✔
2120
        var serviceBusyError *types.ServiceBusyError
177✔
2121
        var internalServiceError *types.InternalServiceError
177✔
2122

177✔
2123
        if err == context.DeadlineExceeded || err == context.Canceled {
182✔
2124
                scope.IncCounter(metrics.CadenceErrContextTimeoutCounter)
5✔
2125
                return
5✔
2126
        }
5✔
2127

2128
        if errors.As(err, &shardOwnershipLostError) {
173✔
2129
                scope.IncCounter(metrics.CadenceErrShardOwnershipLostCounter)
1✔
2130

1✔
2131
        } else if errors.As(err, &eventAlreadyStartedError) {
173✔
2132
                scope.IncCounter(metrics.CadenceErrEventAlreadyStartedCounter)
1✔
2133

1✔
2134
        } else if errors.As(err, &badRequestError) {
172✔
2135
                scope.IncCounter(metrics.CadenceErrBadRequestCounter)
1✔
2136

1✔
2137
        } else if errors.As(err, &domainNotActiveError) {
171✔
2138
                scope.IncCounter(metrics.CadenceErrDomainNotActiveCounter)
1✔
2139

1✔
2140
        } else if errors.As(err, &workflowExecutionAlreadyStartedError) {
194✔
2141
                scope.IncCounter(metrics.CadenceErrExecutionAlreadyStartedCounter)
25✔
2142

25✔
2143
        } else if errors.As(err, &entityNotExistsError) {
274✔
2144
                scope.IncCounter(metrics.CadenceErrEntityNotExistsCounter)
106✔
2145

106✔
2146
        } else if errors.As(err, &workflowExecutionAlreadyCompletedError) {
161✔
2147
                scope.IncCounter(metrics.CadenceErrWorkflowExecutionAlreadyCompletedCounter)
18✔
2148

18✔
2149
        } else if errors.As(err, &cancellationAlreadyRequestedError) {
41✔
2150
                scope.IncCounter(metrics.CadenceErrCancellationAlreadyRequestedCounter)
4✔
2151

4✔
2152
        } else if errors.As(err, &limitExceededError) {
20✔
2153
                scope.IncCounter(metrics.CadenceErrLimitExceededCounter)
1✔
2154

1✔
2155
        } else if errors.As(err, &retryTaskV2Error) {
16✔
2156
                scope.IncCounter(metrics.CadenceErrRetryTaskCounter)
1✔
2157

1✔
2158
        } else if errors.As(err, &serviceBusyError) {
15✔
2159
                scope.IncCounter(metrics.CadenceErrServiceBusyCounter)
1✔
2160

1✔
2161
        } else if errors.As(err, &yarpcE) {
14✔
2162

1✔
2163
                if yarpcE.Code() == yarpcerrors.CodeDeadlineExceeded {
2✔
2164
                        scope.IncCounter(metrics.CadenceErrContextTimeoutCounter)
1✔
2165
                }
1✔
2166
                scope.IncCounter(metrics.CadenceFailures)
1✔
2167

2168
        } else if errors.As(err, &internalServiceError) {
11✔
2169
                scope.IncCounter(metrics.CadenceFailures)
×
2170
                h.GetLogger().Error("Internal service error",
×
2171
                        tag.Error(err),
×
2172
                        tag.WorkflowID(workflowID),
×
2173
                        tag.WorkflowRunID(runID),
×
2174
                        tag.WorkflowDomainID(domainID))
×
2175

×
2176
        } else {
11✔
2177
                // Default / unknown error fallback
11✔
2178
                scope.IncCounter(metrics.CadenceFailures)
11✔
2179
                h.GetLogger().Error("Uncategorized error",
11✔
2180
                        tag.Error(err),
11✔
2181
                        tag.WorkflowID(workflowID),
11✔
2182
                        tag.WorkflowRunID(runID),
11✔
2183
                        tag.WorkflowDomainID(domainID))
11✔
2184
        }
11✔
2185
}
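// Editor's illustration (not part of the original file): because updateErrorMetric
// classifies with errors.As, errors wrapped via %w are still counted under their
// underlying type, e.g.
//
//        wrapped := fmt.Errorf("dlq count for shard 7: %w", &types.ServiceBusyError{Message: "busy"})
//        h.updateErrorMetric(scope, domainID, workflowID, runID, wrapped)
//        // increments CadenceErrServiceBusyCounter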
2186

2187
func (h *handlerImpl) error(
2188
        err error,
2189
        scope metrics.Scope,
2190
        domainID string,
2191
        workflowID string,
2192
        runID string,
2193
) error {
177✔
2194
        if err == nil {
                return nil
        }
        err = h.convertError(err)
177✔
2195

177✔
2196
        h.updateErrorMetric(scope, domainID, workflowID, runID, err)
177✔
2197
        return err
177✔
2198
}
177✔
2199

2200
func (h *handlerImpl) emitInfoOrDebugLog(
2201
        domainID string,
2202
        msg string,
2203
        tags ...tag.Tag,
2204
) {
1,426✔
2205
        if h.config.EnableDebugMode && h.config.EnableTaskInfoLogByDomainID(domainID) {
1,426✔
2206
                h.GetLogger().Info(msg, tags...)
×
2207
        } else {
1,426✔
2208
                h.GetLogger().Debug(msg, tags...)
1,426✔
2209
        }
1,426✔
2210
}
2211

2212
func (h *handlerImpl) startRequestProfile(ctx context.Context, scope int) (metrics.Scope, metrics.Stopwatch) {
5,548✔
2213
        metricsScope := h.GetMetricsClient().Scope(scope, metrics.GetContextTags(ctx)...)
5,548✔
2214
        metricsScope.IncCounter(metrics.CadenceRequests)
5,548✔
2215
        sw := metricsScope.StartTimer(metrics.CadenceLatency)
5,548✔
2216
        return metricsScope, sw
5,548✔
2217
}
5,548✔
2218

2219
func validateTaskToken(token *common.TaskToken) error {
1,804✔
2220
        if token.WorkflowID == "" {
1,804✔
2221
                return errWorkflowIDNotSet
×
2222
        }
×
2223
        if token.RunID != "" && uuid.Parse(token.RunID) == nil {
1,804✔
2224
                return errRunIDNotValid
×
2225
        }
×
2226
        return nil
1,804✔
2227
}
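// Editor's sketch (not part of the original file): a minimal illustration of the token
// shape that validateTaskToken accepts; the workflow ID below is a placeholder.
func exampleValidateTaskToken() error {
        token := &common.TaskToken{
                WorkflowID: "example-workflow-id",
                RunID:      uuid.New(), // any parseable UUID; an empty RunID is also accepted
        }
        return validateTaskToken(token)
}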