• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

uber / cadence / 018f127b-cd0f-43c3-b47a-35b58b7623db

24 Apr 2024 11:40PM UTC coverage: 67.722% (+0.008%) from 67.714%
018f127b-cd0f-43c3-b47a-35b58b7623db

push

buildkite

web-flow
Add double read for latency comparison for Pinot Migration (#5927)

* Add double read for latency comparison for Pinot Migration

* use go routine to make sure the primary read will not fail; update unit tests

* reformat

111 of 112 new or added lines in 4 files covered. (99.11%)

61 existing lines in 13 files now uncovered.

99335 of 146680 relevant lines covered (67.72%)

2377.29 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

83.74
/common/persistence/execution_manager.go
1
// Copyright (c) 2017-2020 Uber Technologies, Inc.
2
// Portions of the Software are attributed to Copyright (c) 2020 Temporal Technologies Inc.
3
//
4
// Permission is hereby granted, free of charge, to any person obtaining a copy
5
// of this software and associated documentation files (the "Software"), to deal
6
// in the Software without restriction, including without limitation the rights
7
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
8
// copies of the Software, and to permit persons to whom the Software is
9
// furnished to do so, subject to the following conditions:
10
//
11
// The above copyright notice and this permission notice shall be included in
12
// all copies or substantial portions of the Software.
13
//
14
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
15
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
16
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
17
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
18
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
19
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
20
// THE SOFTWARE.
21

22
package persistence
23

24
import (
25
        "context"
26
        "time"
27

28
        "github.com/uber/cadence/common"
29
        "github.com/uber/cadence/common/log"
30
        "github.com/uber/cadence/common/types"
31
)
32

33
type (
34
        // executionManagerImpl implements ExecutionManager based on ExecutionStore, statsComputer and PayloadSerializer
35
        executionManagerImpl struct {
36
                serializer    PayloadSerializer
37
                persistence   ExecutionStore
38
                statsComputer statsComputer
39
                logger        log.Logger
40
        }
41
)
42

43
// Compile-time assertion that *executionManagerImpl satisfies ExecutionManager.
var _ ExecutionManager = (*executionManagerImpl)(nil)
44

45
// NewExecutionManagerImpl returns new ExecutionManager
46
func NewExecutionManagerImpl(
47
        persistence ExecutionStore,
48
        logger log.Logger,
49
        serializer PayloadSerializer,
50
) ExecutionManager {
106✔
51
        return &executionManagerImpl{
106✔
52
                serializer:    serializer,
106✔
53
                persistence:   persistence,
106✔
54
                statsComputer: statsComputer{},
106✔
55
                logger:        logger,
106✔
56
        }
106✔
57
}
106✔
58

59
func (m *executionManagerImpl) GetName() string {
1✔
60
        return m.persistence.GetName()
1✔
61
}
1✔
62

63
func (m *executionManagerImpl) GetShardID() int {
6,440✔
64
        return m.persistence.GetShardID()
6,440✔
65
}
6,440✔
66

67
// The below three APIs are related to serialization/deserialization
68

69
func (m *executionManagerImpl) GetWorkflowExecution(
70
        ctx context.Context,
71
        request *GetWorkflowExecutionRequest,
72
) (*GetWorkflowExecutionResponse, error) {
1,202✔
73

1,202✔
74
        internalRequest := &InternalGetWorkflowExecutionRequest{
1,202✔
75
                DomainID:  request.DomainID,
1,202✔
76
                Execution: request.Execution,
1,202✔
77
                RangeID:   request.RangeID,
1,202✔
78
        }
1,202✔
79
        response, err := m.persistence.GetWorkflowExecution(ctx, internalRequest)
1,202✔
80
        if err != nil {
1,674✔
81
                return nil, err
472✔
82
        }
472✔
83
        newResponse := &GetWorkflowExecutionResponse{
733✔
84
                State: &WorkflowMutableState{
733✔
85
                        TimerInfos:         response.State.TimerInfos,
733✔
86
                        RequestCancelInfos: response.State.RequestCancelInfos,
733✔
87
                        SignalInfos:        response.State.SignalInfos,
733✔
88
                        SignalRequestedIDs: response.State.SignalRequestedIDs,
733✔
89
                        ReplicationState:   response.State.ReplicationState, // TODO: remove this after all 2DC workflows complete
733✔
90
                        Checksum:           response.State.Checksum,
733✔
91
                },
733✔
92
        }
733✔
93

733✔
94
        newResponse.State.ActivityInfos, err = m.DeserializeActivityInfos(response.State.ActivityInfos)
733✔
95
        if err != nil {
733✔
96
                return nil, err
×
97
        }
×
98
        newResponse.State.ChildExecutionInfos, err = m.DeserializeChildExecutionInfos(response.State.ChildExecutionInfos)
733✔
99
        if err != nil {
733✔
100
                return nil, err
×
101
        }
×
102
        newResponse.State.BufferedEvents, err = m.DeserializeBufferedEvents(response.State.BufferedEvents)
733✔
103
        if err != nil {
733✔
104
                return nil, err
×
105
        }
×
106
        newResponse.State.ExecutionInfo, newResponse.State.ExecutionStats, err = m.DeserializeExecutionInfo(response.State.ExecutionInfo)
733✔
107
        if err != nil {
733✔
108
                return nil, err
×
109
        }
×
110
        versionHistories, err := m.DeserializeVersionHistories(response.State.VersionHistories)
733✔
111
        if err != nil {
733✔
112
                return nil, err
×
113
        }
×
114
        newResponse.State.VersionHistories = versionHistories
733✔
115
        newResponse.MutableStateStats = m.statsComputer.computeMutableStateStats(response)
733✔
116

733✔
117
        if len(newResponse.State.Checksum.Value) == 0 {
1,466✔
118
                newResponse.State.Checksum, err = m.serializer.DeserializeChecksum(response.State.ChecksumData)
733✔
119
                if err != nil {
733✔
120
                        return nil, err
×
121
                }
×
122
        }
123

124
        return newResponse, nil
733✔
125
}
126

127
func (m *executionManagerImpl) DeserializeExecutionInfo(
128
        info *InternalWorkflowExecutionInfo,
129
) (*WorkflowExecutionInfo, *ExecutionStats, error) {
733✔
130

733✔
131
        completionEvent, err := m.serializer.DeserializeEvent(info.CompletionEvent)
733✔
132
        if err != nil {
733✔
133
                return nil, nil, err
×
134
        }
×
135

136
        autoResetPoints, err := m.serializer.DeserializeResetPoints(info.AutoResetPoints)
733✔
137
        if err != nil {
733✔
138
                return nil, nil, err
×
139
        }
×
140

141
        newInfo := &WorkflowExecutionInfo{
733✔
142
                CompletionEvent: completionEvent,
733✔
143

733✔
144
                DomainID:                           info.DomainID,
733✔
145
                WorkflowID:                         info.WorkflowID,
733✔
146
                RunID:                              info.RunID,
733✔
147
                FirstExecutionRunID:                info.FirstExecutionRunID,
733✔
148
                ParentDomainID:                     info.ParentDomainID,
733✔
149
                ParentWorkflowID:                   info.ParentWorkflowID,
733✔
150
                ParentRunID:                        info.ParentRunID,
733✔
151
                InitiatedID:                        info.InitiatedID,
733✔
152
                CompletionEventBatchID:             info.CompletionEventBatchID,
733✔
153
                TaskList:                           info.TaskList,
733✔
154
                IsCron:                             len(info.CronSchedule) > 0,
733✔
155
                WorkflowTypeName:                   info.WorkflowTypeName,
733✔
156
                WorkflowTimeout:                    int32(info.WorkflowTimeout.Seconds()),
733✔
157
                DecisionStartToCloseTimeout:        int32(info.DecisionStartToCloseTimeout.Seconds()),
733✔
158
                ExecutionContext:                   info.ExecutionContext,
733✔
159
                State:                              info.State,
733✔
160
                CloseStatus:                        info.CloseStatus,
733✔
161
                LastFirstEventID:                   info.LastFirstEventID,
733✔
162
                LastEventTaskID:                    info.LastEventTaskID,
733✔
163
                NextEventID:                        info.NextEventID,
733✔
164
                LastProcessedEvent:                 info.LastProcessedEvent,
733✔
165
                StartTimestamp:                     info.StartTimestamp,
733✔
166
                LastUpdatedTimestamp:               info.LastUpdatedTimestamp,
733✔
167
                CreateRequestID:                    info.CreateRequestID,
733✔
168
                SignalCount:                        info.SignalCount,
733✔
169
                DecisionVersion:                    info.DecisionVersion,
733✔
170
                DecisionScheduleID:                 info.DecisionScheduleID,
733✔
171
                DecisionStartedID:                  info.DecisionStartedID,
733✔
172
                DecisionRequestID:                  info.DecisionRequestID,
733✔
173
                DecisionTimeout:                    int32(info.DecisionTimeout.Seconds()),
733✔
174
                DecisionAttempt:                    info.DecisionAttempt,
733✔
175
                DecisionStartedTimestamp:           info.DecisionStartedTimestamp.UnixNano(),
733✔
176
                DecisionScheduledTimestamp:         info.DecisionScheduledTimestamp.UnixNano(),
733✔
177
                DecisionOriginalScheduledTimestamp: info.DecisionOriginalScheduledTimestamp.UnixNano(),
733✔
178
                CancelRequested:                    info.CancelRequested,
733✔
179
                CancelRequestID:                    info.CancelRequestID,
733✔
180
                StickyTaskList:                     info.StickyTaskList,
733✔
181
                StickyScheduleToStartTimeout:       int32(info.StickyScheduleToStartTimeout.Seconds()),
733✔
182
                ClientLibraryVersion:               info.ClientLibraryVersion,
733✔
183
                ClientFeatureVersion:               info.ClientFeatureVersion,
733✔
184
                ClientImpl:                         info.ClientImpl,
733✔
185
                Attempt:                            info.Attempt,
733✔
186
                HasRetryPolicy:                     info.HasRetryPolicy,
733✔
187
                InitialInterval:                    int32(info.InitialInterval.Seconds()),
733✔
188
                BackoffCoefficient:                 info.BackoffCoefficient,
733✔
189
                MaximumInterval:                    int32(info.MaximumInterval.Seconds()),
733✔
190
                ExpirationTime:                     info.ExpirationTime,
733✔
191
                MaximumAttempts:                    info.MaximumAttempts,
733✔
192
                NonRetriableErrors:                 info.NonRetriableErrors,
733✔
193
                BranchToken:                        info.BranchToken,
733✔
194
                CronSchedule:                       info.CronSchedule,
733✔
195
                ExpirationSeconds:                  int32(info.ExpirationInterval.Seconds()),
733✔
196
                AutoResetPoints:                    autoResetPoints,
733✔
197
                SearchAttributes:                   info.SearchAttributes,
733✔
198
                Memo:                               info.Memo,
733✔
199
                PartitionConfig:                    info.PartitionConfig,
733✔
200
        }
733✔
201
        newStats := &ExecutionStats{
733✔
202
                HistorySize: info.HistorySize,
733✔
203
        }
733✔
204
        return newInfo, newStats, nil
733✔
205
}
206

207
func (m *executionManagerImpl) DeserializeBufferedEvents(
208
        blobs []*DataBlob,
209
) ([]*types.HistoryEvent, error) {
733✔
210

733✔
211
        events := make([]*types.HistoryEvent, 0)
733✔
212
        for _, b := range blobs {
736✔
213
                history, err := m.serializer.DeserializeBatchEvents(b)
3✔
214
                if err != nil {
3✔
215
                        return nil, err
×
216
                }
×
217
                events = append(events, history...)
3✔
218
        }
219
        return events, nil
733✔
220
}
221

222
func (m *executionManagerImpl) DeserializeChildExecutionInfos(
223
        infos map[int64]*InternalChildExecutionInfo,
224
) (map[int64]*ChildExecutionInfo, error) {
733✔
225

733✔
226
        newInfos := make(map[int64]*ChildExecutionInfo)
733✔
227
        for k, v := range infos {
738✔
228
                initiatedEvent, err := m.serializer.DeserializeEvent(v.InitiatedEvent)
5✔
229
                if err != nil {
5✔
230
                        return nil, err
×
231
                }
×
232
                startedEvent, err := m.serializer.DeserializeEvent(v.StartedEvent)
5✔
233
                if err != nil {
5✔
234
                        return nil, err
×
235
                }
×
236
                c := &ChildExecutionInfo{
5✔
237
                        InitiatedEvent: initiatedEvent,
5✔
238
                        StartedEvent:   startedEvent,
5✔
239

5✔
240
                        Version:               v.Version,
5✔
241
                        InitiatedID:           v.InitiatedID,
5✔
242
                        InitiatedEventBatchID: v.InitiatedEventBatchID,
5✔
243
                        StartedID:             v.StartedID,
5✔
244
                        StartedWorkflowID:     v.StartedWorkflowID,
5✔
245
                        StartedRunID:          v.StartedRunID,
5✔
246
                        CreateRequestID:       v.CreateRequestID,
5✔
247
                        DomainID:              v.DomainID,
5✔
248
                        DomainNameDEPRECATED:  v.DomainNameDEPRECATED,
5✔
249
                        WorkflowTypeName:      v.WorkflowTypeName,
5✔
250
                        ParentClosePolicy:     v.ParentClosePolicy,
5✔
251
                }
5✔
252

5✔
253
                // Needed for backward compatibility reason.
5✔
254
                // ChildWorkflowExecutionStartedEvent was only used by transfer queue processing of StartChildWorkflow.
5✔
255
                // Updated the code to instead directly read WorkflowId and RunId from mutable state
5✔
256
                // Existing mutable state won't have those values set so instead use started event to set StartedWorkflowID and
5✔
257
                // StartedRunID on the mutable state before passing it to application
5✔
258
                if startedEvent != nil && startedEvent.ChildWorkflowExecutionStartedEventAttributes != nil &&
5✔
259
                        startedEvent.ChildWorkflowExecutionStartedEventAttributes.WorkflowExecution != nil {
5✔
260
                        startedExecution := startedEvent.ChildWorkflowExecutionStartedEventAttributes.WorkflowExecution
×
261
                        c.StartedWorkflowID = startedExecution.GetWorkflowID()
×
262
                        c.StartedRunID = startedExecution.GetRunID()
×
263
                }
×
264
                newInfos[k] = c
5✔
265
        }
266
        return newInfos, nil
733✔
267
}
268

269
func (m *executionManagerImpl) DeserializeActivityInfos(
270
        infos map[int64]*InternalActivityInfo,
271
) (map[int64]*ActivityInfo, error) {
733✔
272

733✔
273
        newInfos := make(map[int64]*ActivityInfo)
733✔
274
        for k, v := range infos {
851✔
275
                scheduledEvent, err := m.serializer.DeserializeEvent(v.ScheduledEvent)
118✔
276
                if err != nil {
118✔
277
                        return nil, err
×
278
                }
×
279
                startedEvent, err := m.serializer.DeserializeEvent(v.StartedEvent)
118✔
280
                if err != nil {
118✔
281
                        return nil, err
×
282
                }
×
283
                a := &ActivityInfo{
118✔
284
                        ScheduledEvent: scheduledEvent,
118✔
285
                        StartedEvent:   startedEvent,
118✔
286

118✔
287
                        Version:                                 v.Version,
118✔
288
                        ScheduleID:                              v.ScheduleID,
118✔
289
                        ScheduledEventBatchID:                   v.ScheduledEventBatchID,
118✔
290
                        ScheduledTime:                           v.ScheduledTime,
118✔
291
                        StartedID:                               v.StartedID,
118✔
292
                        StartedTime:                             v.StartedTime,
118✔
293
                        ActivityID:                              v.ActivityID,
118✔
294
                        RequestID:                               v.RequestID,
118✔
295
                        Details:                                 v.Details,
118✔
296
                        ScheduleToStartTimeout:                  int32(v.ScheduleToStartTimeout.Seconds()),
118✔
297
                        ScheduleToCloseTimeout:                  int32(v.ScheduleToCloseTimeout.Seconds()),
118✔
298
                        StartToCloseTimeout:                     int32(v.StartToCloseTimeout.Seconds()),
118✔
299
                        HeartbeatTimeout:                        int32(v.HeartbeatTimeout.Seconds()),
118✔
300
                        CancelRequested:                         v.CancelRequested,
118✔
301
                        CancelRequestID:                         v.CancelRequestID,
118✔
302
                        LastHeartBeatUpdatedTime:                v.LastHeartBeatUpdatedTime,
118✔
303
                        TimerTaskStatus:                         v.TimerTaskStatus,
118✔
304
                        Attempt:                                 v.Attempt,
118✔
305
                        DomainID:                                v.DomainID,
118✔
306
                        StartedIdentity:                         v.StartedIdentity,
118✔
307
                        TaskList:                                v.TaskList,
118✔
308
                        HasRetryPolicy:                          v.HasRetryPolicy,
118✔
309
                        InitialInterval:                         int32(v.InitialInterval.Seconds()),
118✔
310
                        BackoffCoefficient:                      v.BackoffCoefficient,
118✔
311
                        MaximumInterval:                         int32(v.MaximumInterval.Seconds()),
118✔
312
                        ExpirationTime:                          v.ExpirationTime,
118✔
313
                        MaximumAttempts:                         v.MaximumAttempts,
118✔
314
                        NonRetriableErrors:                      v.NonRetriableErrors,
118✔
315
                        LastFailureReason:                       v.LastFailureReason,
118✔
316
                        LastWorkerIdentity:                      v.LastWorkerIdentity,
118✔
317
                        LastFailureDetails:                      v.LastFailureDetails,
118✔
318
                        LastHeartbeatTimeoutVisibilityInSeconds: v.LastHeartbeatTimeoutVisibilityInSeconds,
118✔
319
                }
118✔
320
                newInfos[k] = a
118✔
321
        }
322
        return newInfos, nil
733✔
323
}
324

325
func (m *executionManagerImpl) UpdateWorkflowExecution(
326
        ctx context.Context,
327
        request *UpdateWorkflowExecutionRequest,
328
) (*UpdateWorkflowExecutionResponse, error) {
4,367✔
329

4,367✔
330
        serializedWorkflowMutation, err := m.SerializeWorkflowMutation(&request.UpdateWorkflowMutation, request.Encoding)
4,367✔
331
        if err != nil {
4,367✔
332
                return nil, err
×
333
        }
×
334
        var serializedNewWorkflowSnapshot *InternalWorkflowSnapshot
4,367✔
335
        if request.NewWorkflowSnapshot != nil {
4,542✔
336
                serializedNewWorkflowSnapshot, err = m.SerializeWorkflowSnapshot(request.NewWorkflowSnapshot, request.Encoding)
175✔
337
                if err != nil {
175✔
338
                        return nil, err
×
339
                }
×
340
        }
341

342
        newRequest := &InternalUpdateWorkflowExecutionRequest{
4,367✔
343
                RangeID: request.RangeID,
4,367✔
344

4,367✔
345
                Mode: request.Mode,
4,367✔
346

4,367✔
347
                UpdateWorkflowMutation: *serializedWorkflowMutation,
4,367✔
348
                NewWorkflowSnapshot:    serializedNewWorkflowSnapshot,
4,367✔
349

4,367✔
350
                WorkflowRequestMode: request.WorkflowRequestMode,
4,367✔
351
        }
4,367✔
352
        msuss := m.statsComputer.computeMutableStateUpdateStats(newRequest)
4,367✔
353
        err = m.persistence.UpdateWorkflowExecution(ctx, newRequest)
4,367✔
354
        if err != nil {
4,367✔
355
                return nil, err
×
356
        }
×
357
        return &UpdateWorkflowExecutionResponse{MutableStateUpdateSessionStats: msuss}, nil
4,367✔
358
}
359

360
func (m *executionManagerImpl) SerializeUpsertChildExecutionInfos(
361
        infos []*ChildExecutionInfo,
362
        encoding common.EncodingType,
363
) ([]*InternalChildExecutionInfo, error) {
5,058✔
364

5,058✔
365
        newInfos := make([]*InternalChildExecutionInfo, 0)
5,058✔
366
        for _, v := range infos {
5,097✔
367
                initiatedEvent, err := m.serializer.SerializeEvent(v.InitiatedEvent, encoding)
39✔
368
                if err != nil {
39✔
369
                        return nil, err
×
370
                }
×
371
                startedEvent, err := m.serializer.SerializeEvent(v.StartedEvent, encoding)
39✔
372
                if err != nil {
39✔
373
                        return nil, err
×
374
                }
×
375
                i := &InternalChildExecutionInfo{
39✔
376
                        InitiatedEvent: initiatedEvent,
39✔
377
                        StartedEvent:   startedEvent,
39✔
378

39✔
379
                        Version:               v.Version,
39✔
380
                        InitiatedID:           v.InitiatedID,
39✔
381
                        InitiatedEventBatchID: v.InitiatedEventBatchID,
39✔
382
                        CreateRequestID:       v.CreateRequestID,
39✔
383
                        StartedID:             v.StartedID,
39✔
384
                        StartedWorkflowID:     v.StartedWorkflowID,
39✔
385
                        StartedRunID:          v.StartedRunID,
39✔
386
                        DomainID:              v.DomainID,
39✔
387
                        DomainNameDEPRECATED:  v.DomainNameDEPRECATED,
39✔
388
                        WorkflowTypeName:      v.WorkflowTypeName,
39✔
389
                        ParentClosePolicy:     v.ParentClosePolicy,
39✔
390
                }
39✔
391
                newInfos = append(newInfos, i)
39✔
392
        }
393
        return newInfos, nil
5,058✔
394
}
395

396
func (m *executionManagerImpl) SerializeUpsertActivityInfos(
397
        infos []*ActivityInfo,
398
        encoding common.EncodingType,
399
) ([]*InternalActivityInfo, error) {
5,058✔
400

5,058✔
401
        newInfos := make([]*InternalActivityInfo, 0)
5,058✔
402
        for _, v := range infos {
6,314✔
403
                scheduledEvent, err := m.serializer.SerializeEvent(v.ScheduledEvent, encoding)
1,256✔
404
                if err != nil {
1,256✔
405
                        return nil, err
×
406
                }
×
407
                startedEvent, err := m.serializer.SerializeEvent(v.StartedEvent, encoding)
1,256✔
408
                if err != nil {
1,256✔
409
                        return nil, err
×
410
                }
×
411
                i := &InternalActivityInfo{
1,256✔
412
                        Version:                                 v.Version,
1,256✔
413
                        ScheduleID:                              v.ScheduleID,
1,256✔
414
                        ScheduledEventBatchID:                   v.ScheduledEventBatchID,
1,256✔
415
                        ScheduledEvent:                          scheduledEvent,
1,256✔
416
                        ScheduledTime:                           v.ScheduledTime,
1,256✔
417
                        StartedID:                               v.StartedID,
1,256✔
418
                        StartedEvent:                            startedEvent,
1,256✔
419
                        StartedTime:                             v.StartedTime,
1,256✔
420
                        ActivityID:                              v.ActivityID,
1,256✔
421
                        RequestID:                               v.RequestID,
1,256✔
422
                        Details:                                 v.Details,
1,256✔
423
                        ScheduleToStartTimeout:                  common.SecondsToDuration(int64(v.ScheduleToStartTimeout)),
1,256✔
424
                        ScheduleToCloseTimeout:                  common.SecondsToDuration(int64(v.ScheduleToCloseTimeout)),
1,256✔
425
                        StartToCloseTimeout:                     common.SecondsToDuration(int64(v.StartToCloseTimeout)),
1,256✔
426
                        HeartbeatTimeout:                        common.SecondsToDuration(int64(v.HeartbeatTimeout)),
1,256✔
427
                        CancelRequested:                         v.CancelRequested,
1,256✔
428
                        CancelRequestID:                         v.CancelRequestID,
1,256✔
429
                        LastHeartBeatUpdatedTime:                v.LastHeartBeatUpdatedTime,
1,256✔
430
                        TimerTaskStatus:                         v.TimerTaskStatus,
1,256✔
431
                        Attempt:                                 v.Attempt,
1,256✔
432
                        DomainID:                                v.DomainID,
1,256✔
433
                        StartedIdentity:                         v.StartedIdentity,
1,256✔
434
                        TaskList:                                v.TaskList,
1,256✔
435
                        HasRetryPolicy:                          v.HasRetryPolicy,
1,256✔
436
                        InitialInterval:                         common.SecondsToDuration(int64(v.InitialInterval)),
1,256✔
437
                        BackoffCoefficient:                      v.BackoffCoefficient,
1,256✔
438
                        MaximumInterval:                         common.SecondsToDuration(int64(v.MaximumInterval)),
1,256✔
439
                        ExpirationTime:                          v.ExpirationTime,
1,256✔
440
                        MaximumAttempts:                         v.MaximumAttempts,
1,256✔
441
                        NonRetriableErrors:                      v.NonRetriableErrors,
1,256✔
442
                        LastFailureReason:                       v.LastFailureReason,
1,256✔
443
                        LastWorkerIdentity:                      v.LastWorkerIdentity,
1,256✔
444
                        LastFailureDetails:                      v.LastFailureDetails,
1,256✔
445
                        LastHeartbeatTimeoutVisibilityInSeconds: v.LastHeartbeatTimeoutVisibilityInSeconds,
1,256✔
446
                }
1,256✔
447
                newInfos = append(newInfos, i)
1,256✔
448
        }
449
        return newInfos, nil
5,058✔
450
}
451

452
func (m *executionManagerImpl) SerializeExecutionInfo(
453
        info *WorkflowExecutionInfo,
454
        stats *ExecutionStats,
455
        encoding common.EncodingType,
456
) (*InternalWorkflowExecutionInfo, error) {
5,058✔
457

5,058✔
458
        if info == nil {
5,058✔
459
                return &InternalWorkflowExecutionInfo{}, nil
×
460
        }
×
461
        completionEvent, err := m.serializer.SerializeEvent(info.CompletionEvent, encoding)
5,058✔
462
        if err != nil {
5,058✔
463
                return nil, err
×
464
        }
×
465

466
        resetPoints, err := m.serializer.SerializeResetPoints(info.AutoResetPoints, encoding)
5,058✔
467
        if err != nil {
5,058✔
468
                return nil, err
×
469
        }
×
470

471
        return &InternalWorkflowExecutionInfo{
5,058✔
472
                DomainID:                           info.DomainID,
5,058✔
473
                WorkflowID:                         info.WorkflowID,
5,058✔
474
                RunID:                              info.RunID,
5,058✔
475
                FirstExecutionRunID:                info.FirstExecutionRunID,
5,058✔
476
                ParentDomainID:                     info.ParentDomainID,
5,058✔
477
                ParentWorkflowID:                   info.ParentWorkflowID,
5,058✔
478
                ParentRunID:                        info.ParentRunID,
5,058✔
479
                InitiatedID:                        info.InitiatedID,
5,058✔
480
                CompletionEventBatchID:             info.CompletionEventBatchID,
5,058✔
481
                CompletionEvent:                    completionEvent,
5,058✔
482
                TaskList:                           info.TaskList,
5,058✔
483
                WorkflowTypeName:                   info.WorkflowTypeName,
5,058✔
484
                WorkflowTimeout:                    common.SecondsToDuration(int64(info.WorkflowTimeout)),
5,058✔
485
                DecisionStartToCloseTimeout:        common.SecondsToDuration(int64(info.DecisionStartToCloseTimeout)),
5,058✔
486
                ExecutionContext:                   info.ExecutionContext,
5,058✔
487
                State:                              info.State,
5,058✔
488
                CloseStatus:                        info.CloseStatus,
5,058✔
489
                LastFirstEventID:                   info.LastFirstEventID,
5,058✔
490
                LastEventTaskID:                    info.LastEventTaskID,
5,058✔
491
                NextEventID:                        info.NextEventID,
5,058✔
492
                LastProcessedEvent:                 info.LastProcessedEvent,
5,058✔
493
                StartTimestamp:                     info.StartTimestamp,
5,058✔
494
                LastUpdatedTimestamp:               info.LastUpdatedTimestamp,
5,058✔
495
                CreateRequestID:                    info.CreateRequestID,
5,058✔
496
                SignalCount:                        info.SignalCount,
5,058✔
497
                DecisionVersion:                    info.DecisionVersion,
5,058✔
498
                DecisionScheduleID:                 info.DecisionScheduleID,
5,058✔
499
                DecisionStartedID:                  info.DecisionStartedID,
5,058✔
500
                DecisionRequestID:                  info.DecisionRequestID,
5,058✔
501
                DecisionTimeout:                    common.SecondsToDuration(int64(info.DecisionTimeout)),
5,058✔
502
                DecisionAttempt:                    info.DecisionAttempt,
5,058✔
503
                DecisionStartedTimestamp:           time.Unix(0, info.DecisionStartedTimestamp).UTC(),
5,058✔
504
                DecisionScheduledTimestamp:         time.Unix(0, info.DecisionScheduledTimestamp).UTC(),
5,058✔
505
                DecisionOriginalScheduledTimestamp: time.Unix(0, info.DecisionOriginalScheduledTimestamp).UTC(),
5,058✔
506
                CancelRequested:                    info.CancelRequested,
5,058✔
507
                CancelRequestID:                    info.CancelRequestID,
5,058✔
508
                StickyTaskList:                     info.StickyTaskList,
5,058✔
509
                StickyScheduleToStartTimeout:       common.SecondsToDuration(int64(info.StickyScheduleToStartTimeout)),
5,058✔
510
                ClientLibraryVersion:               info.ClientLibraryVersion,
5,058✔
511
                ClientFeatureVersion:               info.ClientFeatureVersion,
5,058✔
512
                ClientImpl:                         info.ClientImpl,
5,058✔
513
                AutoResetPoints:                    resetPoints,
5,058✔
514
                Attempt:                            info.Attempt,
5,058✔
515
                HasRetryPolicy:                     info.HasRetryPolicy,
5,058✔
516
                InitialInterval:                    common.SecondsToDuration(int64(info.InitialInterval)),
5,058✔
517
                BackoffCoefficient:                 info.BackoffCoefficient,
5,058✔
518
                MaximumInterval:                    common.SecondsToDuration(int64(info.MaximumInterval)),
5,058✔
519
                ExpirationTime:                     info.ExpirationTime,
5,058✔
520
                MaximumAttempts:                    info.MaximumAttempts,
5,058✔
521
                NonRetriableErrors:                 info.NonRetriableErrors,
5,058✔
522
                BranchToken:                        info.BranchToken,
5,058✔
523
                CronSchedule:                       info.CronSchedule,
5,058✔
524
                ExpirationInterval:                 common.SecondsToDuration(int64(info.ExpirationSeconds)),
5,058✔
525
                Memo:                               info.Memo,
5,058✔
526
                SearchAttributes:                   info.SearchAttributes,
5,058✔
527
                PartitionConfig:                    info.PartitionConfig,
5,058✔
528

5,058✔
529
                // attributes which are not related to mutable state
5,058✔
530
                HistorySize: stats.HistorySize,
5,058✔
531
        }, nil
5,058✔
532
}
533

534
func (m *executionManagerImpl) ConflictResolveWorkflowExecution(
535
        ctx context.Context,
536
        request *ConflictResolveWorkflowExecutionRequest,
537
) (*ConflictResolveWorkflowExecutionResponse, error) {
3✔
538

3✔
539
        serializedResetWorkflowSnapshot, err := m.SerializeWorkflowSnapshot(&request.ResetWorkflowSnapshot, request.Encoding)
3✔
540
        if err != nil {
3✔
541
                return nil, err
×
542
        }
×
543
        var serializedCurrentWorkflowMutation *InternalWorkflowMutation
3✔
544
        if request.CurrentWorkflowMutation != nil {
3✔
UNCOV
545
                serializedCurrentWorkflowMutation, err = m.SerializeWorkflowMutation(request.CurrentWorkflowMutation, request.Encoding)
×
UNCOV
546
                if err != nil {
×
547
                        return nil, err
×
548
                }
×
549
        }
550
        var serializedNewWorkflowMutation *InternalWorkflowSnapshot
3✔
551
        if request.NewWorkflowSnapshot != nil {
3✔
552
                serializedNewWorkflowMutation, err = m.SerializeWorkflowSnapshot(request.NewWorkflowSnapshot, request.Encoding)
×
553
                if err != nil {
×
554
                        return nil, err
×
555
                }
×
556
        }
557

558
        newRequest := &InternalConflictResolveWorkflowExecutionRequest{
3✔
559
                RangeID: request.RangeID,
3✔
560

3✔
561
                Mode: request.Mode,
3✔
562

3✔
563
                ResetWorkflowSnapshot: *serializedResetWorkflowSnapshot,
3✔
564

3✔
565
                NewWorkflowSnapshot: serializedNewWorkflowMutation,
3✔
566

3✔
567
                CurrentWorkflowMutation: serializedCurrentWorkflowMutation,
3✔
568

3✔
569
                WorkflowRequestMode: request.WorkflowRequestMode,
3✔
570
        }
3✔
571
        msuss := m.statsComputer.computeMutableStateConflictResolveStats(newRequest)
3✔
572
        err = m.persistence.ConflictResolveWorkflowExecution(ctx, newRequest)
3✔
573
        if err != nil {
3✔
574
                return nil, err
×
575
        }
×
576
        return &ConflictResolveWorkflowExecutionResponse{MutableStateUpdateSessionStats: msuss}, nil
3✔
577
}
578

579
func (m *executionManagerImpl) CreateWorkflowExecution(
580
        ctx context.Context,
581
        request *CreateWorkflowExecutionRequest,
582
) (*CreateWorkflowExecutionResponse, error) {
522✔
583

522✔
584
        encoding := common.EncodingTypeThriftRW
522✔
585

522✔
586
        serializedNewWorkflowSnapshot, err := m.SerializeWorkflowSnapshot(&request.NewWorkflowSnapshot, encoding)
522✔
587
        if err != nil {
522✔
588
                return nil, err
×
589
        }
×
590

591
        newRequest := &InternalCreateWorkflowExecutionRequest{
522✔
592
                RangeID: request.RangeID,
522✔
593

522✔
594
                Mode: request.Mode,
522✔
595

522✔
596
                PreviousRunID:            request.PreviousRunID,
522✔
597
                PreviousLastWriteVersion: request.PreviousLastWriteVersion,
522✔
598

522✔
599
                NewWorkflowSnapshot: *serializedNewWorkflowSnapshot,
522✔
600

522✔
601
                WorkflowRequestMode: request.WorkflowRequestMode,
522✔
602
        }
522✔
603

522✔
604
        msuss := m.statsComputer.computeMutableStateCreateStats(newRequest)
522✔
605
        _, err = m.persistence.CreateWorkflowExecution(ctx, newRequest)
522✔
606
        if err != nil {
582✔
607
                return nil, err
60✔
608
        }
60✔
609
        return &CreateWorkflowExecutionResponse{MutableStateUpdateSessionStats: msuss}, nil
465✔
610
}
611

612
func (m *executionManagerImpl) SerializeWorkflowMutation(
613
        input *WorkflowMutation,
614
        encoding common.EncodingType,
615
) (*InternalWorkflowMutation, error) {
4,367✔
616

4,367✔
617
        serializedExecutionInfo, err := m.SerializeExecutionInfo(
4,367✔
618
                input.ExecutionInfo,
4,367✔
619
                input.ExecutionStats,
4,367✔
620
                encoding,
4,367✔
621
        )
4,367✔
622
        if err != nil {
4,367✔
623
                return nil, err
×
624
        }
×
625
        serializedVersionHistories, err := m.SerializeVersionHistories(input.VersionHistories, encoding)
4,367✔
626
        if err != nil {
4,367✔
627
                return nil, err
×
628
        }
×
629
        serializedUpsertActivityInfos, err := m.SerializeUpsertActivityInfos(input.UpsertActivityInfos, encoding)
4,367✔
630
        if err != nil {
4,367✔
631
                return nil, err
×
632
        }
×
633
        serializedUpsertChildExecutionInfos, err := m.SerializeUpsertChildExecutionInfos(input.UpsertChildExecutionInfos, encoding)
4,367✔
634
        if err != nil {
4,367✔
635
                return nil, err
×
636
        }
×
637
        var serializedNewBufferedEvents *DataBlob
4,367✔
638
        if input.NewBufferedEvents != nil {
5,009✔
639
                serializedNewBufferedEvents, err = m.serializer.SerializeBatchEvents(input.NewBufferedEvents, encoding)
642✔
640
                if err != nil {
642✔
641
                        return nil, err
×
642
                }
×
643
        }
644

645
        startVersion, err := getStartVersion(input.VersionHistories)
4,367✔
646
        if err != nil {
4,367✔
647
                return nil, err
×
648
        }
×
649
        lastWriteVersion, err := getLastWriteVersion(input.VersionHistories)
4,367✔
650
        if err != nil {
4,367✔
651
                return nil, err
×
652
        }
×
653
        checksumData, err := m.serializer.SerializeChecksum(input.Checksum, common.EncodingTypeJSON)
4,367✔
654
        if err != nil {
4,367✔
655
                return nil, err
×
656
        }
×
657

658
        return &InternalWorkflowMutation{
4,367✔
659
                ExecutionInfo:    serializedExecutionInfo,
4,367✔
660
                VersionHistories: serializedVersionHistories,
4,367✔
661
                StartVersion:     startVersion,
4,367✔
662
                LastWriteVersion: lastWriteVersion,
4,367✔
663

4,367✔
664
                UpsertActivityInfos:       serializedUpsertActivityInfos,
4,367✔
665
                DeleteActivityInfos:       input.DeleteActivityInfos,
4,367✔
666
                UpsertTimerInfos:          input.UpsertTimerInfos,
4,367✔
667
                DeleteTimerInfos:          input.DeleteTimerInfos,
4,367✔
668
                UpsertChildExecutionInfos: serializedUpsertChildExecutionInfos,
4,367✔
669
                DeleteChildExecutionInfos: input.DeleteChildExecutionInfos,
4,367✔
670
                UpsertRequestCancelInfos:  input.UpsertRequestCancelInfos,
4,367✔
671
                DeleteRequestCancelInfos:  input.DeleteRequestCancelInfos,
4,367✔
672
                UpsertSignalInfos:         input.UpsertSignalInfos,
4,367✔
673
                DeleteSignalInfos:         input.DeleteSignalInfos,
4,367✔
674
                UpsertSignalRequestedIDs:  input.UpsertSignalRequestedIDs,
4,367✔
675
                DeleteSignalRequestedIDs:  input.DeleteSignalRequestedIDs,
4,367✔
676
                NewBufferedEvents:         serializedNewBufferedEvents,
4,367✔
677
                ClearBufferedEvents:       input.ClearBufferedEvents,
4,367✔
678

4,367✔
679
                TransferTasks:     input.TransferTasks,
4,367✔
680
                CrossClusterTasks: input.CrossClusterTasks,
4,367✔
681
                ReplicationTasks:  input.ReplicationTasks,
4,367✔
682
                TimerTasks:        input.TimerTasks,
4,367✔
683

4,367✔
684
                WorkflowRequests: input.WorkflowRequests,
4,367✔
685

4,367✔
686
                Condition:    input.Condition,
4,367✔
687
                Checksum:     input.Checksum,
4,367✔
688
                ChecksumData: checksumData,
4,367✔
689
        }, nil
4,367✔
690
}
691

692
func (m *executionManagerImpl) SerializeWorkflowSnapshot(
693
        input *WorkflowSnapshot,
694
        encoding common.EncodingType,
695
) (*InternalWorkflowSnapshot, error) {
694✔
696

694✔
697
        serializedExecutionInfo, err := m.SerializeExecutionInfo(
694✔
698
                input.ExecutionInfo,
694✔
699
                input.ExecutionStats,
694✔
700
                encoding,
694✔
701
        )
694✔
702
        if err != nil {
694✔
703
                return nil, err
×
704
        }
×
705
        serializedVersionHistories, err := m.SerializeVersionHistories(input.VersionHistories, encoding)
694✔
706
        if err != nil {
694✔
707
                return nil, err
×
708
        }
×
709
        serializedActivityInfos, err := m.SerializeUpsertActivityInfos(input.ActivityInfos, encoding)
694✔
710
        if err != nil {
694✔
711
                return nil, err
×
712
        }
×
713
        serializedChildExecutionInfos, err := m.SerializeUpsertChildExecutionInfos(input.ChildExecutionInfos, encoding)
694✔
714
        if err != nil {
694✔
715
                return nil, err
×
716
        }
×
717

718
        startVersion, err := getStartVersion(input.VersionHistories)
694✔
719
        if err != nil {
694✔
720
                return nil, err
×
721
        }
×
722
        lastWriteVersion, err := getLastWriteVersion(input.VersionHistories)
694✔
723
        if err != nil {
694✔
724
                return nil, err
×
725
        }
×
726

727
        checksumData, err := m.serializer.SerializeChecksum(input.Checksum, common.EncodingTypeJSON)
694✔
728
        if err != nil {
694✔
729
                return nil, err
×
730
        }
×
731

732
        return &InternalWorkflowSnapshot{
694✔
733
                ExecutionInfo:    serializedExecutionInfo,
694✔
734
                VersionHistories: serializedVersionHistories,
694✔
735
                StartVersion:     startVersion,
694✔
736
                LastWriteVersion: lastWriteVersion,
694✔
737

694✔
738
                ActivityInfos:       serializedActivityInfos,
694✔
739
                TimerInfos:          input.TimerInfos,
694✔
740
                ChildExecutionInfos: serializedChildExecutionInfos,
694✔
741
                RequestCancelInfos:  input.RequestCancelInfos,
694✔
742
                SignalInfos:         input.SignalInfos,
694✔
743
                SignalRequestedIDs:  input.SignalRequestedIDs,
694✔
744

694✔
745
                TransferTasks:     input.TransferTasks,
694✔
746
                CrossClusterTasks: input.CrossClusterTasks,
694✔
747
                ReplicationTasks:  input.ReplicationTasks,
694✔
748
                TimerTasks:        input.TimerTasks,
694✔
749

694✔
750
                WorkflowRequests: input.WorkflowRequests,
694✔
751

694✔
752
                Condition:    input.Condition,
694✔
753
                Checksum:     input.Checksum,
694✔
754
                ChecksumData: checksumData,
694✔
755
        }, nil
694✔
756
}
757

758
func (m *executionManagerImpl) SerializeVersionHistories(
759
        versionHistories *VersionHistories,
760
        encoding common.EncodingType,
761
) (*DataBlob, error) {
5,058✔
762

5,058✔
763
        if versionHistories == nil {
5,060✔
764
                return nil, nil
2✔
765
        }
2✔
766
        return m.serializer.SerializeVersionHistories(versionHistories.ToInternalType(), encoding)
5,056✔
767
}
768

769
func (m *executionManagerImpl) DeserializeVersionHistories(
770
        blob *DataBlob,
771
) (*VersionHistories, error) {
733✔
772

733✔
773
        if blob == nil {
734✔
774
                return nil, nil
1✔
775
        }
1✔
776
        versionHistories, err := m.serializer.DeserializeVersionHistories(blob)
732✔
777
        if err != nil {
732✔
778
                return nil, err
×
779
        }
×
780
        return NewVersionHistoriesFromInternalType(versionHistories), nil
732✔
781
}
782

783
func (m *executionManagerImpl) DeleteWorkflowExecution(
784
        ctx context.Context,
785
        request *DeleteWorkflowExecutionRequest,
786
) error {
55✔
787
        return m.persistence.DeleteWorkflowExecution(ctx, request)
55✔
788
}
55✔
789

790
func (m *executionManagerImpl) DeleteCurrentWorkflowExecution(
791
        ctx context.Context,
792
        request *DeleteCurrentWorkflowExecutionRequest,
793
) error {
55✔
794
        return m.persistence.DeleteCurrentWorkflowExecution(ctx, request)
55✔
795
}
55✔
796

797
func (m *executionManagerImpl) GetCurrentExecution(
798
        ctx context.Context,
799
        request *GetCurrentExecutionRequest,
800
) (*GetCurrentExecutionResponse, error) {
184✔
801
        return m.persistence.GetCurrentExecution(ctx, request)
184✔
802
}
184✔
803

804
func (m *executionManagerImpl) ListCurrentExecutions(
805
        ctx context.Context,
806
        request *ListCurrentExecutionsRequest,
807
) (*ListCurrentExecutionsResponse, error) {
1✔
808
        return m.persistence.ListCurrentExecutions(ctx, request)
1✔
809
}
1✔
810

811
func (m *executionManagerImpl) IsWorkflowExecutionExists(
812
        ctx context.Context,
813
        request *IsWorkflowExecutionExistsRequest,
814
) (*IsWorkflowExecutionExistsResponse, error) {
1✔
815
        return m.persistence.IsWorkflowExecutionExists(ctx, request)
1✔
816
}
1✔
817

818
func (m *executionManagerImpl) ListConcreteExecutions(
819
        ctx context.Context,
820
        request *ListConcreteExecutionsRequest,
821
) (*ListConcreteExecutionsResponse, error) {
×
822
        response, err := m.persistence.ListConcreteExecutions(ctx, request)
×
823
        if err != nil {
×
824
                return nil, err
×
825
        }
×
826
        newResponse := &ListConcreteExecutionsResponse{
×
827
                Executions: make([]*ListConcreteExecutionsEntity, len(response.Executions)),
×
828
                PageToken:  response.NextPageToken,
×
829
        }
×
830
        for i, e := range response.Executions {
×
831
                info, _, err := m.DeserializeExecutionInfo(e.ExecutionInfo)
×
832
                if err != nil {
×
833
                        return nil, err
×
834
                }
×
835
                vh, err := m.DeserializeVersionHistories(e.VersionHistories)
×
836
                if err != nil {
×
837
                        return nil, err
×
838
                }
×
839
                newResponse.Executions[i] = &ListConcreteExecutionsEntity{
×
840
                        ExecutionInfo:    info,
×
841
                        VersionHistories: vh,
×
842
                }
×
843
        }
844
        return newResponse, nil
×
845
}
846

847
// Transfer task related methods
848
func (m *executionManagerImpl) GetTransferTasks(
849
        ctx context.Context,
850
        request *GetTransferTasksRequest,
851
) (*GetTransferTasksResponse, error) {
2,294✔
852
        return m.persistence.GetTransferTasks(ctx, request)
2,294✔
853
}
2,294✔
854

855
func (m *executionManagerImpl) CompleteTransferTask(
856
        ctx context.Context,
857
        request *CompleteTransferTaskRequest,
858
) error {
1✔
859
        return m.persistence.CompleteTransferTask(ctx, request)
1✔
860
}
1✔
861

862
func (m *executionManagerImpl) RangeCompleteTransferTask(
863
        ctx context.Context,
864
        request *RangeCompleteTransferTaskRequest,
865
) (*RangeCompleteTransferTaskResponse, error) {
80✔
866
        return m.persistence.RangeCompleteTransferTask(ctx, request)
80✔
867
}
80✔
868

869
// Cross-cluster task related methods
870
func (m *executionManagerImpl) GetCrossClusterTasks(
871
        ctx context.Context,
872
        request *GetCrossClusterTasksRequest,
873
) (*GetCrossClusterTasksResponse, error) {
152✔
874
        return m.persistence.GetCrossClusterTasks(ctx, request)
152✔
875
}
152✔
876

877
func (m *executionManagerImpl) CompleteCrossClusterTask(
878
        ctx context.Context,
879
        request *CompleteCrossClusterTaskRequest,
880
) error {
1✔
881
        return m.persistence.CompleteCrossClusterTask(ctx, request)
1✔
882
}
1✔
883

884
func (m *executionManagerImpl) RangeCompleteCrossClusterTask(
885
        ctx context.Context,
886
        request *RangeCompleteCrossClusterTaskRequest,
887
) (*RangeCompleteCrossClusterTaskResponse, error) {
115✔
888
        return m.persistence.RangeCompleteCrossClusterTask(ctx, request)
115✔
889
}
115✔
890

891
// Replication task related methods
892
func (m *executionManagerImpl) GetReplicationTasks(
893
        ctx context.Context,
894
        request *GetReplicationTasksRequest,
895
) (*GetReplicationTasksResponse, error) {
92✔
896
        resp, err := m.persistence.GetReplicationTasks(ctx, request)
92✔
897
        if err != nil {
93✔
898
                return nil, err
1✔
899
        }
1✔
900

901
        return &GetReplicationTasksResponse{
91✔
902
                Tasks:         m.fromInternalReplicationTaskInfos(resp.Tasks),
91✔
903
                NextPageToken: resp.NextPageToken,
91✔
904
        }, nil
91✔
905
}
906

907
func (m *executionManagerImpl) CompleteReplicationTask(
908
        ctx context.Context,
909
        request *CompleteReplicationTaskRequest,
910
) error {
1✔
911
        return m.persistence.CompleteReplicationTask(ctx, request)
1✔
912
}
1✔
913

914
func (m *executionManagerImpl) RangeCompleteReplicationTask(
915
        ctx context.Context,
916
        request *RangeCompleteReplicationTaskRequest,
917
) (*RangeCompleteReplicationTaskResponse, error) {
101✔
918
        return m.persistence.RangeCompleteReplicationTask(ctx, request)
101✔
919
}
101✔
920

921
func (m *executionManagerImpl) PutReplicationTaskToDLQ(
922
        ctx context.Context,
923
        request *PutReplicationTaskToDLQRequest,
924
) error {
3✔
925
        internalRequest := &InternalPutReplicationTaskToDLQRequest{
3✔
926
                SourceClusterName: request.SourceClusterName,
3✔
927
                TaskInfo:          m.toInternalReplicationTaskInfo(request.TaskInfo),
3✔
928
        }
3✔
929
        return m.persistence.PutReplicationTaskToDLQ(ctx, internalRequest)
3✔
930
}
3✔
931

932
func (m *executionManagerImpl) GetReplicationTasksFromDLQ(
933
        ctx context.Context,
934
        request *GetReplicationTasksFromDLQRequest,
935
) (*GetReplicationTasksFromDLQResponse, error) {
3✔
936
        resp, err := m.persistence.GetReplicationTasksFromDLQ(ctx, request)
3✔
937
        if err != nil {
3✔
938
                return nil, err
×
939
        }
×
940
        return &GetReplicationTasksFromDLQResponse{
3✔
941
                Tasks:         m.fromInternalReplicationTaskInfos(resp.Tasks),
3✔
942
                NextPageToken: resp.NextPageToken,
3✔
943
        }, nil
3✔
944
}
945

946
func (m *executionManagerImpl) GetReplicationDLQSize(
947
        ctx context.Context,
948
        request *GetReplicationDLQSizeRequest,
949
) (*GetReplicationDLQSizeResponse, error) {
13✔
950
        return m.persistence.GetReplicationDLQSize(ctx, request)
13✔
951
}
13✔
952

953
func (m *executionManagerImpl) DeleteReplicationTaskFromDLQ(
954
        ctx context.Context,
955
        request *DeleteReplicationTaskFromDLQRequest,
956
) error {
1✔
957
        return m.persistence.DeleteReplicationTaskFromDLQ(ctx, request)
1✔
958
}
1✔
959

960
func (m *executionManagerImpl) RangeDeleteReplicationTaskFromDLQ(
961
        ctx context.Context,
962
        request *RangeDeleteReplicationTaskFromDLQRequest,
963
) (*RangeDeleteReplicationTaskFromDLQResponse, error) {
1✔
964
        return m.persistence.RangeDeleteReplicationTaskFromDLQ(ctx, request)
1✔
965
}
1✔
966

967
func (m *executionManagerImpl) CreateFailoverMarkerTasks(
968
        ctx context.Context,
969
        request *CreateFailoverMarkersRequest,
970
) error {
1✔
971
        return m.persistence.CreateFailoverMarkerTasks(ctx, request)
1✔
972
}
1✔
973

974
// Timer related methods.
975
func (m *executionManagerImpl) GetTimerIndexTasks(
976
        ctx context.Context,
977
        request *GetTimerIndexTasksRequest,
978
) (*GetTimerIndexTasksResponse, error) {
3,125✔
979
        return m.persistence.GetTimerIndexTasks(ctx, request)
3,125✔
980
}
3,125✔
981

982
func (m *executionManagerImpl) CompleteTimerTask(
983
        ctx context.Context,
984
        request *CompleteTimerTaskRequest,
985
) error {
1✔
986
        return m.persistence.CompleteTimerTask(ctx, request)
1✔
987
}
1✔
988

989
func (m *executionManagerImpl) RangeCompleteTimerTask(
990
        ctx context.Context,
991
        request *RangeCompleteTimerTaskRequest,
992
) (*RangeCompleteTimerTaskResponse, error) {
27✔
993
        return m.persistence.RangeCompleteTimerTask(ctx, request)
27✔
994
}
27✔
995

996
func (m *executionManagerImpl) Close() {
64✔
997
        m.persistence.Close()
64✔
998
}
64✔
999

1000
func (m *executionManagerImpl) fromInternalReplicationTaskInfos(internalInfos []*InternalReplicationTaskInfo) []*ReplicationTaskInfo {
92✔
1001
        if internalInfos == nil {
183✔
1002
                return nil
91✔
1003
        }
91✔
1004
        infos := make([]*ReplicationTaskInfo, len(internalInfos))
4✔
1005
        for i := 0; i < len(internalInfos); i++ {
9✔
1006
                infos[i] = m.fromInternalReplicationTaskInfo(internalInfos[i])
5✔
1007
        }
5✔
1008
        return infos
4✔
1009
}
1010

1011
func (m *executionManagerImpl) fromInternalReplicationTaskInfo(internalInfo *InternalReplicationTaskInfo) *ReplicationTaskInfo {
5✔
1012
        if internalInfo == nil {
5✔
1013
                return nil
×
1014
        }
×
1015
        return &ReplicationTaskInfo{
5✔
1016
                DomainID:          internalInfo.DomainID,
5✔
1017
                WorkflowID:        internalInfo.WorkflowID,
5✔
1018
                RunID:             internalInfo.RunID,
5✔
1019
                TaskID:            internalInfo.TaskID,
5✔
1020
                TaskType:          internalInfo.TaskType,
5✔
1021
                FirstEventID:      internalInfo.FirstEventID,
5✔
1022
                NextEventID:       internalInfo.NextEventID,
5✔
1023
                Version:           internalInfo.Version,
5✔
1024
                ScheduledID:       internalInfo.ScheduledID,
5✔
1025
                BranchToken:       internalInfo.BranchToken,
5✔
1026
                NewRunBranchToken: internalInfo.NewRunBranchToken,
5✔
1027
                CreationTime:      internalInfo.CreationTime.UnixNano(),
5✔
1028
        }
5✔
1029
}
1030

1031
func (m *executionManagerImpl) toInternalReplicationTaskInfo(info *ReplicationTaskInfo) *InternalReplicationTaskInfo {
3✔
1032
        if info == nil {
3✔
1033
                return nil
×
1034
        }
×
1035
        return &InternalReplicationTaskInfo{
3✔
1036
                DomainID:          info.DomainID,
3✔
1037
                WorkflowID:        info.WorkflowID,
3✔
1038
                RunID:             info.RunID,
3✔
1039
                TaskID:            info.TaskID,
3✔
1040
                TaskType:          info.TaskType,
3✔
1041
                FirstEventID:      info.FirstEventID,
3✔
1042
                NextEventID:       info.NextEventID,
3✔
1043
                Version:           info.Version,
3✔
1044
                ScheduledID:       info.ScheduledID,
3✔
1045
                BranchToken:       info.BranchToken,
3✔
1046
                NewRunBranchToken: info.NewRunBranchToken,
3✔
1047
                CreationTime:      time.Unix(0, info.CreationTime),
3✔
1048
        }
3✔
1049
}
1050

1051
func getStartVersion(
1052
        versionHistories *VersionHistories,
1053
) (int64, error) {
5,058✔
1054

5,058✔
1055
        if versionHistories == nil {
5,060✔
1056
                return common.EmptyVersion, nil
2✔
1057
        }
2✔
1058

1059
        versionHistory, err := versionHistories.GetCurrentVersionHistory()
5,056✔
1060
        if err != nil {
5,056✔
1061
                return 0, err
×
1062
        }
×
1063
        versionHistoryItem, err := versionHistory.GetFirstItem()
5,056✔
1064
        if err != nil {
5,056✔
1065
                return 0, err
×
1066
        }
×
1067
        return versionHistoryItem.Version, nil
5,056✔
1068
}
1069

1070
func getLastWriteVersion(
1071
        versionHistories *VersionHistories,
1072
) (int64, error) {
5,058✔
1073

5,058✔
1074
        if versionHistories == nil {
5,060✔
1075
                return common.EmptyVersion, nil
2✔
1076
        }
2✔
1077

1078
        versionHistory, err := versionHistories.GetCurrentVersionHistory()
5,056✔
1079
        if err != nil {
5,056✔
1080
                return 0, err
×
1081
        }
×
1082
        versionHistoryItem, err := versionHistory.GetLastItem()
5,056✔
1083
        if err != nil {
5,056✔
1084
                return 0, err
×
1085
        }
×
1086
        return versionHistoryItem.Version, nil
5,056✔
1087
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc