uber / cadence / 0187fdd2-f4a4-4c9a-97b4-6604937bf7be

09 May 2023 12:23AM UTC, coverage: 57.253% (-0.002%) from 57.255%

Pull Request #5252: Feature/zonal partitioning (build: buildkite)
David Porter: Merge branch 'master' into feature/zonal-partitioning

1460 of 1460 new or added lines in 51 files covered (100.0%)
86909 of 151799 relevant lines covered (57.25%)
2482.17 hits per line
Source File: /service/history/handler.go (47.15% of lines covered)

// Copyright (c) 2017-2020 Uber Technologies Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

//go:generate mockgen -package $GOPACKAGE -source $GOFILE -destination handler_mock.go -package history github.com/uber/cadence/service/history Handler

package history

import (
        "context"
        "errors"
        "fmt"
        "strconv"
        "sync"
        "sync/atomic"
        "time"

        "golang.org/x/sync/errgroup"

        "github.com/uber/cadence/common/membership"
        "github.com/uber/cadence/common/types/mapper/proto"

        "github.com/pborman/uuid"
        "go.uber.org/yarpc/yarpcerrors"

        "github.com/uber/cadence/common"
        "github.com/uber/cadence/common/definition"
        "github.com/uber/cadence/common/future"
        "github.com/uber/cadence/common/log"
        "github.com/uber/cadence/common/log/tag"
        "github.com/uber/cadence/common/metrics"
        "github.com/uber/cadence/common/persistence"
        "github.com/uber/cadence/common/quotas"
        "github.com/uber/cadence/common/service"
        "github.com/uber/cadence/common/types"
        "github.com/uber/cadence/service/history/config"
        "github.com/uber/cadence/service/history/engine"
        "github.com/uber/cadence/service/history/events"
        "github.com/uber/cadence/service/history/failover"
        "github.com/uber/cadence/service/history/replication"
        "github.com/uber/cadence/service/history/resource"
        "github.com/uber/cadence/service/history/shard"
        "github.com/uber/cadence/service/history/task"
)
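
// shardOwnershipTransferDelay bounds how long PrepareToStop waits for shard
// ownership to transfer away before this host stops taking RPC requests.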
const shardOwnershipTransferDelay = 5 * time.Second

type (
        // Handler interface for history service
        Handler interface {
                common.Daemon

                PrepareToStop(time.Duration) time.Duration
                Health(context.Context) (*types.HealthStatus, error)
                CloseShard(context.Context, *types.CloseShardRequest) error
                DescribeHistoryHost(context.Context, *types.DescribeHistoryHostRequest) (*types.DescribeHistoryHostResponse, error)
                DescribeMutableState(context.Context, *types.DescribeMutableStateRequest) (*types.DescribeMutableStateResponse, error)
                DescribeQueue(context.Context, *types.DescribeQueueRequest) (*types.DescribeQueueResponse, error)
                DescribeWorkflowExecution(context.Context, *types.HistoryDescribeWorkflowExecutionRequest) (*types.DescribeWorkflowExecutionResponse, error)
                GetCrossClusterTasks(context.Context, *types.GetCrossClusterTasksRequest) (*types.GetCrossClusterTasksResponse, error)
                CountDLQMessages(context.Context, *types.CountDLQMessagesRequest) (*types.HistoryCountDLQMessagesResponse, error)
                GetDLQReplicationMessages(context.Context, *types.GetDLQReplicationMessagesRequest) (*types.GetDLQReplicationMessagesResponse, error)
                GetMutableState(context.Context, *types.GetMutableStateRequest) (*types.GetMutableStateResponse, error)
                GetReplicationMessages(context.Context, *types.GetReplicationMessagesRequest) (*types.GetReplicationMessagesResponse, error)
                MergeDLQMessages(context.Context, *types.MergeDLQMessagesRequest) (*types.MergeDLQMessagesResponse, error)
                NotifyFailoverMarkers(context.Context, *types.NotifyFailoverMarkersRequest) error
                PollMutableState(context.Context, *types.PollMutableStateRequest) (*types.PollMutableStateResponse, error)
                PurgeDLQMessages(context.Context, *types.PurgeDLQMessagesRequest) error
                QueryWorkflow(context.Context, *types.HistoryQueryWorkflowRequest) (*types.HistoryQueryWorkflowResponse, error)
                ReadDLQMessages(context.Context, *types.ReadDLQMessagesRequest) (*types.ReadDLQMessagesResponse, error)
                ReapplyEvents(context.Context, *types.HistoryReapplyEventsRequest) error
                RecordActivityTaskHeartbeat(context.Context, *types.HistoryRecordActivityTaskHeartbeatRequest) (*types.RecordActivityTaskHeartbeatResponse, error)
                RecordActivityTaskStarted(context.Context, *types.RecordActivityTaskStartedRequest) (*types.RecordActivityTaskStartedResponse, error)
                RecordChildExecutionCompleted(context.Context, *types.RecordChildExecutionCompletedRequest) error
                RecordDecisionTaskStarted(context.Context, *types.RecordDecisionTaskStartedRequest) (*types.RecordDecisionTaskStartedResponse, error)
                RefreshWorkflowTasks(context.Context, *types.HistoryRefreshWorkflowTasksRequest) error
                RemoveSignalMutableState(context.Context, *types.RemoveSignalMutableStateRequest) error
                RemoveTask(context.Context, *types.RemoveTaskRequest) error
                ReplicateEventsV2(context.Context, *types.ReplicateEventsV2Request) error
                RequestCancelWorkflowExecution(context.Context, *types.HistoryRequestCancelWorkflowExecutionRequest) error
                ResetQueue(context.Context, *types.ResetQueueRequest) error
                ResetStickyTaskList(context.Context, *types.HistoryResetStickyTaskListRequest) (*types.HistoryResetStickyTaskListResponse, error)
                ResetWorkflowExecution(context.Context, *types.HistoryResetWorkflowExecutionRequest) (*types.ResetWorkflowExecutionResponse, error)
                RespondActivityTaskCanceled(context.Context, *types.HistoryRespondActivityTaskCanceledRequest) error
                RespondActivityTaskCompleted(context.Context, *types.HistoryRespondActivityTaskCompletedRequest) error
                RespondActivityTaskFailed(context.Context, *types.HistoryRespondActivityTaskFailedRequest) error
                RespondCrossClusterTasksCompleted(context.Context, *types.RespondCrossClusterTasksCompletedRequest) (*types.RespondCrossClusterTasksCompletedResponse, error)
                RespondDecisionTaskCompleted(context.Context, *types.HistoryRespondDecisionTaskCompletedRequest) (*types.HistoryRespondDecisionTaskCompletedResponse, error)
                RespondDecisionTaskFailed(context.Context, *types.HistoryRespondDecisionTaskFailedRequest) error
                ScheduleDecisionTask(context.Context, *types.ScheduleDecisionTaskRequest) error
                SignalWithStartWorkflowExecution(context.Context, *types.HistorySignalWithStartWorkflowExecutionRequest) (*types.StartWorkflowExecutionResponse, error)
                SignalWorkflowExecution(context.Context, *types.HistorySignalWorkflowExecutionRequest) error
                StartWorkflowExecution(context.Context, *types.HistoryStartWorkflowExecutionRequest) (*types.StartWorkflowExecutionResponse, error)
                SyncActivity(context.Context, *types.SyncActivityRequest) error
                SyncShardStatus(context.Context, *types.SyncShardStatusRequest) error
                TerminateWorkflowExecution(context.Context, *types.HistoryTerminateWorkflowExecutionRequest) error
                GetFailoverInfo(context.Context, *types.GetFailoverInfoRequest) (*types.GetFailoverInfoResponse, error)
        }

        // handlerImpl is an implementation for history service independent of wire protocol
        handlerImpl struct {
                resource.Resource

                shuttingDown             int32
                controller               shard.Controller
                tokenSerializer          common.TaskTokenSerializer
                startWG                  sync.WaitGroup
                config                   *config.Config
                historyEventNotifier     events.Notifier
                rateLimiter              quotas.Limiter
                crossClusterTaskFetchers task.Fetchers
                replicationTaskFetchers  replication.TaskFetchers
                queueTaskProcessor       task.Processor
                failoverCoordinator      failover.Coordinator
        }
)
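
// handlerImpl must satisfy both the RPC-facing Handler interface and
// shard.EngineFactory; the blank assignments below turn that into a
// compile-time check.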
var _ Handler = (*handlerImpl)(nil)
var _ shard.EngineFactory = (*handlerImpl)(nil)
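
// Sentinel request-validation errors shared by the handler methods below.
// They are surfaced through h.error (defined elsewhere in this file, not
// shown in this excerpt), which is also handed the request's metrics scope.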
var (
        errDomainNotSet            = &types.BadRequestError{Message: "Domain not set on request."}
        errWorkflowExecutionNotSet = &types.BadRequestError{Message: "WorkflowExecution not set on request."}
        errTaskListNotSet          = &types.BadRequestError{Message: "Tasklist not set."}
        errWorkflowIDNotSet        = &types.BadRequestError{Message: "WorkflowId is not set on request."}
        errRunIDNotValid           = &types.BadRequestError{Message: "RunID is not valid UUID."}
        errSourceClusterNotSet     = &types.BadRequestError{Message: "Source Cluster not set on request."}
        errTimestampNotSet         = &types.BadRequestError{Message: "Timestamp not set on request."}
        errInvalidTaskType         = &types.BadRequestError{Message: "Invalid task type"}
        errHistoryHostThrottle     = &types.ServiceBusyError{Message: "History host rps exceeded"}
        errShuttingDown            = &types.InternalServiceError{Message: "Shutting down"}
)

// NewHandler creates a handler for the history service
func NewHandler(
        resource resource.Resource,
        config *config.Config,
) Handler {
        handler := &handlerImpl{
                Resource:        resource,
                config:          config,
                tokenSerializer: common.NewJSONTaskTokenSerializer(),
                rateLimiter:     quotas.NewDynamicRateLimiter(config.RPS.AsFloat64()),
        }

        // prevent us from trying to serve requests before shard controller is started and ready
        handler.startWG.Add(1)
        return handler
}
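
// Note: every RPC handler in this file calls h.startWG.Wait() before doing any
// work, so requests that arrive before Start completes block rather than fail.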

// Start starts the handler
func (h *handlerImpl) Start() {
        h.crossClusterTaskFetchers = task.NewCrossClusterTaskFetchers(
                h.GetClusterMetadata(),
                h.GetClientBean(),
                &task.FetcherOptions{
                        Parallelism:                h.config.CrossClusterFetcherParallelism,
                        AggregationInterval:        h.config.CrossClusterFetcherAggregationInterval,
                        ServiceBusyBackoffInterval: h.config.CrossClusterFetcherServiceBusyBackoffInterval,
                        ErrorRetryInterval:         h.config.CrossClusterFetcherErrorBackoffInterval,
                        TimerJitterCoefficient:     h.config.CrossClusterFetcherJitterCoefficient,
                },
                h.GetMetricsClient(),
                h.GetLogger(),
        )
        h.crossClusterTaskFetchers.Start()

        h.replicationTaskFetchers = replication.NewTaskFetchers(
                h.GetLogger(),
                h.config,
                h.GetClusterMetadata(),
                h.GetClientBean(),
        )

        h.replicationTaskFetchers.Start()

        var err error
        taskPriorityAssigner := task.NewPriorityAssigner(
                h.GetClusterMetadata().GetCurrentClusterName(),
                h.GetDomainCache(),
                h.GetLogger(),
                h.GetMetricsClient(),
                h.config,
        )

        h.queueTaskProcessor, err = task.NewProcessor(
                taskPriorityAssigner,
                h.config,
                h.GetLogger(),
                h.GetMetricsClient(),
        )
        if err != nil {
                h.GetLogger().Fatal("Creating priority task processor failed", tag.Error(err))
        }
        h.queueTaskProcessor.Start()

        h.controller = shard.NewShardController(
                h.Resource,
                h,
                h.config,
        )
        h.historyEventNotifier = events.NewNotifier(h.GetTimeSource(), h.GetMetricsClient(), h.config.GetShardID)
        // events notifier must start before controller
        h.historyEventNotifier.Start()

        h.failoverCoordinator = failover.NewCoordinator(
                h.GetDomainManager(),
                h.GetHistoryClient(),
                h.GetTimeSource(),
                h.GetDomainCache(),
                h.config,
                h.GetMetricsClient(),
                h.GetLogger(),
        )
        if h.config.EnableGracefulFailover() {
                h.failoverCoordinator.Start()
        }

        h.controller.Start()

        h.startWG.Done()
}

// Stop stops the handler
func (h *handlerImpl) Stop() {
        h.prepareToShutDown()
        h.crossClusterTaskFetchers.Stop()
        h.replicationTaskFetchers.Stop()
        h.queueTaskProcessor.Stop()
        h.controller.Stop()
        h.historyEventNotifier.Stop()
        h.failoverCoordinator.Stop()
}
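
// A minimal wiring sketch (hypothetical, not part of this file): a service
// bootstrap would construct the handler from its resource bundle and drive it
// through the common.Daemon lifecycle, e.g.
//
//      handler := history.NewHandler(serviceResource, serviceConfig)
//      handler.Start()
//      defer handler.Stop()
//
// where serviceResource and serviceConfig are assumed to come from the
// service's bootstrap code.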

// PrepareToStop starts graceful traffic drain in preparation for shutdown
func (h *handlerImpl) PrepareToStop(remainingTime time.Duration) time.Duration {
        h.GetLogger().Info("ShutdownHandler: Initiating shardController shutdown")
        h.controller.PrepareToStop()
        h.GetLogger().Info("ShutdownHandler: Waiting for traffic to drain")
        remainingTime = common.SleepWithMinDuration(shardOwnershipTransferDelay, remainingTime)
        h.GetLogger().Info("ShutdownHandler: No longer taking rpc requests")
        h.prepareToShutDown()
        return remainingTime
}
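
// prepareToShutDown flips a flag that isShuttingDown reads on hot request
// paths; using an atomic int32 instead of a mutex keeps that check wait-free
// for the many concurrent RPC goroutines.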

func (h *handlerImpl) prepareToShutDown() {
        atomic.StoreInt32(&h.shuttingDown, 1)
}

func (h *handlerImpl) isShuttingDown() bool {
        return atomic.LoadInt32(&h.shuttingDown) != 0
}
// CreateEngine is implementation for HistoryEngineFactory used for creating the engine instance for shard
271
func (h *handlerImpl) CreateEngine(
272
        shardContext shard.Context,
273
) engine.Engine {
51✔
274
        return NewEngineWithShardContext(
51✔
275
                shardContext,
51✔
276
                h.GetVisibilityManager(),
51✔
277
                h.GetMatchingClient(),
51✔
278
                h.GetSDKClient(),
51✔
279
                h.historyEventNotifier,
51✔
280
                h.config,
51✔
281
                h.crossClusterTaskFetchers,
51✔
282
                h.replicationTaskFetchers,
51✔
283
                h.GetMatchingRawClient(),
51✔
284
                h.queueTaskProcessor,
51✔
285
                h.failoverCoordinator,
51✔
286
        )
51✔
287
}
51✔
288


// Health is for health check
func (h *handlerImpl) Health(ctx context.Context) (*types.HealthStatus, error) {
        h.startWG.Wait()
        h.GetLogger().Debug("History health check endpoint reached.")
        hs := &types.HealthStatus{Ok: true, Msg: "OK"}
        return hs, nil
}
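
// Most RPC handlers below share the same shape; a sketch of the pattern
// (h.startRequestProfile and h.error are defined elsewhere in this file):
//
//      defer func() { log.CapturePanic(recover(), h.GetLogger(), &retError) }() // panics become errors
//      h.startWG.Wait()                        // block until Start has finished
//      scope, sw := h.startRequestProfile(ctx, someScope)
//      defer sw.Stop()                         // per-API latency timer
//      ... validate the request and check h.rateLimiter.Allow() ...
//      engine, err := h.controller.GetEngine(workflowID) // shard owning this workflow
//      ... delegate to the engine, wrapping any error via h.error ...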

// RecordActivityTaskHeartbeat - records an activity task heartbeat.
func (h *handlerImpl) RecordActivityTaskHeartbeat(
        ctx context.Context,
        wrappedRequest *types.HistoryRecordActivityTaskHeartbeatRequest,
) (resp *types.RecordActivityTaskHeartbeatResponse, retError error) {

        defer func() { log.CapturePanic(recover(), h.GetLogger(), &retError) }()
        h.startWG.Wait()

        scope, sw := h.startRequestProfile(ctx, metrics.HistoryRecordActivityTaskHeartbeatScope)
        defer sw.Stop()

        domainID := wrappedRequest.GetDomainUUID()
        if domainID == "" {
                return nil, h.error(errDomainNotSet, scope, domainID, "")
        }

        if ok := h.rateLimiter.Allow(); !ok {
                return nil, h.error(errHistoryHostThrottle, scope, domainID, "")
        }

        heartbeatRequest := wrappedRequest.HeartbeatRequest
        token, err0 := h.tokenSerializer.Deserialize(heartbeatRequest.TaskToken)
        if err0 != nil {
                err0 = &types.BadRequestError{Message: fmt.Sprintf("Error deserializing task token. Error: %v", err0)}
                return nil, h.error(err0, scope, domainID, "")
        }

        err0 = validateTaskToken(token)
        if err0 != nil {
                return nil, h.error(err0, scope, domainID, "")
        }
        workflowID := token.WorkflowID

        engine, err1 := h.controller.GetEngine(workflowID)
        if err1 != nil {
                return nil, h.error(err1, scope, domainID, workflowID)
        }

        response, err2 := engine.RecordActivityTaskHeartbeat(ctx, wrappedRequest)
        if err2 != nil {
                return nil, h.error(err2, scope, domainID, workflowID)
        }

        return response, nil
}

// RecordActivityTaskStarted - Record Activity Task started.
func (h *handlerImpl) RecordActivityTaskStarted(
        ctx context.Context,
        recordRequest *types.RecordActivityTaskStartedRequest,
) (resp *types.RecordActivityTaskStartedResponse, retError error) {

        defer func() { log.CapturePanic(recover(), h.GetLogger(), &retError) }()
        h.startWG.Wait()

        scope, sw := h.startRequestProfile(ctx, metrics.HistoryRecordActivityTaskStartedScope)
        defer sw.Stop()

        domainID := recordRequest.GetDomainUUID()
        workflowExecution := recordRequest.WorkflowExecution
        workflowID := workflowExecution.GetWorkflowID()

        h.emitInfoOrDebugLog(
                domainID,
                "RecordActivityTaskStarted",
                tag.WorkflowDomainID(domainID),
                tag.WorkflowID(workflowExecution.GetWorkflowID()),
                tag.WorkflowRunID(recordRequest.WorkflowExecution.RunID),
                tag.WorkflowScheduleID(recordRequest.GetScheduleID()),
        )

        if recordRequest.GetDomainUUID() == "" {
                return nil, h.error(errDomainNotSet, scope, domainID, workflowID)
        }

        if ok := h.rateLimiter.Allow(); !ok {
                return nil, h.error(errHistoryHostThrottle, scope, domainID, workflowID)
        }

        engine, err1 := h.controller.GetEngine(workflowID)
        if err1 != nil {
                return nil, h.error(err1, scope, domainID, workflowID)
        }

        response, err2 := engine.RecordActivityTaskStarted(ctx, recordRequest)
        if err2 != nil {
                return nil, h.error(err2, scope, domainID, workflowID)
        }

        return response, nil
}

// RecordDecisionTaskStarted - Record Decision Task started.
func (h *handlerImpl) RecordDecisionTaskStarted(
        ctx context.Context,
        recordRequest *types.RecordDecisionTaskStartedRequest,
) (resp *types.RecordDecisionTaskStartedResponse, retError error) {

        defer func() { log.CapturePanic(recover(), h.GetLogger(), &retError) }()
        h.startWG.Wait()

        scope, sw := h.startRequestProfile(ctx, metrics.HistoryRecordDecisionTaskStartedScope)
        defer sw.Stop()

        domainID := recordRequest.GetDomainUUID()
        workflowExecution := recordRequest.WorkflowExecution
        workflowID := workflowExecution.GetWorkflowID()

        h.emitInfoOrDebugLog(
                domainID,
                "RecordDecisionTaskStarted",
                tag.WorkflowDomainID(domainID),
                tag.WorkflowID(workflowExecution.GetWorkflowID()),
                tag.WorkflowRunID(recordRequest.WorkflowExecution.RunID),
                tag.WorkflowScheduleID(recordRequest.GetScheduleID()),
        )

        if domainID == "" {
                return nil, h.error(errDomainNotSet, scope, domainID, workflowID)
        }

        if ok := h.rateLimiter.Allow(); !ok {
                return nil, h.error(errHistoryHostThrottle, scope, domainID, workflowID)
        }

        if recordRequest.PollRequest == nil || recordRequest.PollRequest.TaskList.GetName() == "" {
                return nil, h.error(errTaskListNotSet, scope, domainID, workflowID)
        }

        engine, err1 := h.controller.GetEngine(workflowID)
        if err1 != nil {
                h.GetLogger().Error("RecordDecisionTaskStarted failed.",
                        tag.Error(err1),
                        tag.WorkflowID(recordRequest.WorkflowExecution.GetWorkflowID()),
                        tag.WorkflowRunID(recordRequest.WorkflowExecution.GetRunID()),
                        tag.WorkflowScheduleID(recordRequest.GetScheduleID()),
                )
                return nil, h.error(err1, scope, domainID, workflowID)
        }

        response, err2 := engine.RecordDecisionTaskStarted(ctx, recordRequest)
        if err2 != nil {
                return nil, h.error(err2, scope, domainID, workflowID)
        }

        return response, nil
}
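
// The Respond* handlers below route differently from the Record* handlers
// above: the worker echoes back a serialized task token, and the handler
// deserializes it to recover the workflow ID (and thus the owning shard)
// instead of reading an execution from the request body.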

// RespondActivityTaskCompleted - records completion of an activity task
func (h *handlerImpl) RespondActivityTaskCompleted(
        ctx context.Context,
        wrappedRequest *types.HistoryRespondActivityTaskCompletedRequest,
) (retError error) {

        defer func() { log.CapturePanic(recover(), h.GetLogger(), &retError) }()
        h.startWG.Wait()

        scope, sw := h.startRequestProfile(ctx, metrics.HistoryRespondActivityTaskCompletedScope)
        defer sw.Stop()

        domainID := wrappedRequest.GetDomainUUID()
        if domainID == "" {
                return h.error(errDomainNotSet, scope, domainID, "")
        }

        if ok := h.rateLimiter.Allow(); !ok {
                return h.error(errHistoryHostThrottle, scope, domainID, "")
        }

        completeRequest := wrappedRequest.CompleteRequest
        token, err0 := h.tokenSerializer.Deserialize(completeRequest.TaskToken)
        if err0 != nil {
                err0 = &types.BadRequestError{Message: fmt.Sprintf("Error deserializing task token. Error: %v", err0)}
                return h.error(err0, scope, domainID, "")
        }

        err0 = validateTaskToken(token)
        if err0 != nil {
                return h.error(err0, scope, domainID, "")
        }
        workflowID := token.WorkflowID

        engine, err1 := h.controller.GetEngine(workflowID)
        if err1 != nil {
                return h.error(err1, scope, domainID, workflowID)
        }

        err2 := engine.RespondActivityTaskCompleted(ctx, wrappedRequest)
        if err2 != nil {
                return h.error(err2, scope, domainID, workflowID)
        }

        return nil
}

// RespondActivityTaskFailed - records failure of an activity task
func (h *handlerImpl) RespondActivityTaskFailed(
        ctx context.Context,
        wrappedRequest *types.HistoryRespondActivityTaskFailedRequest,
) (retError error) {

        defer func() { log.CapturePanic(recover(), h.GetLogger(), &retError) }()
        h.startWG.Wait()

        scope, sw := h.startRequestProfile(ctx, metrics.HistoryRespondActivityTaskFailedScope)
        defer sw.Stop()

        domainID := wrappedRequest.GetDomainUUID()
        if domainID == "" {
                return h.error(errDomainNotSet, scope, domainID, "")
        }

        if ok := h.rateLimiter.Allow(); !ok {
                return h.error(errHistoryHostThrottle, scope, domainID, "")
        }

        failRequest := wrappedRequest.FailedRequest
        token, err0 := h.tokenSerializer.Deserialize(failRequest.TaskToken)
        if err0 != nil {
                err0 = &types.BadRequestError{Message: fmt.Sprintf("Error deserializing task token. Error: %v", err0)}
                return h.error(err0, scope, domainID, "")
        }

        err0 = validateTaskToken(token)
        if err0 != nil {
                return h.error(err0, scope, domainID, "")
        }
        workflowID := token.WorkflowID

        engine, err1 := h.controller.GetEngine(workflowID)
        if err1 != nil {
                return h.error(err1, scope, domainID, workflowID)
        }

        err2 := engine.RespondActivityTaskFailed(ctx, wrappedRequest)
        if err2 != nil {
                return h.error(err2, scope, domainID, workflowID)
        }

        return nil
}

// RespondActivityTaskCanceled - records cancellation of an activity task
func (h *handlerImpl) RespondActivityTaskCanceled(
        ctx context.Context,
        wrappedRequest *types.HistoryRespondActivityTaskCanceledRequest,
) (retError error) {

        defer func() { log.CapturePanic(recover(), h.GetLogger(), &retError) }()
        h.startWG.Wait()

        scope, sw := h.startRequestProfile(ctx, metrics.HistoryRespondActivityTaskCanceledScope)
        defer sw.Stop()

        domainID := wrappedRequest.GetDomainUUID()
        if domainID == "" {
                return h.error(errDomainNotSet, scope, domainID, "")
        }

        if ok := h.rateLimiter.Allow(); !ok {
                return h.error(errHistoryHostThrottle, scope, domainID, "")
        }

        cancelRequest := wrappedRequest.CancelRequest
        token, err0 := h.tokenSerializer.Deserialize(cancelRequest.TaskToken)
        if err0 != nil {
                err0 = &types.BadRequestError{Message: fmt.Sprintf("Error deserializing task token. Error: %v", err0)}
                return h.error(err0, scope, domainID, "")
        }

        err0 = validateTaskToken(token)
        if err0 != nil {
                return h.error(err0, scope, domainID, "")
        }
        workflowID := token.WorkflowID

        engine, err1 := h.controller.GetEngine(workflowID)
        if err1 != nil {
                return h.error(err1, scope, domainID, workflowID)
        }

        err2 := engine.RespondActivityTaskCanceled(ctx, wrappedRequest)
        if err2 != nil {
                return h.error(err2, scope, domainID, workflowID)
        }

        return nil
}

// RespondDecisionTaskCompleted - records completion of a decision task
func (h *handlerImpl) RespondDecisionTaskCompleted(
        ctx context.Context,
        wrappedRequest *types.HistoryRespondDecisionTaskCompletedRequest,
) (resp *types.HistoryRespondDecisionTaskCompletedResponse, retError error) {

        defer func() { log.CapturePanic(recover(), h.GetLogger(), &retError) }()
        h.startWG.Wait()

        scope, sw := h.startRequestProfile(ctx, metrics.HistoryRespondDecisionTaskCompletedScope)
        defer sw.Stop()

        domainID := wrappedRequest.GetDomainUUID()
        if domainID == "" {
                return nil, h.error(errDomainNotSet, scope, domainID, "")
        }

        if ok := h.rateLimiter.Allow(); !ok {
                return nil, h.error(errHistoryHostThrottle, scope, domainID, "")
        }

        completeRequest := wrappedRequest.CompleteRequest
        if len(completeRequest.Decisions) == 0 {
                scope.IncCounter(metrics.EmptyCompletionDecisionsCounter)
        }
        token, err0 := h.tokenSerializer.Deserialize(completeRequest.TaskToken)
        if err0 != nil {
                err0 = &types.BadRequestError{Message: fmt.Sprintf("Error deserializing task token. Error: %v", err0)}
                return nil, h.error(err0, scope, domainID, "")
        }

        h.GetLogger().Debug(fmt.Sprintf("RespondDecisionTaskCompleted. DomainID: %v, WorkflowID: %v, RunID: %v, ScheduleID: %v",
                token.DomainID,
                token.WorkflowID,
                token.RunID,
                token.ScheduleID))

        err0 = validateTaskToken(token)
        if err0 != nil {
                return nil, h.error(err0, scope, domainID, "")
        }
        workflowID := token.WorkflowID

        engine, err1 := h.controller.GetEngine(workflowID)
        if err1 != nil {
                return nil, h.error(err1, scope, domainID, workflowID)
        }

        response, err2 := engine.RespondDecisionTaskCompleted(ctx, wrappedRequest)
        if err2 != nil {
                return nil, h.error(err2, scope, domainID, workflowID)
        }

        return response, nil
}

// RespondDecisionTaskFailed - records failure of a decision task
func (h *handlerImpl) RespondDecisionTaskFailed(
        ctx context.Context,
        wrappedRequest *types.HistoryRespondDecisionTaskFailedRequest,
) (retError error) {

        defer func() { log.CapturePanic(recover(), h.GetLogger(), &retError) }()
        h.startWG.Wait()

        scope, sw := h.startRequestProfile(ctx, metrics.HistoryRespondDecisionTaskFailedScope)
        defer sw.Stop()

        domainID := wrappedRequest.GetDomainUUID()
        if domainID == "" {
                return h.error(errDomainNotSet, scope, domainID, "")
        }

        if ok := h.rateLimiter.Allow(); !ok {
                return h.error(errHistoryHostThrottle, scope, domainID, "")
        }

        failedRequest := wrappedRequest.FailedRequest
        token, err0 := h.tokenSerializer.Deserialize(failedRequest.TaskToken)
        if err0 != nil {
                err0 = &types.BadRequestError{Message: fmt.Sprintf("Error deserializing task token. Error: %v", err0)}
                return h.error(err0, scope, domainID, "")
        }

        h.GetLogger().Debug(fmt.Sprintf("RespondDecisionTaskFailed. DomainID: %v, WorkflowID: %v, RunID: %v, ScheduleID: %v",
                token.DomainID,
                token.WorkflowID,
                token.RunID,
                token.ScheduleID))

        if failedRequest != nil && failedRequest.GetCause() == types.DecisionTaskFailedCauseUnhandledDecision {
                h.GetLogger().Info("Non-Deterministic Error", tag.WorkflowDomainID(token.DomainID), tag.WorkflowID(token.WorkflowID), tag.WorkflowRunID(token.RunID))
                domainName, err := h.GetDomainCache().GetDomainName(token.DomainID)
                var domainTag metrics.Tag

                if err == nil {
                        domainTag = metrics.DomainTag(domainName)
                } else {
                        domainTag = metrics.DomainUnknownTag()
                }

                scope.Tagged(domainTag).IncCounter(metrics.CadenceErrNonDeterministicCounter)
        }
        err0 = validateTaskToken(token)
        if err0 != nil {
                return h.error(err0, scope, domainID, "")
        }
        workflowID := token.WorkflowID

        engine, err1 := h.controller.GetEngine(workflowID)
        if err1 != nil {
                return h.error(err1, scope, domainID, workflowID)
        }

        err2 := engine.RespondDecisionTaskFailed(ctx, wrappedRequest)
        if err2 != nil {
                return h.error(err2, scope, domainID, workflowID)
        }

        return nil
}

// StartWorkflowExecution - creates a new workflow execution
func (h *handlerImpl) StartWorkflowExecution(
        ctx context.Context,
        wrappedRequest *types.HistoryStartWorkflowExecutionRequest,
) (resp *types.StartWorkflowExecutionResponse, retError error) {

        defer func() { log.CapturePanic(recover(), h.GetLogger(), &retError) }()
        h.startWG.Wait()

        scope, sw := h.startRequestProfile(ctx, metrics.HistoryStartWorkflowExecutionScope)
        defer sw.Stop()

        domainID := wrappedRequest.GetDomainUUID()
        if domainID == "" {
                return nil, h.error(errDomainNotSet, scope, domainID, "")
        }

        if ok := h.rateLimiter.Allow(); !ok {
                return nil, h.error(errHistoryHostThrottle, scope, domainID, "")
        }

        startRequest := wrappedRequest.StartRequest
        workflowID := startRequest.GetWorkflowID()
        engine, err1 := h.controller.GetEngine(workflowID)
        if err1 != nil {
                return nil, h.error(err1, scope, domainID, workflowID)
        }

        response, err2 := engine.StartWorkflowExecution(ctx, wrappedRequest)
        if err2 != nil {
                return nil, h.error(err2, scope, domainID, workflowID)
        }

        return response, nil
}
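
// Note: StartWorkflowExecution routes by workflow ID like every other API
// here; the owning shard is derived from the ID, so the request lands on the
// shard that will hold the new execution's mutable state.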

// DescribeHistoryHost returns information about the internal states of a history host
func (h *handlerImpl) DescribeHistoryHost(
        ctx context.Context,
        request *types.DescribeHistoryHostRequest,
) (resp *types.DescribeHistoryHostResponse, retError error) {

        defer func() { log.CapturePanic(recover(), h.GetLogger(), &retError) }()
        h.startWG.Wait()

        numOfItemsInCacheByID, numOfItemsInCacheByName := h.GetDomainCache().GetCacheSize()
        status := ""
        switch h.controller.Status() {
        case common.DaemonStatusInitialized:
                status = "initialized"
        case common.DaemonStatusStarted:
                status = "started"
        case common.DaemonStatusStopped:
                status = "stopped"
        }

        resp = &types.DescribeHistoryHostResponse{
                NumberOfShards: int32(h.controller.NumShards()),
                ShardIDs:       h.controller.ShardIDs(),
                DomainCache: &types.DomainCacheInfo{
                        NumOfItemsInCacheByID:   numOfItemsInCacheByID,
                        NumOfItemsInCacheByName: numOfItemsInCacheByName,
                },
                ShardControllerStatus: status,
                Address:               h.GetHostInfo().GetAddress(),
        }
        return resp, nil
}

// RemoveTask deletes a single task from a shard's transfer, timer,
// replication, or cross-cluster queue
func (h *handlerImpl) RemoveTask(
        ctx context.Context,
        request *types.RemoveTaskRequest,
) (retError error) {
        executionMgr, err := h.GetExecutionManager(int(request.GetShardID()))
        if err != nil {
                return err
        }

        switch taskType := common.TaskType(request.GetType()); taskType {
        case common.TaskTypeTransfer:
                return executionMgr.CompleteTransferTask(ctx, &persistence.CompleteTransferTaskRequest{
                        TaskID: request.GetTaskID(),
                })
        case common.TaskTypeTimer:
                return executionMgr.CompleteTimerTask(ctx, &persistence.CompleteTimerTaskRequest{
                        VisibilityTimestamp: time.Unix(0, request.GetVisibilityTimestamp()),
                        TaskID:              request.GetTaskID(),
                })
        case common.TaskTypeReplication:
                return executionMgr.CompleteReplicationTask(ctx, &persistence.CompleteReplicationTaskRequest{
                        TaskID: request.GetTaskID(),
                })
        case common.TaskTypeCrossCluster:
                return executionMgr.CompleteCrossClusterTask(ctx, &persistence.CompleteCrossClusterTaskRequest{
                        TargetCluster: request.GetClusterName(),
                        TaskID:        request.GetTaskID(),
                })
        default:
                return errInvalidTaskType
        }
}

// CloseShard closes a shard hosted by this instance
func (h *handlerImpl) CloseShard(
        ctx context.Context,
        request *types.CloseShardRequest,
) (retError error) {
        h.controller.RemoveEngineForShard(int(request.GetShardID()))
        return nil
}
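
// CloseShard only unloads the shard's engine; ownership is expected to be
// re-acquired through the normal membership mechanism (possibly by another
// host), which makes this a safe operator-level kick for a misbehaving shard.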

// ResetQueue resets processing queue states
func (h *handlerImpl) ResetQueue(
        ctx context.Context,
        request *types.ResetQueueRequest,
) (retError error) {

        defer func() { log.CapturePanic(recover(), h.GetLogger(), &retError) }()
        h.startWG.Wait()

        scope, sw := h.startRequestProfile(ctx, metrics.HistoryResetQueueScope)
        defer sw.Stop()

        engine, err := h.controller.GetEngineForShard(int(request.GetShardID()))
        if err != nil {
                return h.error(err, scope, "", "")
        }

        switch taskType := common.TaskType(request.GetType()); taskType {
        case common.TaskTypeTransfer:
                err = engine.ResetTransferQueue(ctx, request.GetClusterName())
        case common.TaskTypeTimer:
                err = engine.ResetTimerQueue(ctx, request.GetClusterName())
        case common.TaskTypeCrossCluster:
                err = engine.ResetCrossClusterQueue(ctx, request.GetClusterName())
        default:
                err = errInvalidTaskType
        }

        if err != nil {
                return h.error(err, scope, "", "")
        }
        return nil
}

// DescribeQueue describes processing queue states
func (h *handlerImpl) DescribeQueue(
        ctx context.Context,
        request *types.DescribeQueueRequest,
) (resp *types.DescribeQueueResponse, retError error) {

        defer func() { log.CapturePanic(recover(), h.GetLogger(), &retError) }()
        h.startWG.Wait()

        scope, sw := h.startRequestProfile(ctx, metrics.HistoryDescribeQueueScope)
        defer sw.Stop()

        engine, err := h.controller.GetEngineForShard(int(request.GetShardID()))
        if err != nil {
                return nil, h.error(err, scope, "", "")
        }

        switch taskType := common.TaskType(request.GetType()); taskType {
        case common.TaskTypeTransfer:
                resp, err = engine.DescribeTransferQueue(ctx, request.GetClusterName())
        case common.TaskTypeTimer:
                resp, err = engine.DescribeTimerQueue(ctx, request.GetClusterName())
        case common.TaskTypeCrossCluster:
                resp, err = engine.DescribeCrossClusterQueue(ctx, request.GetClusterName())
        default:
                err = errInvalidTaskType
        }

        if err != nil {
                return nil, h.error(err, scope, "", "")
        }
        return resp, nil
}

// DescribeMutableState - returns the internal analysis of workflow execution state
func (h *handlerImpl) DescribeMutableState(
        ctx context.Context,
        request *types.DescribeMutableStateRequest,
) (resp *types.DescribeMutableStateResponse, retError error) {

        defer func() { log.CapturePanic(recover(), h.GetLogger(), &retError) }()
        h.startWG.Wait()

        scope, sw := h.startRequestProfile(ctx, metrics.HistoryDescribeMutabelStateScope)
        defer sw.Stop()

        domainID := request.GetDomainUUID()
        if domainID == "" {
                return nil, h.error(errDomainNotSet, scope, domainID, "")
        }

        workflowExecution := request.Execution
        workflowID := workflowExecution.GetWorkflowID()
        engine, err1 := h.controller.GetEngine(workflowID)
        if err1 != nil {
                return nil, h.error(err1, scope, domainID, workflowID)
        }

        resp, err2 := engine.DescribeMutableState(ctx, request)
        if err2 != nil {
                return nil, h.error(err2, scope, domainID, workflowID)
        }
        return resp, nil
}

// GetMutableState - returns the id of the next event in the execution's history
func (h *handlerImpl) GetMutableState(
        ctx context.Context,
        getRequest *types.GetMutableStateRequest,
) (resp *types.GetMutableStateResponse, retError error) {

        defer func() { log.CapturePanic(recover(), h.GetLogger(), &retError) }()
        h.startWG.Wait()

        scope, sw := h.startRequestProfile(ctx, metrics.HistoryGetMutableStateScope)
        defer sw.Stop()

        domainID := getRequest.GetDomainUUID()
        if domainID == "" {
                return nil, h.error(errDomainNotSet, scope, domainID, "")
        }

        if ok := h.rateLimiter.Allow(); !ok {
                return nil, h.error(errHistoryHostThrottle, scope, domainID, "")
        }

        workflowExecution := getRequest.Execution
        workflowID := workflowExecution.GetWorkflowID()
        engine, err1 := h.controller.GetEngine(workflowID)
        if err1 != nil {
                return nil, h.error(err1, scope, domainID, workflowID)
        }

        resp, err2 := engine.GetMutableState(ctx, getRequest)
        if err2 != nil {
                return nil, h.error(err2, scope, domainID, workflowID)
        }
        return resp, nil
}

// PollMutableState - returns the id of the next event in the execution's history
func (h *handlerImpl) PollMutableState(
        ctx context.Context,
        getRequest *types.PollMutableStateRequest,
) (resp *types.PollMutableStateResponse, retError error) {

        defer func() { log.CapturePanic(recover(), h.GetLogger(), &retError) }()
        h.startWG.Wait()

        scope, sw := h.startRequestProfile(ctx, metrics.HistoryPollMutableStateScope)
        defer sw.Stop()

        domainID := getRequest.GetDomainUUID()
        if domainID == "" {
                return nil, h.error(errDomainNotSet, scope, domainID, "")
        }

        if ok := h.rateLimiter.Allow(); !ok {
                return nil, h.error(errHistoryHostThrottle, scope, domainID, "")
        }

        workflowExecution := getRequest.Execution
        workflowID := workflowExecution.GetWorkflowID()
        engine, err1 := h.controller.GetEngine(workflowID)
        if err1 != nil {
                return nil, h.error(err1, scope, domainID, workflowID)
        }

        resp, err2 := engine.PollMutableState(ctx, getRequest)
        if err2 != nil {
                return nil, h.error(err2, scope, domainID, workflowID)
        }
        return resp, nil
}
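
// GetMutableState and PollMutableState are intentionally parallel; Poll is
// the variant intended for waiting on mutable-state updates (the distinction
// lives in the engine implementation, which is not part of this excerpt).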

// DescribeWorkflowExecution returns information about the specified workflow execution.
func (h *handlerImpl) DescribeWorkflowExecution(
        ctx context.Context,
        request *types.HistoryDescribeWorkflowExecutionRequest,
) (resp *types.DescribeWorkflowExecutionResponse, retError error) {

        defer func() { log.CapturePanic(recover(), h.GetLogger(), &retError) }()
        h.startWG.Wait()

        scope, sw := h.startRequestProfile(ctx, metrics.HistoryDescribeWorkflowExecutionScope)
        defer sw.Stop()

        domainID := request.GetDomainUUID()
        if domainID == "" {
                return nil, h.error(errDomainNotSet, scope, domainID, "")
        }

        if ok := h.rateLimiter.Allow(); !ok {
                return nil, h.error(errHistoryHostThrottle, scope, domainID, "")
        }

        workflowExecution := request.Request.Execution
        workflowID := workflowExecution.GetWorkflowID()
        engine, err1 := h.controller.GetEngine(workflowID)
        if err1 != nil {
                return nil, h.error(err1, scope, domainID, workflowID)
        }

        resp, err2 := engine.DescribeWorkflowExecution(ctx, request)
        if err2 != nil {
                return nil, h.error(err2, scope, domainID, workflowID)
        }
        return resp, nil
}

// RequestCancelWorkflowExecution - requests cancellation of a workflow
func (h *handlerImpl) RequestCancelWorkflowExecution(
        ctx context.Context,
        request *types.HistoryRequestCancelWorkflowExecutionRequest,
) (retError error) {

        defer func() { log.CapturePanic(recover(), h.GetLogger(), &retError) }()
        h.startWG.Wait()

        scope, sw := h.startRequestProfile(ctx, metrics.HistoryRequestCancelWorkflowExecutionScope)
        defer sw.Stop()

        if h.isShuttingDown() {
                return errShuttingDown
        }

        domainID := request.GetDomainUUID()
        if domainID == "" || request.CancelRequest.GetDomain() == "" {
                return h.error(errDomainNotSet, scope, domainID, "")
        }

        if ok := h.rateLimiter.Allow(); !ok {
                return h.error(errHistoryHostThrottle, scope, domainID, "")
        }

        cancelRequest := request.CancelRequest
        h.GetLogger().Debug(fmt.Sprintf("RequestCancelWorkflowExecution. DomainID: %v/%v, WorkflowID: %v, RunID: %v.",
                cancelRequest.GetDomain(),
                request.GetDomainUUID(),
                cancelRequest.WorkflowExecution.GetWorkflowID(),
                cancelRequest.WorkflowExecution.GetRunID()))

        workflowID := cancelRequest.WorkflowExecution.GetWorkflowID()
        engine, err1 := h.controller.GetEngine(workflowID)
        if err1 != nil {
                return h.error(err1, scope, domainID, workflowID)
        }

        err2 := engine.RequestCancelWorkflowExecution(ctx, request)
        if err2 != nil {
                return h.error(err2, scope, domainID, workflowID)
        }

        return nil
}

// SignalWorkflowExecution is used to send a signal event to running workflow execution. This results in
// WorkflowExecutionSignaled event recorded in the history and a decision task being created for the execution.
func (h *handlerImpl) SignalWorkflowExecution(
        ctx context.Context,
        wrappedRequest *types.HistorySignalWorkflowExecutionRequest,
) (retError error) {

        defer func() { log.CapturePanic(recover(), h.GetLogger(), &retError) }()
        h.startWG.Wait()

        scope, sw := h.startRequestProfile(ctx, metrics.HistorySignalWorkflowExecutionScope)
        defer sw.Stop()

        if h.isShuttingDown() {
                return errShuttingDown
        }

        domainID := wrappedRequest.GetDomainUUID()
        if domainID == "" {
                return h.error(errDomainNotSet, scope, domainID, "")
        }

        if ok := h.rateLimiter.Allow(); !ok {
                return h.error(errHistoryHostThrottle, scope, domainID, "")
        }

        workflowExecution := wrappedRequest.SignalRequest.WorkflowExecution
        workflowID := workflowExecution.GetWorkflowID()
        engine, err1 := h.controller.GetEngine(workflowID)
        if err1 != nil {
                return h.error(err1, scope, domainID, workflowID)
        }

        err2 := engine.SignalWorkflowExecution(ctx, wrappedRequest)
        if err2 != nil {
                return h.error(err2, scope, domainID, workflowID)
        }

        return nil
}

// SignalWithStartWorkflowExecution is used to ensure sending a signal event to a workflow execution.
// If workflow is running, this results in WorkflowExecutionSignaled event recorded in the history
// and a decision task being created for the execution.
// If workflow is not running or not found, this results in WorkflowExecutionStarted and WorkflowExecutionSignaled
// event recorded in history, and a decision task being created for the execution
func (h *handlerImpl) SignalWithStartWorkflowExecution(
        ctx context.Context,
        wrappedRequest *types.HistorySignalWithStartWorkflowExecutionRequest,
) (resp *types.StartWorkflowExecutionResponse, retError error) {

        defer func() { log.CapturePanic(recover(), h.GetLogger(), &retError) }()
        h.startWG.Wait()

        scope, sw := h.startRequestProfile(ctx, metrics.HistorySignalWithStartWorkflowExecutionScope)
        defer sw.Stop()

        if h.isShuttingDown() {
                return nil, errShuttingDown
        }

        domainID := wrappedRequest.GetDomainUUID()
        if domainID == "" {
                return nil, h.error(errDomainNotSet, scope, domainID, "")
        }

        if ok := h.rateLimiter.Allow(); !ok {
                return nil, h.error(errHistoryHostThrottle, scope, domainID, "")
        }

        signalWithStartRequest := wrappedRequest.SignalWithStartRequest
        workflowID := signalWithStartRequest.GetWorkflowID()
        engine, err1 := h.controller.GetEngine(workflowID)
        if err1 != nil {
                return nil, h.error(err1, scope, domainID, workflowID)
        }

        resp, err2 := engine.SignalWithStartWorkflowExecution(ctx, wrappedRequest)
        if err2 == nil {
                return resp, nil
        }
        // Two simultaneous SignalWithStart requests might try to start a workflow at the same time.
        // This can result in one of the requests failing with one of two possible errors:
        //    1) If it is a brand new WF ID, one of the requests can fail with WorkflowExecutionAlreadyStartedError
        //       (createMode is persistence.CreateWorkflowModeBrandNew)
        //    2) If it is an already existing WF ID, one of the requests can fail with a CurrentWorkflowConditionFailedError
        //       (createMode is persistence.CreateWorkflowModeWorkflowIDReuse)
        // If either error occurs, just go ahead and retry. It should succeed on the subsequent attempt.
        var e1 *persistence.WorkflowExecutionAlreadyStartedError
        var e2 *persistence.CurrentWorkflowConditionFailedError
        if !errors.As(err2, &e1) && !errors.As(err2, &e2) {
                return nil, h.error(err2, scope, domainID, workflowID)
        }

        resp, err2 = engine.SignalWithStartWorkflowExecution(ctx, wrappedRequest)
        if err2 != nil {
                return nil, h.error(err2, scope, domainID, workflowID)
        }
        return resp, nil
}

// RemoveSignalMutableState is used to remove a signal request ID that was previously recorded. This is
// currently used to clean execution info when the signal decision has finished.
func (h *handlerImpl) RemoveSignalMutableState(
        ctx context.Context,
        wrappedRequest *types.RemoveSignalMutableStateRequest,
) (retError error) {

        defer func() { log.CapturePanic(recover(), h.GetLogger(), &retError) }()
        h.startWG.Wait()

        scope, sw := h.startRequestProfile(ctx, metrics.HistoryRemoveSignalMutableStateScope)
        defer sw.Stop()

        if h.isShuttingDown() {
                return errShuttingDown
        }

        domainID := wrappedRequest.GetDomainUUID()
        if domainID == "" {
                return h.error(errDomainNotSet, scope, domainID, "")
        }

        if ok := h.rateLimiter.Allow(); !ok {
                return h.error(errHistoryHostThrottle, scope, domainID, "")
        }

        workflowExecution := wrappedRequest.WorkflowExecution
        workflowID := workflowExecution.GetWorkflowID()
        engine, err1 := h.controller.GetEngine(workflowID)
        if err1 != nil {
                return h.error(err1, scope, domainID, workflowID)
        }

        err2 := engine.RemoveSignalMutableState(ctx, wrappedRequest)
        if err2 != nil {
                return h.error(err2, scope, domainID, workflowID)
        }

        return nil
}
1213
// TerminateWorkflowExecution terminates an existing workflow execution by recording a WorkflowExecutionTerminated event
// in the history and immediately terminating the execution instance.
func (h *handlerImpl) TerminateWorkflowExecution(
        ctx context.Context,
        wrappedRequest *types.HistoryTerminateWorkflowExecutionRequest,
) (retError error) {

        defer func() { log.CapturePanic(recover(), h.GetLogger(), &retError) }()
        h.startWG.Wait()

        scope, sw := h.startRequestProfile(ctx, metrics.HistoryTerminateWorkflowExecutionScope)
        defer sw.Stop()

        if h.isShuttingDown() {
                return errShuttingDown
        }

        domainID := wrappedRequest.GetDomainUUID()
        if domainID == "" {
                return h.error(errDomainNotSet, scope, domainID, "")
        }

        if ok := h.rateLimiter.Allow(); !ok {
                return h.error(errHistoryHostThrottle, scope, domainID, "")
        }

        workflowExecution := wrappedRequest.TerminateRequest.WorkflowExecution
        workflowID := workflowExecution.GetWorkflowID()
        engine, err1 := h.controller.GetEngine(workflowID)
        if err1 != nil {
                return h.error(err1, scope, domainID, workflowID)
        }

        err2 := engine.TerminateWorkflowExecution(ctx, wrappedRequest)
        if err2 != nil {
                return h.error(err2, scope, domainID, workflowID)
        }

        return nil
}

// ResetWorkflowExecution resets an existing workflow execution: it terminates the current
// execution instance and starts a new run from the requested point in the history.
func (h *handlerImpl) ResetWorkflowExecution(
        ctx context.Context,
        wrappedRequest *types.HistoryResetWorkflowExecutionRequest,
) (resp *types.ResetWorkflowExecutionResponse, retError error) {

        defer func() { log.CapturePanic(recover(), h.GetLogger(), &retError) }()
        h.startWG.Wait()

        scope, sw := h.startRequestProfile(ctx, metrics.HistoryResetWorkflowExecutionScope)
        defer sw.Stop()

        if h.isShuttingDown() {
                return nil, errShuttingDown
        }

        domainID := wrappedRequest.GetDomainUUID()
        if domainID == "" {
                return nil, h.error(errDomainNotSet, scope, domainID, "")
        }

        if ok := h.rateLimiter.Allow(); !ok {
                return nil, h.error(errHistoryHostThrottle, scope, domainID, "")
        }

        workflowExecution := wrappedRequest.ResetRequest.WorkflowExecution
        workflowID := workflowExecution.GetWorkflowID()
        engine, err1 := h.controller.GetEngine(workflowID)
        if err1 != nil {
                return nil, h.error(err1, scope, domainID, workflowID)
        }

        resp, err2 := engine.ResetWorkflowExecution(ctx, wrappedRequest)
        if err2 != nil {
                return nil, h.error(err2, scope, domainID, workflowID)
        }

        return resp, nil
}

// QueryWorkflow queries a workflow execution.
func (h *handlerImpl) QueryWorkflow(
        ctx context.Context,
        request *types.HistoryQueryWorkflowRequest,
) (resp *types.HistoryQueryWorkflowResponse, retError error) {
        defer func() { log.CapturePanic(recover(), h.GetLogger(), &retError) }()
        h.startWG.Wait()

        scope, sw := h.startRequestProfile(ctx, metrics.HistoryQueryWorkflowScope)
        defer sw.Stop()

        if h.isShuttingDown() {
                return nil, errShuttingDown
        }

        domainID := request.GetDomainUUID()
        if domainID == "" {
                return nil, h.error(errDomainNotSet, scope, domainID, "")
        }

        if ok := h.rateLimiter.Allow(); !ok {
                return nil, h.error(errHistoryHostThrottle, scope, domainID, "")
        }

        workflowID := request.GetRequest().GetExecution().GetWorkflowID()
        engine, err1 := h.controller.GetEngine(workflowID)
        if err1 != nil {
                return nil, h.error(err1, scope, domainID, workflowID)
        }

        resp, err2 := engine.QueryWorkflow(ctx, request)
        if err2 != nil {
                return nil, h.error(err2, scope, domainID, workflowID)
        }

        return resp, nil
}

// ScheduleDecisionTask is used for creating a decision task for an already started workflow execution.  This is mainly
// used by the transfer queue processor during the processing of the StartChildWorkflowExecution task, where it first
// starts the child execution without creating the decision task and then calls this API after updating the mutable
// state of the parent execution.
func (h *handlerImpl) ScheduleDecisionTask(
        ctx context.Context,
        request *types.ScheduleDecisionTaskRequest,
) (retError error) {

        defer func() { log.CapturePanic(recover(), h.GetLogger(), &retError) }()
        h.startWG.Wait()

        scope, sw := h.startRequestProfile(ctx, metrics.HistoryScheduleDecisionTaskScope)
        defer sw.Stop()

        if h.isShuttingDown() {
                return errShuttingDown
        }

        domainID := request.GetDomainUUID()
        if domainID == "" {
                return h.error(errDomainNotSet, scope, domainID, "")
        }

        if ok := h.rateLimiter.Allow(); !ok {
                return h.error(errHistoryHostThrottle, scope, domainID, "")
        }

        if request.WorkflowExecution == nil {
                return h.error(errWorkflowExecutionNotSet, scope, domainID, "")
        }

        workflowExecution := request.WorkflowExecution
        workflowID := workflowExecution.GetWorkflowID()
        engine, err1 := h.controller.GetEngine(workflowID)
        if err1 != nil {
                return h.error(err1, scope, domainID, workflowID)
        }

        err2 := engine.ScheduleDecisionTask(ctx, request)
        if err2 != nil {
                return h.error(err2, scope, domainID, workflowID)
        }

        return nil
}

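// scheduleFirstDecision is an illustrative sketch, not part of the original
// handler: a hypothetical caller showing how a processor might invoke
// ScheduleDecisionTask for a child execution it has just started. The request
// shape mirrors the fields validated above.
func scheduleFirstDecision(ctx context.Context, h Handler, domainID string, execution *types.WorkflowExecution) error {
        return h.ScheduleDecisionTask(ctx, &types.ScheduleDecisionTaskRequest{
                DomainUUID:        domainID,
                WorkflowExecution: execution,
        })
}
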
// RecordChildExecutionCompleted is used for reporting the completion of a child workflow execution to its parent.
// This is mainly called by the transfer queue processor during the processing of the DeleteExecution task.
func (h *handlerImpl) RecordChildExecutionCompleted(
        ctx context.Context,
        request *types.RecordChildExecutionCompletedRequest,
) (retError error) {

        defer func() { log.CapturePanic(recover(), h.GetLogger(), &retError) }()
        h.startWG.Wait()

        scope, sw := h.startRequestProfile(ctx, metrics.HistoryRecordChildExecutionCompletedScope)
        defer sw.Stop()

        if h.isShuttingDown() {
                return errShuttingDown
        }

        domainID := request.GetDomainUUID()
        if domainID == "" {
                return h.error(errDomainNotSet, scope, domainID, "")
        }

        if ok := h.rateLimiter.Allow(); !ok {
                return h.error(errHistoryHostThrottle, scope, domainID, "")
        }

        if request.WorkflowExecution == nil {
                return h.error(errWorkflowExecutionNotSet, scope, domainID, "")
        }

        workflowExecution := request.WorkflowExecution
        workflowID := workflowExecution.GetWorkflowID()
        engine, err1 := h.controller.GetEngine(workflowID)
        if err1 != nil {
                return h.error(err1, scope, domainID, workflowID)
        }

        err2 := engine.RecordChildExecutionCompleted(ctx, request)
        if err2 != nil {
                return h.error(err2, scope, domainID, workflowID)
        }

        return nil
}

// ResetStickyTaskList resets the volatile information in the mutable state of a given workflow execution.
// Volatile information is the client-related information, such as:
// 1. StickyTaskList
// 2. StickyScheduleToStartTimeout
// 3. ClientLibraryVersion
// 4. ClientFeatureVersion
// 5. ClientImpl
func (h *handlerImpl) ResetStickyTaskList(
        ctx context.Context,
        resetRequest *types.HistoryResetStickyTaskListRequest,
) (resp *types.HistoryResetStickyTaskListResponse, retError error) {

        defer func() { log.CapturePanic(recover(), h.GetLogger(), &retError) }()
        h.startWG.Wait()

        scope, sw := h.startRequestProfile(ctx, metrics.HistoryResetStickyTaskListScope)
        defer sw.Stop()

        if h.isShuttingDown() {
                return nil, errShuttingDown
        }

        domainID := resetRequest.GetDomainUUID()
        if domainID == "" {
                return nil, h.error(errDomainNotSet, scope, domainID, "")
        }

        if ok := h.rateLimiter.Allow(); !ok {
                return nil, h.error(errHistoryHostThrottle, scope, domainID, "")
        }

        workflowID := resetRequest.Execution.GetWorkflowID()
        engine, err := h.controller.GetEngine(workflowID)
        if err != nil {
                return nil, h.error(err, scope, domainID, workflowID)
        }

        resp, err = engine.ResetStickyTaskList(ctx, resetRequest)
        if err != nil {
                return nil, h.error(err, scope, domainID, workflowID)
        }

        return resp, nil
}

// ReplicateEventsV2 is called by the processor to replicate history events for passive domains
func (h *handlerImpl) ReplicateEventsV2(
        ctx context.Context,
        replicateRequest *types.ReplicateEventsV2Request,
) (retError error) {

        defer func() { log.CapturePanic(recover(), h.GetLogger(), &retError) }()
        h.startWG.Wait()

        if h.isShuttingDown() {
                return errShuttingDown
        }

        scope, sw := h.startRequestProfile(ctx, metrics.HistoryReplicateEventsV2Scope)
        defer sw.Stop()

        domainID := replicateRequest.GetDomainUUID()
        if domainID == "" {
                return h.error(errDomainNotSet, scope, domainID, "")
        }

        if ok := h.rateLimiter.Allow(); !ok {
                return h.error(errHistoryHostThrottle, scope, domainID, "")
        }

        workflowExecution := replicateRequest.WorkflowExecution
        workflowID := workflowExecution.GetWorkflowID()
        engine, err1 := h.controller.GetEngine(workflowID)
        if err1 != nil {
                return h.error(err1, scope, domainID, workflowID)
        }

        err2 := engine.ReplicateEventsV2(ctx, replicateRequest)
        if err2 != nil {
                return h.error(err2, scope, domainID, workflowID)
        }

        return nil
}

// SyncShardStatus is called by the processor to sync history shard information from another cluster
func (h *handlerImpl) SyncShardStatus(
        ctx context.Context,
        syncShardStatusRequest *types.SyncShardStatusRequest,
) (retError error) {

        defer func() { log.CapturePanic(recover(), h.GetLogger(), &retError) }()
        h.startWG.Wait()

        scope, sw := h.startRequestProfile(ctx, metrics.HistorySyncShardStatusScope)
        defer sw.Stop()

        if h.isShuttingDown() {
                return errShuttingDown
        }

        if ok := h.rateLimiter.Allow(); !ok {
                return h.error(errHistoryHostThrottle, scope, "", "")
        }

        if syncShardStatusRequest.SourceCluster == "" {
                return h.error(errSourceClusterNotSet, scope, "", "")
        }

        if syncShardStatusRequest.Timestamp == nil {
                return h.error(errTimestampNotSet, scope, "", "")
        }

        // shard ID is already provided in the request
        engine, err := h.controller.GetEngineForShard(int(syncShardStatusRequest.GetShardID()))
        if err != nil {
                return h.error(err, scope, "", "")
        }

        err = engine.SyncShardStatus(ctx, syncShardStatusRequest)
        if err != nil {
                return h.error(err, scope, "", "")
        }

        return nil
}

// SyncActivity is called by the processor to sync activity state
func (h *handlerImpl) SyncActivity(
        ctx context.Context,
        syncActivityRequest *types.SyncActivityRequest,
) (retError error) {

        defer func() { log.CapturePanic(recover(), h.GetLogger(), &retError) }()
        h.startWG.Wait()

        scope, sw := h.startRequestProfile(ctx, metrics.HistorySyncActivityScope)
        defer sw.Stop()

        if h.isShuttingDown() {
                return errShuttingDown
        }

        domainID := syncActivityRequest.GetDomainID()
        if syncActivityRequest.DomainID == "" || uuid.Parse(syncActivityRequest.GetDomainID()) == nil {
                return h.error(errDomainNotSet, scope, domainID, "")
        }

        if ok := h.rateLimiter.Allow(); !ok {
                return h.error(errHistoryHostThrottle, scope, domainID, "")
        }

        if syncActivityRequest.WorkflowID == "" {
                return h.error(errWorkflowIDNotSet, scope, domainID, "")
        }

        if syncActivityRequest.RunID == "" || uuid.Parse(syncActivityRequest.GetRunID()) == nil {
                return h.error(errRunIDNotValid, scope, domainID, "")
        }

        workflowID := syncActivityRequest.GetWorkflowID()
        engine, err := h.controller.GetEngine(workflowID)
        if err != nil {
                return h.error(err, scope, domainID, workflowID)
        }

        err = engine.SyncActivity(ctx, syncActivityRequest)
        if err != nil {
                return h.error(err, scope, domainID, workflowID)
        }

        return nil
}

// GetReplicationMessages is called by remote peers to get replicated messages for cross DC replication
func (h *handlerImpl) GetReplicationMessages(
        ctx context.Context,
        request *types.GetReplicationMessagesRequest,
) (resp *types.GetReplicationMessagesResponse, retError error) {
        defer func() { log.CapturePanic(recover(), h.GetLogger(), &retError) }()
        h.startWG.Wait()

        h.GetLogger().Debug("Received GetReplicationMessages call.")

        _, sw := h.startRequestProfile(ctx, metrics.HistoryGetReplicationMessagesScope)
        defer sw.Stop()

        if h.isShuttingDown() {
                return nil, errShuttingDown
        }

        var wg sync.WaitGroup
        wg.Add(len(request.Tokens))
        result := new(sync.Map)

        for _, token := range request.Tokens {
                go func(token *types.ReplicationToken) {
                        defer wg.Done()

                        engine, err := h.controller.GetEngineForShard(int(token.GetShardID()))
                        if err != nil {
                                h.GetLogger().Warn("History engine not found for shard", tag.Error(err))
                                return
                        }
                        tasks, err := engine.GetReplicationMessages(
                                ctx,
                                request.GetClusterName(),
                                token.GetLastRetrievedMessageID(),
                        )
                        if err != nil {
                                h.GetLogger().Warn("Failed to get replication tasks for shard", tag.Error(err))
                                return
                        }

                        result.Store(token.GetShardID(), tasks)
                }(token)
        }

        wg.Wait()

        responseSize := 0
        maxResponseSize := h.config.MaxResponseSize

        messagesByShard := make(map[int32]*types.ReplicationMessages)
        result.Range(func(key, value interface{}) bool {
                shardID := key.(int32)
                tasks := value.(*types.ReplicationMessages)

                size := proto.FromReplicationMessages(tasks).Size()
                if (responseSize + size) >= maxResponseSize {
                        // Log shards that did not fit for debugging purposes
                        h.GetLogger().Warn("Replication messages did not fit in the response (history host)",
                                tag.ShardID(int(shardID)),
                                tag.ResponseSize(size),
                                tag.ResponseTotalSize(responseSize),
                                tag.ResponseMaxSize(maxResponseSize),
                        )
                } else {
                        responseSize += size
                        messagesByShard[shardID] = tasks
                }

                return true
        })

        h.GetLogger().Debug("GetReplicationMessages succeeded.")

        return &types.GetReplicationMessagesResponse{MessagesByShard: messagesByShard}, nil
}

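// fitsInResponse is an illustrative sketch, not part of the original handler:
// a hypothetical helper restating the size-capping rule applied above. A
// shard's messages are included only while the accumulated payload stays
// strictly below the configured maximum response size.
func fitsInResponse(currentSize, messageSize, maxSize int) bool {
        return currentSize+messageSize < maxSize
}
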
// GetDLQReplicationMessages is called by remote peers to get replicated messages for DLQ merging
func (h *handlerImpl) GetDLQReplicationMessages(
        ctx context.Context,
        request *types.GetDLQReplicationMessagesRequest,
) (resp *types.GetDLQReplicationMessagesResponse, retError error) {
        defer func() { log.CapturePanic(recover(), h.GetLogger(), &retError) }()
        h.startWG.Wait()

        _, sw := h.startRequestProfile(ctx, metrics.HistoryGetDLQReplicationMessagesScope)
        defer sw.Stop()

        if h.isShuttingDown() {
                return nil, errShuttingDown
        }

        taskInfoPerExecution := map[definition.WorkflowIdentifier][]*types.ReplicationTaskInfo{}
        // batch the task infos by workflow ID and run ID
        for _, taskInfo := range request.GetTaskInfos() {
                identity := definition.NewWorkflowIdentifier(
                        taskInfo.GetDomainID(),
                        taskInfo.GetWorkflowID(),
                        taskInfo.GetRunID(),
                )
                if _, ok := taskInfoPerExecution[identity]; !ok {
                        taskInfoPerExecution[identity] = []*types.ReplicationTaskInfo{}
                }
                taskInfoPerExecution[identity] = append(taskInfoPerExecution[identity], taskInfo)
        }

        var wg sync.WaitGroup
        wg.Add(len(taskInfoPerExecution))
        tasksChan := make(chan *types.ReplicationTask, len(request.GetTaskInfos()))
        handleTaskInfoPerExecution := func(taskInfos []*types.ReplicationTaskInfo) {
                defer wg.Done()
                if len(taskInfos) == 0 {
                        return
                }

                engine, err := h.controller.GetEngine(
                        taskInfos[0].GetWorkflowID(),
                )
                if err != nil {
                        h.GetLogger().Warn("History engine not found for workflow ID.", tag.Error(err))
                        return
                }

                tasks, err := engine.GetDLQReplicationMessages(
                        ctx,
                        taskInfos,
                )
                if err != nil {
                        h.GetLogger().Error("Failed to get dlq replication tasks.", tag.Error(err))
                        return
                }

                for _, task := range tasks {
                        tasksChan <- task
                }
        }

        for _, replicationTaskInfos := range taskInfoPerExecution {
                go handleTaskInfoPerExecution(replicationTaskInfos)
        }
        wg.Wait()
        close(tasksChan)

        replicationTasks := make([]*types.ReplicationTask, 0, len(tasksChan))
        for task := range tasksChan {
                replicationTasks = append(replicationTasks, task)
        }
        return &types.GetDLQReplicationMessagesResponse{
                ReplicationTasks: replicationTasks,
        }, nil
}

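// groupByExecution is an illustrative sketch, not part of the original handler:
// a hypothetical helper restating the batching rule used above. DLQ task infos
// are grouped by (domainID, workflowID, runID) so that each workflow's tasks
// are fetched from a single history engine.
func groupByExecution(taskInfos []*types.ReplicationTaskInfo) map[definition.WorkflowIdentifier][]*types.ReplicationTaskInfo {
        grouped := map[definition.WorkflowIdentifier][]*types.ReplicationTaskInfo{}
        for _, taskInfo := range taskInfos {
                identity := definition.NewWorkflowIdentifier(
                        taskInfo.GetDomainID(),
                        taskInfo.GetWorkflowID(),
                        taskInfo.GetRunID(),
                )
                // append handles the missing-key case, so no explicit slice
                // initialization is required here.
                grouped[identity] = append(grouped[identity], taskInfo)
        }
        return grouped
}
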
// ReapplyEvents applies stale events to the current workflow and the current run
func (h *handlerImpl) ReapplyEvents(
        ctx context.Context,
        request *types.HistoryReapplyEventsRequest,
) (retError error) {

        defer func() { log.CapturePanic(recover(), h.GetLogger(), &retError) }()
        h.startWG.Wait()

        scope, sw := h.startRequestProfile(ctx, metrics.HistoryReapplyEventsScope)
        defer sw.Stop()

        if h.isShuttingDown() {
                return errShuttingDown
        }

        domainID := request.GetDomainUUID()
        workflowID := request.GetRequest().GetWorkflowExecution().GetWorkflowID()
        engine, err := h.controller.GetEngine(workflowID)
        if err != nil {
                return h.error(err, scope, domainID, workflowID)
        }
        // deserialize the history event batch
        historyEvents, err := h.GetPayloadSerializer().DeserializeBatchEvents(&persistence.DataBlob{
                Encoding: common.EncodingTypeThriftRW,
                Data:     request.GetRequest().GetEvents().GetData(),
        })
        if err != nil {
                return h.error(err, scope, domainID, workflowID)
        }

        execution := request.GetRequest().GetWorkflowExecution()
        if err := engine.ReapplyEvents(
                ctx,
                request.GetDomainUUID(),
                execution.GetWorkflowID(),
                execution.GetRunID(),
                historyEvents,
        ); err != nil {
                return h.error(err, scope, domainID, workflowID)
        }
        return nil
}

// CountDLQMessages counts replication DLQ messages across all shards owned by this host.
func (h *handlerImpl) CountDLQMessages(
        ctx context.Context,
        request *types.CountDLQMessagesRequest,
) (resp *types.HistoryCountDLQMessagesResponse, retError error) {
        defer func() { log.CapturePanic(recover(), h.GetLogger(), &retError) }()
        h.startWG.Wait()

        scope, sw := h.startRequestProfile(ctx, metrics.HistoryCountDLQMessagesScope)
        defer sw.Stop()

        if h.isShuttingDown() {
                return nil, errShuttingDown
        }

        g := &errgroup.Group{}
        var mu sync.Mutex
        entries := map[types.HistoryDLQCountKey]int64{}
        for _, shardID := range h.controller.ShardIDs() {
                shardID := shardID
                g.Go(func() (e error) {
                        defer func() { log.CapturePanic(recover(), h.GetLogger(), &e) }()

                        engine, err := h.controller.GetEngineForShard(int(shardID))
                        if err != nil {
                                return fmt.Errorf("dlq count for shard %d: %w", shardID, err)
                        }

                        counts, err := engine.CountDLQMessages(ctx, request.ForceFetch)
                        if err != nil {
                                return fmt.Errorf("dlq count for shard %d: %w", shardID, err)
                        }

                        mu.Lock()
                        defer mu.Unlock()
                        for sourceCluster, count := range counts {
                                key := types.HistoryDLQCountKey{ShardID: shardID, SourceCluster: sourceCluster}
                                entries[key] = count
                        }
                        return nil
                })
        }

        err := g.Wait()
        return &types.HistoryCountDLQMessagesResponse{Entries: entries}, h.error(err, scope, "", "")
}

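// fanOutCount is an illustrative sketch, not part of the original handler:
// a hypothetical helper showing the errgroup fan-out/aggregate pattern used
// above, with one goroutine per shard and a mutex guarding the shared map.
func fanOutCount(shardIDs []int32, count func(int32) (int64, error)) (map[int32]int64, error) {
        g := &errgroup.Group{}
        var mu sync.Mutex
        totals := make(map[int32]int64, len(shardIDs))
        for _, shardID := range shardIDs {
                shardID := shardID // capture the loop variable for the goroutine
                g.Go(func() error {
                        n, err := count(shardID)
                        if err != nil {
                                return fmt.Errorf("count for shard %d: %w", shardID, err)
                        }
                        mu.Lock()
                        defer mu.Unlock()
                        totals[shardID] = n
                        return nil
                })
        }
        return totals, g.Wait()
}
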
// ReadDLQMessages reads replication DLQ messages
func (h *handlerImpl) ReadDLQMessages(
        ctx context.Context,
        request *types.ReadDLQMessagesRequest,
) (resp *types.ReadDLQMessagesResponse, retError error) {

        defer func() { log.CapturePanic(recover(), h.GetLogger(), &retError) }()
        h.startWG.Wait()

        scope, sw := h.startRequestProfile(ctx, metrics.HistoryReadDLQMessagesScope)
        defer sw.Stop()

        if h.isShuttingDown() {
                return nil, errShuttingDown
        }

        engine, err := h.controller.GetEngineForShard(int(request.GetShardID()))
        if err != nil {
                return nil, h.error(err, scope, "", "")
        }

        return engine.ReadDLQMessages(ctx, request)
}

// PurgeDLQMessages deletes replication DLQ messages
func (h *handlerImpl) PurgeDLQMessages(
        ctx context.Context,
        request *types.PurgeDLQMessagesRequest,
) (retError error) {

        defer func() { log.CapturePanic(recover(), h.GetLogger(), &retError) }()
        h.startWG.Wait()

        scope, sw := h.startRequestProfile(ctx, metrics.HistoryPurgeDLQMessagesScope)
        defer sw.Stop()

        if h.isShuttingDown() {
                return errShuttingDown
        }

        engine, err := h.controller.GetEngineForShard(int(request.GetShardID()))
        if err != nil {
                return h.error(err, scope, "", "")
        }

        return engine.PurgeDLQMessages(ctx, request)
}

// MergeDLQMessages reads and applies replication DLQ messages
func (h *handlerImpl) MergeDLQMessages(
        ctx context.Context,
        request *types.MergeDLQMessagesRequest,
) (resp *types.MergeDLQMessagesResponse, retError error) {

        defer func() { log.CapturePanic(recover(), h.GetLogger(), &retError) }()
        h.startWG.Wait()

        if h.isShuttingDown() {
                return nil, errShuttingDown
        }

        scope, sw := h.startRequestProfile(ctx, metrics.HistoryMergeDLQMessagesScope)
        defer sw.Stop()

        engine, err := h.controller.GetEngineForShard(int(request.GetShardID()))
        if err != nil {
                return nil, h.error(err, scope, "", "")
        }

        return engine.MergeDLQMessages(ctx, request)
}

// RefreshWorkflowTasks refreshes all the tasks of a workflow
func (h *handlerImpl) RefreshWorkflowTasks(
        ctx context.Context,
        request *types.HistoryRefreshWorkflowTasksRequest) (retError error) {

        scope, sw := h.startRequestProfile(ctx, metrics.HistoryRefreshWorkflowTasksScope)
        defer sw.Stop()

        if h.isShuttingDown() {
                return errShuttingDown
        }

        domainID := request.DomainUIID // the field is spelled "DomainUIID" on the request type
        execution := request.GetRequest().GetExecution()
        workflowID := execution.GetWorkflowID()
        engine, err := h.controller.GetEngine(workflowID)
        if err != nil {
                return h.error(err, scope, domainID, workflowID)
        }

        err = engine.RefreshWorkflowTasks(
                ctx,
                domainID,
                types.WorkflowExecution{
                        WorkflowID: execution.WorkflowID,
                        RunID:      execution.RunID,
                },
        )

        if err != nil {
                return h.error(err, scope, domainID, workflowID)
        }

        return nil
}

// NotifyFailoverMarkers sends the failover markers to the failover coordinator.
// The coordinator decides when the failover finishes based on the received failover markers.
func (h *handlerImpl) NotifyFailoverMarkers(
        ctx context.Context,
        request *types.NotifyFailoverMarkersRequest,
) (retError error) {

        _, sw := h.startRequestProfile(ctx, metrics.HistoryNotifyFailoverMarkersScope)
        defer sw.Stop()

        for _, token := range request.GetFailoverMarkerTokens() {
                marker := token.GetFailoverMarker()
                h.GetLogger().Debug("Handling failover marker", tag.WorkflowDomainID(marker.GetDomainID()))
                h.failoverCoordinator.ReceiveFailoverMarkers(token.GetShardIDs(), token.GetFailoverMarker())
        }
        return nil
}

// GetCrossClusterTasks fetches cross cluster tasks for the requested shards in parallel.
func (h *handlerImpl) GetCrossClusterTasks(
        ctx context.Context,
        request *types.GetCrossClusterTasksRequest,
) (resp *types.GetCrossClusterTasksResponse, retError error) {
        defer func() { log.CapturePanic(recover(), h.GetLogger(), &retError) }()
        h.startWG.Wait()

        _, sw := h.startRequestProfile(ctx, metrics.HistoryGetCrossClusterTasksScope)
        defer sw.Stop()

        if h.isShuttingDown() {
                return nil, errShuttingDown
        }

        ctx, cancel := common.CreateChildContext(ctx, 0.05)
        defer cancel()

        futureByShardID := make(map[int32]future.Future, len(request.ShardIDs))
        for _, shardID := range request.ShardIDs {
                future, settable := future.NewFuture()
                futureByShardID[shardID] = future
                go func(shardID int32) {
                        logger := h.GetLogger().WithTags(tag.ShardID(int(shardID)))
                        engine, err := h.controller.GetEngineForShard(int(shardID))
                        if err != nil {
                                logger.Error("History engine not found for shard", tag.Error(err))
                                var owner membership.HostInfo
                                if info, err := h.GetMembershipResolver().Lookup(service.History, strconv.Itoa(int(shardID))); err == nil {
                                        owner = info
                                }
                                settable.Set(nil, shard.CreateShardOwnershipLostError(h.GetHostInfo(), owner))
                                return
                        }

                        if tasks, err := engine.GetCrossClusterTasks(ctx, request.TargetCluster); err != nil {
                                logger.Error("Failed to get cross cluster tasks", tag.Error(err))
                                settable.Set(nil, h.convertError(err))
                        } else {
                                settable.Set(tasks, nil)
                        }
                }(shardID)
        }

        response := &types.GetCrossClusterTasksResponse{
                TasksByShard:       make(map[int32][]*types.CrossClusterTaskRequest),
                FailedCauseByShard: make(map[int32]types.GetTaskFailedCause),
        }
        for shardID, future := range futureByShardID {
                var taskRequests []*types.CrossClusterTaskRequest
                if futureErr := future.Get(ctx, &taskRequests); futureErr != nil {
                        response.FailedCauseByShard[shardID] = common.ConvertErrToGetTaskFailedCause(futureErr)
                } else {
                        response.TasksByShard[shardID] = taskRequests
                }
        }
        // No WaitGroup is needed for the goroutines created above: once all
        // futures are unblocked, those goroutines will eventually complete.

        return response, nil
}

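// fanOutWithFutures is an illustrative sketch, not part of the original
// handler: a hypothetical helper showing the future-based fan-out pattern
// used above. Each goroutine resolves its own settable; the caller collects
// results without a WaitGroup because future.Get blocks until the value is set.
func fanOutWithFutures(ctx context.Context, shardIDs []int32, fetch func(int32) (string, error)) map[int32]string {
        futureByShardID := make(map[int32]future.Future, len(shardIDs))
        for _, shardID := range shardIDs {
                f, settable := future.NewFuture()
                futureByShardID[shardID] = f
                go func(shardID int32) {
                        value, err := fetch(shardID)
                        settable.Set(value, err)
                }(shardID)
        }

        results := make(map[int32]string, len(shardIDs))
        for shardID, f := range futureByShardID {
                var value string
                if err := f.Get(ctx, &value); err == nil {
                        results[shardID] = value
                }
        }
        return results
}
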
// RespondCrossClusterTasksCompleted records completed cross cluster task responses and,
// if requested, fetches a new batch of tasks for the target cluster.
func (h *handlerImpl) RespondCrossClusterTasksCompleted(
        ctx context.Context,
        request *types.RespondCrossClusterTasksCompletedRequest,
) (resp *types.RespondCrossClusterTasksCompletedResponse, retError error) {
        defer func() { log.CapturePanic(recover(), h.GetLogger(), &retError) }()
        h.startWG.Wait()

        scope, sw := h.startRequestProfile(ctx, metrics.HistoryRespondCrossClusterTasksCompletedScope)
        defer sw.Stop()

        if h.isShuttingDown() {
                return nil, errShuttingDown
        }

        engine, err := h.controller.GetEngineForShard(int(request.GetShardID()))
        if err != nil {
                return nil, h.error(err, scope, "", "")
        }

        err = engine.RespondCrossClusterTasksCompleted(ctx, request.TargetCluster, request.TaskResponses)
        if err != nil {
                return nil, h.error(err, scope, "", "")
        }

        response := &types.RespondCrossClusterTasksCompletedResponse{}
        if request.FetchNewTasks {
                fetchTaskCtx, cancel := common.CreateChildContext(ctx, 0.05)
                defer cancel()

                response.Tasks, err = engine.GetCrossClusterTasks(fetchTaskCtx, request.TargetCluster)
                if err != nil {
                        return nil, h.error(err, scope, "", "")
                }
        }
        return response, nil
}

// GetFailoverInfo returns the failover information for the given domain from the failover coordinator.
func (h *handlerImpl) GetFailoverInfo(
        ctx context.Context,
        request *types.GetFailoverInfoRequest,
) (resp *types.GetFailoverInfoResponse, retError error) {
        defer func() { log.CapturePanic(recover(), h.GetLogger(), &retError) }()
        h.startWG.Wait()

        scope, sw := h.startRequestProfile(ctx, metrics.HistoryGetFailoverInfoScope)
        defer sw.Stop()

        if h.isShuttingDown() {
                return nil, errShuttingDown
        }

        resp, err := h.failoverCoordinator.GetFailoverInfo(request.GetDomainID())
        if err != nil {
                return nil, h.error(err, scope, request.GetDomainID(), "")
        }
        return resp, nil
}

// convertError is a helper method to convert the ShardOwnershipLostError returned by various HistoryEngine API calls
// at the persistence layer into the ShardOwnershipLost error returned by the HistoryService, so that the client can
// be redirected to the correct shard.
func (h *handlerImpl) convertError(err error) error {
        switch err := err.(type) {
        case *persistence.ShardOwnershipLostError:
                info, err2 := h.GetMembershipResolver().Lookup(service.History, strconv.Itoa(err.ShardID))
                if err2 != nil {
                        return shard.CreateShardOwnershipLostError(h.GetHostInfo(), membership.HostInfo{})
                }

                return shard.CreateShardOwnershipLostError(h.GetHostInfo(), info)
        case *persistence.WorkflowExecutionAlreadyStartedError:
                return &types.InternalServiceError{Message: err.Msg}
        case *persistence.CurrentWorkflowConditionFailedError:
                return &types.InternalServiceError{Message: err.Msg}
        case *persistence.TransactionSizeLimitError:
                return &types.BadRequestError{Message: err.Msg}
        }

        return err
}

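// isShardOwnershipLost is an illustrative sketch, not part of the original
// handler: a hypothetical check a caller could use to detect the converted
// ownership error and re-resolve the owning host before retrying.
func isShardOwnershipLost(err error) bool {
        var ownershipLost *types.ShardOwnershipLostError
        return errors.As(err, &ownershipLost)
}
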
func (h *handlerImpl) updateErrorMetric(
        scope metrics.Scope,
        domainID string,
        workflowID string,
        err error,
) {

        if err == context.DeadlineExceeded || err == context.Canceled {
                scope.IncCounter(metrics.CadenceErrContextTimeoutCounter)
                return
        }

        switch err := err.(type) {
        case *types.ShardOwnershipLostError:
                scope.IncCounter(metrics.CadenceErrShardOwnershipLostCounter)
        case *types.EventAlreadyStartedError:
                scope.IncCounter(metrics.CadenceErrEventAlreadyStartedCounter)
        case *types.BadRequestError:
                scope.IncCounter(metrics.CadenceErrBadRequestCounter)
        case *types.DomainNotActiveError:
                scope.IncCounter(metrics.CadenceErrBadRequestCounter)
        case *types.WorkflowExecutionAlreadyStartedError:
                scope.IncCounter(metrics.CadenceErrExecutionAlreadyStartedCounter)
        case *types.EntityNotExistsError:
                scope.IncCounter(metrics.CadenceErrEntityNotExistsCounter)
        case *types.WorkflowExecutionAlreadyCompletedError:
                scope.IncCounter(metrics.CadenceErrWorkflowExecutionAlreadyCompletedCounter)
        case *types.CancellationAlreadyRequestedError:
                scope.IncCounter(metrics.CadenceErrCancellationAlreadyRequestedCounter)
        case *types.LimitExceededError:
                scope.IncCounter(metrics.CadenceErrLimitExceededCounter)
        case *types.RetryTaskV2Error:
                scope.IncCounter(metrics.CadenceErrRetryTaskCounter)
        case *types.ServiceBusyError:
                scope.IncCounter(metrics.CadenceErrServiceBusyCounter)
        case *yarpcerrors.Status:
                if err.Code() == yarpcerrors.CodeDeadlineExceeded {
                        scope.IncCounter(metrics.CadenceErrContextTimeoutCounter)
                }
                scope.IncCounter(metrics.CadenceFailures)
        case *types.InternalServiceError:
                scope.IncCounter(metrics.CadenceFailures)
                h.GetLogger().Error("Internal service error",
                        tag.Error(err),
                        tag.WorkflowID(workflowID),
                        tag.WorkflowDomainID(domainID))
        default:
                scope.IncCounter(metrics.CadenceFailures)
                h.getLoggerWithTags(domainID, workflowID).Error("Uncategorized error", tag.Error(err))
        }
}

func (h *handlerImpl) error(
        err error,
        scope metrics.Scope,
        domainID string,
        workflowID string,
) error {

        err = h.convertError(err)
        h.updateErrorMetric(scope, domainID, workflowID, err)

        return err
}

func (h *handlerImpl) getLoggerWithTags(
        domainID string,
        workflowID string,
) log.Logger {

        logger := h.GetLogger()
        if domainID != "" {
                logger = logger.WithTags(tag.WorkflowDomainID(domainID))
        }

        if workflowID != "" {
                logger = logger.WithTags(tag.WorkflowID(workflowID))
        }

        return logger
}

func (h *handlerImpl) emitInfoOrDebugLog(
        domainID string,
        msg string,
        tags ...tag.Tag,
) {
        if h.config.EnableDebugMode && h.config.EnableTaskInfoLogByDomainID(domainID) {
                h.GetLogger().Info(msg, tags...)
        } else {
                h.GetLogger().Debug(msg, tags...)
        }
}

func (h *handlerImpl) startRequestProfile(ctx context.Context, scope int) (metrics.Scope, metrics.Stopwatch) {
        metricsScope := h.GetMetricsClient().Scope(scope, metrics.GetContextTags(ctx)...)
        metricsScope.IncCounter(metrics.CadenceRequests)
        sw := metricsScope.StartTimer(metrics.CadenceLatency)
        return metricsScope, sw
}

func validateTaskToken(token *common.TaskToken) error {
        if token.WorkflowID == "" {
                return errWorkflowIDNotSet
        }
        if token.RunID != "" && uuid.Parse(token.RunID) == nil {
                return errRunIDNotValid
        }
        return nil
}
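
// exampleValidateTaskToken is an illustrative sketch, not part of the original
// file: it shows how validateTaskToken rejects a token whose run ID is not a
// valid UUID. The token values here are hypothetical.
func exampleValidateTaskToken() {
        bad := &common.TaskToken{WorkflowID: "wf-1", RunID: "not-a-uuid"}
        if err := validateTaskToken(bad); err != nil {
                // err is errRunIDNotValid, because uuid.Parse returns nil
                // for strings that are not valid UUIDs.
                fmt.Println(err)
        }
}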